Streaming checkpointing doesn't support Accumulators or Broadcast variables. See
https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround:
https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806
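
The workaround in that comment boils down to re-creating the broadcast lazily after the driver restarts, instead of letting it be restored from the checkpoint. A minimal sketch, assuming the MyClassReporter/MyClassConfig types from this thread (ReporterHolder and the tuple-typed dstream are names assumed here, not from the thread):

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    // Lazily re-create the broadcast on the (possibly restarted) driver,
    // because broadcast variables themselves are not checkpointed.
    object ReporterHolder {
      @volatile private var instance: Broadcast[MyClassReporter] = _

      def getInstance(sc: SparkContext, config: MyClassConfig, flow: String): Broadcast[MyClassReporter] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.broadcast(new MyClassReporter(config, flow))
            }
          }
        }
        instance
      }
    }

    // Fetch the broadcast through the holder inside the output operation,
    // so it is rebuilt on first use after recovery:
    dstream.foreachRDD { rdd =>
      val reporter = ReporterHolder.getInstance(rdd.sparkContext, config, flow)
      rdd.foreachPartition { partition =>
        partition.foreach { case (name, value, ts) => reporter.value.send(name, value, ts) }
      }
    }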

Best Regards,
Shixiong Zhu

2015-12-17 4:39 GMT-08:00 Bartłomiej Alberski <albers...@gmail.com>:

> I prepared a simple example to help reproduce the problem:
>
> https://github.com/alberskib/spark-streaming-broadcast-issue
>
> I think this will make it easier for you to understand the problem
> and find a solution (if one exists).
>
> Thanks
> Bartek
>
> 2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:
>
>> First of all, thanks @tdas for looking into my problem.
>>
>> Yes, I checked it separately and it is working fine. For the piece of
>> code below there is not a single exception and values are sent correctly.
>>
>>     // Write the reporter out with plain Java serialization...
>>     val reporter = new MyClassReporter(...)
>>     reporter.send(...)
>>     val out = new FileOutputStream("out123.txt")
>>     val outO = new ObjectOutputStream(out)
>>     outO.writeObject(reporter)
>>     outO.flush()
>>     outO.close()
>>
>>     // ...then read it back in and use it again.
>>     val in = new FileInputStream("out123.txt")
>>     val inO = new ObjectInputStream(in)
>>     val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
>>     reporterFromFile.send(...)
>>     inO.close()
>>
>> Maybe I am wrong, but wouldn't it be strange if a class that implements
>> Serializable and is properly broadcast to the executors could not be
>> serialized and deserialized?
>> I also prepared a slightly different piece of code and received a slightly
>> different exception. Right now it looks like:
>> java.lang.ClassCastException: [B cannot be cast to
>> com.example.sender.MyClassReporter
>>
>> Maybe I am wrong, but it looks like when restarting from the checkpoint
>> it does not read the proper block of memory for the bytes of MyClassReporter.
>>
>> 2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:
>>
>>> Could you test serializing and deserializing the MyClassReporter class
>>> separately?
>>>
>>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <
>>> albers...@gmail.com> wrote:
>>>
>>>> Below is the full stack trace (real names of my classes were changed),
>>>> with a short description of the entries from my code:
>>>>
>>>> rdd.mapPartitions { partition => // the line the second stack trace entry points to
>>>>   val sender = broadcastedValue.value // the main place the first stack trace entry points to
>>>> }
>>>>
>>>> java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
>>>>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>>>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>>
>>>>> Can you show the complete stack trace for the ClassCastException ?
>>>>>
>>>>> Please see the following thread:
>>>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> When my streaming application is restarting after a failure (from a
>>>>>> checkpoint), I am receiving a strange error:
>>>>>>
>>>>>> java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
>>>>>>
>>>>>> An instance of the MyClassReporter class is created on the driver side
>>>>>> (with the proper config passed as a constructor arg) and broadcast to
>>>>>> the executors, in order to ensure that there is only a single instance
>>>>>> on each worker. Everything goes well up to the point where I get the
>>>>>> value of the broadcast field and call a method on it, i.e.
>>>>>> broadcastedValue.value.send(...)
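>>>>>>
>>>>>> For concreteness, the pattern is roughly the following (a sketch;
>>>>>> ssc, dstream, config, and flow are assumed to exist, with dstream
>>>>>> carrying (name, value, timestamp) tuples):
>>>>>>
>>>>>> val reporter = new MyClassReporter(config, flow)            // driver side
>>>>>> val broadcastedValue = ssc.sparkContext.broadcast(reporter)
>>>>>>
>>>>>> dstream.foreachRDD { rdd =>
>>>>>>   rdd.mapPartitions { partition =>
>>>>>>     // After a restart from checkpoint, this .value call is where the
>>>>>>     // ClassCastException above is thrown.
>>>>>>     val sender = broadcastedValue.value
>>>>>>     partition.map { case (name, value, ts) =>
>>>>>>       sender.send(name, value, ts)
>>>>>>       (name, value, ts)
>>>>>>     }
>>>>>>   }.count() // force evaluation
>>>>>> }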
>>>>>>
>>>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>>>
>>>>>> import java.net.InetSocketAddress
>>>>>> import java.util.concurrent.Executors
>>>>>>
>>>>>> import scala.concurrent.ExecutionContext
>>>>>> import scala.util.Try
>>>>>> import scala.util.control.NonFatal
>>>>>>
>>>>>> trait Reporter {
>>>>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>>>>   def flush(): Unit
>>>>>> }
>>>>>>
>>>>>> class MyClassReporter(config: MyClassConfig, flow: String)
>>>>>>   extends Reporter with Serializable {
>>>>>>
>>>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>>>
>>>>>>   var counter = 0
>>>>>>
>>>>>>   // Recreated lazily on each executor after deserialization.
>>>>>>   @transient
>>>>>>   private lazy val sender: GraphiteSender = initialize()
>>>>>>
>>>>>>   @transient
>>>>>>   private lazy val threadPool =
>>>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>>>
>>>>>>   private def initialize() = {
>>>>>>     val sender = new Sender(
>>>>>>       new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
>>>>>>     )
>>>>>>     sys.addShutdownHook {
>>>>>>       sender.close()
>>>>>>     }
>>>>>>     sender
>>>>>>   }
>>>>>>
>>>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>>>     threadPool.submit(new Runnable {
>>>>>>       override def run(): Unit = {
>>>>>>         try {
>>>>>>           counter += 1
>>>>>>           if (!sender.isConnected)
>>>>>>             sender.connect()
>>>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>>>           // was graphiteConfig.batchSize in the original snippet,
>>>>>>           // presumably a leftover from renaming the classes
>>>>>>           if (counter % config.senderConfig.batchSize == 0)
>>>>>>             sender.flush()
>>>>>>         } catch {
>>>>>>           case NonFatal(e) =>
>>>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}")
>>>>>>             Try(sender.close()).recover {
>>>>>>               case NonFatal(e2) => println(s"Error closing graphite sender: ${e2.getMessage}")
>>>>>>             }
>>>>>>         }
>>>>>>       }
>>>>>>     })
>>>>>>   }
>>>>>>
>>>>>>   // flush() was not included in the original snippet.
>>>>>>   override def flush(): Unit = sender.flush()
>>>>>> }
>>>>>>
>>>>>> Do you have any idea how I can solve this issue? Using a broadcast
>>>>>> variable lets me keep a single socket open to the service on each
>>>>>> executor.
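>>>>>>
>>>>>> (One alternative worth noting here, a common Spark pattern rather than
>>>>>> anything from this thread: skip the broadcast entirely and keep one
>>>>>> reporter per executor JVM via a lazily initialized singleton.
>>>>>> loadConfig() below is a hypothetical stand-in for however the
>>>>>> executors obtain the config.)
>>>>>>
>>>>>> object ExecutorReporter {
>>>>>>   // Hypothetical helper: replace with real config loading.
>>>>>>   private def loadConfig(): MyClassConfig = sys.error("load MyClassConfig here")
>>>>>>
>>>>>>   // Created on first use inside each executor JVM; never serialized,
>>>>>>   // so checkpoint recovery cannot corrupt it.
>>>>>>   lazy val reporter: Reporter = new MyClassReporter(loadConfig(), "my-flow")
>>>>>> }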
>>>>>>
>>>>>
>>>>
>>>
>>
>
