Streaming checkpointing doesn't support Accumulators or Broadcast variables. See https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround: https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806
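The gist of that workaround (a minimal sketch, not the exact code from the JIRA comment; the singleton name is made up and MyClassReporter/MyClassConfig are simply reused from this thread) is to stop capturing the broadcast handle in the checkpointed DStream graph and instead re-create it lazily through a singleton, so it is rebuilt after recovery:

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    // Lazily (re)created broadcast: after a restart from checkpoint the executors
    // get a freshly built broadcast instead of a handle that was serialized into
    // the checkpoint data.
    object ReporterBroadcast {
      @volatile private var instance: Broadcast[MyClassReporter] = _

      def getInstance(sc: SparkContext, config: MyClassConfig, flow: String): Broadcast[MyClassReporter] =
        synchronized {
          if (instance == null) {
            instance = sc.broadcast(new MyClassReporter(config, flow))
          }
          instance
        }
    }

Inside the transformations you would then call ReporterBroadcast.getInstance(rdd.sparkContext, config, flow).value.send(...) instead of closing over a broadcast that was created when the StreamingContext was first set up.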
Best Regards,
Shixiong Zhu

2015-12-17 4:39 GMT-08:00 Bartłomiej Alberski <albers...@gmail.com>:

I prepared a simple example that helps to reproduce the problem:

https://github.com/alberskib/spark-streaming-broadcast-issue

I think that this way it will be easier for you to understand the problem and find a solution (if any exists).

Thanks,
Bartek

2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:

First of all, thanks @tdas for looking into my problem.

Yes, I checked it separately and it is working fine. For the piece of code below there is not a single exception and the values are sent correctly.

    val reporter = new MyClassReporter(...)
    reporter.send(...)
    val out = new FileOutputStream("out123.txt")
    val outO = new ObjectOutputStream(out)
    outO.writeObject(reporter)
    outO.flush()
    outO.close()

    val in = new FileInputStream("out123.txt")
    val inO = new ObjectInputStream(in)
    val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
    reporterFromFile.send(...)
    in.close()

Maybe I am wrong, but I think it would be strange if a class that implements Serializable and is properly broadcasted to the executors could not be serialized and deserialized.

I also prepared a slightly different piece of code and I received a slightly different exception. Right now it looks like:

    java.lang.ClassCastException: [B cannot be cast to com.example.sender.MyClassReporter

Maybe I am wrong, but it looks like, when restarting from the checkpoint, the proper block of memory is not being read for MyClassReporter's bytes.

2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:

Could you test serializing and deserializing the MyClassReporter class separately?
On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <albers...@gmail.com> wrote:

Below is the full stacktrace (the real names of my classes were changed), with a short description of the entries that come from my code:

    rdd.mapPartitions { partition =>        // the line the second stacktrace entry points to
      val sender = broadcastedValue.value   // the main place the first stacktrace entry points to
    }

    java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
        at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
        at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:

Can you show the complete stack trace for the ClassCastException?

Please see the following thread: http://search-hadoop.com/m/q3RTtgEUHVmJA1T1

Cheers

On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:

Hey all,

When my streaming application is restarting from a failure (from a checkpoint) I am receiving a strange error:

    java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter

An instance of this class is created on the driver side (with the proper config passed as a constructor arg) and broadcasted to the executors in order to ensure that on each worker there will be only a single instance. Everything goes well up to the point where I get the value of the broadcasted field and execute a function on it, i.e. broadcastedValue.value.send(...)
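In outline the job looks like this (a minimal sketch with placeholder names; the batch interval, input source and metric names are not from the real application):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(checkpointDir: String, config: MyClassConfig, flow: String): StreamingContext = {
      val conf = new SparkConf().setAppName("broadcast-checkpoint-repro")
      val ssc  = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)

      // Broadcast created once, on the driver, when the context is first built.
      val broadcastedValue = ssc.sparkContext.broadcast(new MyClassReporter(config, flow))

      val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source
      lines.foreachRDD { rdd =>
        rdd.mapPartitions { partition =>
          val sender = broadcastedValue.value               // fine on the first run
          partition.map { record =>
            sender.send("records", record, System.currentTimeMillis())
            record
          }
        }.count()
      }
      ssc
    }

    // On restart, getOrCreate rebuilds the job from the checkpoint; the broadcast
    // handle captured above is not restored, and this is the code path on which
    // the ClassCastException shows up (see SPARK-5206).
    // val ssc = StreamingContext.getOrCreate(dir, () => createContext(dir, config, flow))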
Below you can find the definition of MyClassReporter (with its trait):

    trait Reporter {
      def send(name: String, value: String, timestamp: Long): Unit
      def flush(): Unit
    }

    class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {

      val prefix = s"${config.senderConfig.prefix}.$flow"

      var counter = 0

      @transient
      private lazy val sender: GraphiteSender = initialize()

      @transient
      private lazy val threadPool =
        ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

      private def initialize() = {
        val sender = new Sender(
          new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
        )
        sys.addShutdownHook {
          sender.close()
        }
        sender
      }

      override def send(name: String, value: String, timestamp: Long): Unit = {
        threadPool.submit(new Runnable {
          override def run(): Unit = {
            try {
              counter += 1
              if (!sender.isConnected)
                sender.connect()
              sender.send(s"$prefix.$name", value, timestamp)
              if (counter % graphiteConfig.batchSize == 0)
                sender.flush()
            } catch {
              case NonFatal(e) => {
                println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
                Try { sender.close() }.recover {
                  case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
                }
              }
            }
          }
        })
      }

Do you have any idea how I can solve this issue? Using a broadcasted variable helps me keep a single socket open to the service on each executor.
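A pattern that is sometimes used instead of a broadcast for this "one connection per executor" requirement is a per-JVM lazy singleton, so nothing reporter-related has to be serialized, broadcast or checkpointed at all. A minimal sketch (class names reused from above; how the config reaches the executors is left as a placeholder):

    // Initialized lazily, once per executor JVM, the first time a task touches it.
    object ExecutorReporter {
      lazy val reporter: MyClassReporter =
        new MyClassReporter(loadConfig(), "flow")   // placeholder construction

      // Placeholder: in a real job the config could come from files shipped with
      // the application or from values readable on the executor side.
      private def loadConfig(): MyClassConfig = ???
    }

Tasks would then call ExecutorReporter.reporter.send(...) inside mapPartitions instead of going through broadcastedValue.value, at the cost of making the config available on the executors by some other means.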