First of all, thanks @tdas for looking into my problem. Yes, I checked it separately and it is working fine. The piece of code below runs without a single exception and the values are sent correctly:

val reporter = new MyClassReporter(...)
reporter.send(...)

val out = new FileOutputStream("out123.txt")
val outO = new ObjectOutputStream(out)
outO.writeObject(reporter)
outO.flush()
outO.close()

val in = new FileInputStream("out123.txt")
val inO = new ObjectInputStream(in)
val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
reporterFromFile.send(...)
in.close()
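For completeness, I would expect the same round trip through Spark's own JavaSerializer (the default spark.serializer unless Kryo is configured) to behave identically. A minimal sketch, reusing the reporter instance from above with placeholder metric values:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Round trip through Spark's serializer instead of raw java.io streams;
// serialize(...) hands back a java.nio.ByteBuffer.
val serInstance = new JavaSerializer(new SparkConf()).newInstance()
val buffer = serInstance.serialize(reporter)
val roundTripped = serInstance.deserialize[MyClassReporter](buffer)
roundTripped.send("test.metric", "1", System.currentTimeMillis() / 1000) // placeholder name/value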
Maybe I am wrong, but I think it would be strange if a class that implements Serializable and is properly broadcast to the executors could not be serialized and deserialized. I also prepared a slightly different piece of code and received a slightly different exception. Right now it looks like:

java.lang.ClassCastException: [B cannot be cast to com.example.sender.MyClassReporter

Maybe I am wrong, but it looks like, when restarting from the checkpoint, it does not read the proper block of memory when reading the bytes for MyClassReporter.
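For reference, "[B" is the JVM's internal name for byte[], so after recovery the broadcast slot apparently holds raw serialized bytes rather than the reporter itself. Purely as a diagnostic sketch (assuming those bytes really are the Java-serialized reporter, which I have not verified), one could try deserializing them by hand:

import java.io.{ByteArrayInputStream, ObjectInputStream}
import org.apache.spark.broadcast.Broadcast

// Hypothetical probe: read the broadcast value untyped, so the automatic
// cast to MyClassReporter (the one that throws) never happens.
val recovered = broadcastedValue.asInstanceOf[Broadcast[Any]].value match {
  case bytes: Array[Byte] =>
    val objIn = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try objIn.readObject().asInstanceOf[MyClassReporter] finally objIn.close()
  case other =>
    sys.error(s"Recovered broadcast holds ${other.getClass}, not byte[]")
}

If that succeeds, it would at least confirm that the bytes themselves are intact and only the recovered type is wrong.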
2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:

> Could you test serializing and deserializing the MyClassReporter class
> separately?
>
> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <albers...@gmail.com>
> wrote:
>
>> Below is the full stack trace (real names of my classes were changed),
>> with a short description of the entries that come from my code:
>>
>> rdd.mapPartitions { partition => // the line the second stack-trace entry points to
>>   val sender = broadcastedValue.value // the main place the first stack-trace entry points to
>> }
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter
>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:87)
>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> Can you show the complete stack trace for the ClassCastException?
>>>
>>> Please see the following thread:
>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>
>>> Cheers
>>>
>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> When my streaming application restarts after a failure (from a
>>>> checkpoint), I receive a strange error:
>>>>
>>>> java.lang.ClassCastException:
>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>> com.example.sender.MyClassReporter
>>>>
>>>> An instance of MyClassReporter is created on the driver side (with the
>>>> proper config passed as a constructor arg) and broadcast to the
>>>> executors in order to ensure that there is only a single instance on
>>>> each worker. Everything goes well up to the point where I get the value
>>>> of the broadcast field and execute a function on it, i.e.
>>>> broadcastedValue.value.send(...)
>>>>
>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>
>>>> import java.net.InetSocketAddress
>>>> import java.util.concurrent.Executors
>>>> import scala.concurrent.ExecutionContext
>>>> import scala.util.Try
>>>> import scala.util.control.NonFatal
>>>>
>>>> trait Reporter {
>>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>>   def flush(): Unit
>>>> }
>>>>
>>>> class MyClassReporter(config: MyClassConfig, flow: String)
>>>>   extends Reporter with Serializable {
>>>>
>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>
>>>>   var counter = 0
>>>>
>>>>   // Recreated lazily on each executor after deserialization.
>>>>   @transient
>>>>   private lazy val sender: GraphiteSender = initialize()
>>>>
>>>>   @transient
>>>>   private lazy val threadPool =
>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>
>>>>   private def initialize() = {
>>>>     val sender = new Sender(
>>>>       new InetSocketAddress(config.senderConfig.hostname,
>>>>         config.senderConfig.port)
>>>>     )
>>>>     sys.addShutdownHook {
>>>>       sender.close()
>>>>     }
>>>>     sender
>>>>   }
>>>>
>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>     threadPool.submit(new Runnable {
>>>>       override def run(): Unit = {
>>>>         try {
>>>>           counter += 1
>>>>           if (!sender.isConnected)
>>>>             sender.connect()
>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>           if (counter % config.senderConfig.batchSize == 0)
>>>>             sender.flush()
>>>>         } catch {
>>>>           case NonFatal(e) =>
>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}")
>>>>             Try(sender.close()).recover {
>>>>               case NonFatal(e) => println(s"Error closing graphite: ${e.getMessage}")
>>>>             }
>>>>         }
>>>>       }
>>>>     })
>>>>   }
>>>>
>>>>   // flush() body was omitted from the original mail; the trait requires it.
>>>>   override def flush(): Unit = sender.flush()
>>>> }
>>>>
>>>> Do you have any idea how I can solve this issue? Using a broadcast
>>>> variable helps me keep a single socket open to the service on each
>>>> executor.
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>
>>
>