First of all, thanks @tdas for looking into my problem.

Yes, I checked it separately and it works fine. The piece of code below
throws no exception and the values are sent correctly.

    val reporter = new MyClassReporter(...)
    reporter.send(...)
    val out = new FileOutputStream("out123.txt")
    val outO = new ObjectOutputStream(out)
    outO.writeObject(reporter)
    outO.flush()
    outO.close()

    val in = new FileInputStream("out123.txt")
    val inO = new ObjectInputStream(in)
    val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
    reporterFromFile.send(...)
    in.close()
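
The same check can be sketched in a self-contained form with an in-memory
buffer instead of a file. DemoReporter and RoundTrip are hypothetical names
used only for illustration; DemoReporter is a stand-in for MyClassReporter
with one serialized field and one @transient lazy field, and it is an
assumption that the real class behaves the same way.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for MyClassReporter (not the real class).
class DemoReporter(val prefix: String) extends Serializable {
  // Not written to the stream; rebuilt lazily on first use after deserialization.
  @transient private lazy val buffer = new StringBuilder

  def send(name: String): String = {
    buffer.append(name)
    s"$prefix.$name"
  }
}

object RoundTrip {
  // Serialize a value with plain Java serialization and read it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new DemoReporter("flow"))
    // The transient lazy field is re-created here, on the deserialized copy.
    println(copy.send("metric"))
  }
}
```

This only exercises plain Java serialization in a single JVM; it says nothing
about how a broadcast value is restored from a checkpoint.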

Maybe I am wrong, but it would be strange if a class that implements
Serializable and is properly broadcast to the executors could not be
serialized and deserialized.

I also prepared a slightly different piece of code and received a slightly
different exception. Right now it looks like this:
java.lang.ClassCastException: [B cannot be cast to
com.example.sender.MyClassReporter.

Maybe I am wrong, but it looks like, when restarting from a checkpoint, it
does not read the proper block of memory when reading the bytes for
MyClassReporter.
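
If the broadcast value really is not restored correctly after a restart (an
assumption, not something verified here), one possible workaround is to skip
the broadcast and keep the sender in a lazily created per-JVM singleton. The
sketch below shows only that pattern, without any Spark dependency.
DemoSender and SenderHolder are hypothetical names, and DemoSender is a
stand-in for the real Graphite sender.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real sender; it only counts constructions.
class DemoSender(val host: String, val port: Int) {
  DemoSender.created.incrementAndGet()
  def send(line: String): String = s"$host:$port <- $line"
}

object DemoSender {
  val created = new AtomicInteger(0)
}

// Hypothetical per-JVM holder: the sender is built on first use and reused
// afterwards, so nothing has to be serialized, broadcast, or restored.
object SenderHolder {
  @volatile private var instance: DemoSender = null

  def getOrCreate(host: String, port: Int): DemoSender = {
    if (instance == null) {
      synchronized {
        // Second check so that only one thread creates the sender.
        if (instance == null) instance = new DemoSender(host, port)
      }
    }
    instance
  }
}

object SenderHolderDemo {
  def main(args: Array[String]): Unit = {
    val a = SenderHolder.getOrCreate("localhost", 2003)
    val b = SenderHolder.getOrCreate("localhost", 2003)
    println(a eq b)                 // same instance on both calls
    println(DemoSender.created.get) // number of senders constructed
  }
}
```

Inside mapPartitions the call would be SenderHolder.getOrCreate(...) instead
of broadcastedValue.value, with only the plain config values (host, port)
captured by the closure.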

2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:

> Could you test serializing and deserializing the MyClassReporter class
> separately?
>
> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <albers...@gmail.com>
> wrote:
>
>> Below is the full stack trace (the real names of my classes were changed),
>> with a short description of the entries from my code:
>>
>> rdd.mapPartitions { partition =>
>>   // the second stack trace entry points to the mapPartitions line above
>>   // the first stack trace entry points to the line below
>>   val sender = broadcastedValue.value
>> }
>>
>> java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
>>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> Can you show the complete stack trace for the ClassCastException ?
>>>
>>> Please see the following thread:
>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>
>>> Cheers
>>>
>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> When my streaming application restarts after a failure (from a
>>>> checkpoint), I receive a strange error:
>>>>
>>>> java.lang.ClassCastException:
>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>> com.example.sender.MyClassReporter.
>>>>
>>>> An instance of MyClassReporter is created on the driver side (with the
>>>> proper config passed as a constructor argument) and broadcast to the
>>>> executors in order to ensure that there is only a single instance on
>>>> each worker. Everything goes well up to the point where I get the value
>>>> of the broadcast field and call a function on it, i.e.
>>>> broadcastedValue.value.send(...)
>>>>
>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>
>>>> trait Reporter{
>>>>   def send(name: String, value: String, timestamp: Long) : Unit
>>>>   def flush() : Unit
>>>> }
>>>>
>>>> class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {
>>>>
>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>
>>>>   var counter = 0
>>>>
>>>>   @transient
>>>>   private lazy val sender : GraphiteSender = initialize()
>>>>
>>>>   @transient
>>>>   private lazy val threadPool =
>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>
>>>>   private def initialize() = {
>>>>       val sender = new Sender(
>>>>         new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
>>>>       )
>>>>       sys.addShutdownHook{
>>>>         sender.close()
>>>>       }
>>>>       sender
>>>>   }
>>>>
>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>     threadPool.submit(new Runnable {
>>>>       override def run(): Unit = {
>>>>         try {
>>>>           counter += 1
>>>>           if (!sender.isConnected)
>>>>             sender.connect()
>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>           if (counter % graphiteConfig.batchSize == 0)
>>>>             sender.flush()
>>>>         }catch {
>>>>           case NonFatal(e) => {
>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
>>>>             Try{sender.close()}.recover{
>>>>               case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
>>>>             }
>>>>           }
>>>>         }
>>>>       }
>>>>     })
>>>>   }
>>>>
>>>> Do you have any idea how I can solve this issue? Using a broadcast
>>>> variable helps me keep a single socket open to the service on each
>>>> executor.
>>>
>>
>
