Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Considering the @transient annotations and the work done in the instance initializer, not much state is really being broadcast to the executors. It might be simpler to just create these instances on the executors rather than trying to broadcast them.
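A minimal sketch of that executor-side approach, assuming the MyClassConfig and MyClassReporter classes from the original post and a config and flow value available in the closure (the ReporterHolder object and metric name are illustrative, not code from the thread):

object ReporterHolder {
  // One reporter per executor JVM, created lazily on first use instead of
  // being serialized from the driver or broadcast.
  private var reporter: MyClassReporter = _

  def get(config: MyClassConfig, flow: String): MyClassReporter = synchronized {
    if (reporter == null) {
      reporter = new MyClassReporter(config, flow)
    }
    reporter
  }
}

rdd.mapPartitions { partition =>
  // Only the small, serializable config values are captured by the closure;
  // the reporter itself never leaves the executor.
  val sender = ReporterHolder.get(config, flow)
  partition.map { record =>
    sender.send("records.seen", record.toString, System.currentTimeMillis())
    record
  }
}

Since only plain configuration values travel with the closure, nothing reporter-specific has to survive checkpoint recovery.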
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Streaming checkpointing doesn't support Accumulators or Broadcast variables. See https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround: https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806

Best Regards,
Shixiong Zhu
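One way to apply that kind of workaround is to hold the broadcast in a lazily initialized singleton on the driver, so a fresh Broadcast is created when the job is recovered from a checkpoint. A minimal sketch of that pattern, reusing the class names from this thread (the ReporterBroadcast object and the stream/config/flow identifiers are illustrative, not code from the JIRA):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object ReporterBroadcast {
  @volatile private var instance: Broadcast[MyClassReporter] = _

  // (Re)created on demand, e.g. for the first batch processed after a
  // restart from checkpoint, when the old broadcast no longer exists.
  def getInstance(sc: SparkContext, config: MyClassConfig, flow: String): Broadcast[MyClassReporter] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(new MyClassReporter(config, flow))
        }
      }
    }
    instance
  }
}

stream.foreachRDD { rdd =>
  // Runs on the driver for every batch, so the broadcast handle is always valid.
  val reporterBc = ReporterBroadcast.getInstance(rdd.sparkContext, config, flow)
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      reporterBc.value.send("records.seen", record.toString, System.currentTimeMillis())
    }
  }
}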
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
I prepared a simple example that helps reproduce the problem:

https://github.com/alberskib/spark-streaming-broadcast-issue

I think that this way it will be easier for you to understand the problem and find a solution (if any exists).

Thanks,
Bartek

2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski:
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
First of all, thanks @tdas for looking into my problem.

Yes, I checked it separately and it is working fine. For the piece of code below there is not a single exception and the values are sent correctly.

val reporter = new MyClassReporter(...)
reporter.send(...)

val out = new FileOutputStream("out123.txt")
val outO = new ObjectOutputStream(out)
outO.writeObject(reporter)
outO.flush()
outO.close()

val in = new FileInputStream("out123.txt")
val inO = new ObjectInputStream(in)
val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
reporterFromFile.send(...)
in.close()

Maybe I am wrong, but I think it would be strange if a class that implements Serializable and is properly broadcast to the executors could not be serialized and deserialized?

I also prepared a slightly different piece of code and received a slightly different exception. Right now it looks like:

java.lang.ClassCastException: [B cannot be cast to com.example.sender.MyClassReporter

Maybe I am wrong, but it looks like, when restarting from checkpoint, it does not read the proper block of memory to get the bytes for MyClassReporter.

2015-12-16 2:38 GMT+01:00 Tathagata Das:
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Could you test serializing and deserializing the MyClassReporter class separately?

On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski wrote:
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Below is the full stack trace (the real names of my classes were changed), with a short description of the entries that come from my code:

rdd.mapPartitions { partition => // this is the line the second stack trace entry points to
  val sender = broadcastedValue.value // this is the main place the first stack trace entry points to
}

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
        at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
        at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

2015-12-14 17:10 GMT+01:00 Ted Yu:
Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Can you show the complete stack trace for the ClassCastException?

Please see the following thread: http://search-hadoop.com/m/q3RTtgEUHVmJA1T1

Cheers

On Mon, Dec 14, 2015 at 7:33 AM, alberskib wrote:
Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Hey all,

When my streaming application is restarting from a failure (from checkpoint) I am receiving a strange error:

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter

An instance of the reporter class is created on the driver side (with the proper config passed as a constructor arg) and broadcast to the executors in order to ensure that there will be only a single instance on each worker. Everything goes well up to the place where I get the value of the broadcast field and execute a function on it, i.e.:

broadcastedValue.value.send(...)

Below you can find the definition of MyClassReporter (with its trait):

trait Reporter {
  def send(name: String, value: String, timestamp: Long): Unit
  def flush(): Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  @transient
  private lazy val sender: GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
    val sender = new Sender(
      new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
    )
    sys.addShutdownHook {
      sender.close()
    }
    sender
  }

  override def send(name: String, value: String, timestamp: Long): Unit = {
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected)
            sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          if (counter % graphiteConfig.batchSize == 0)
            sender.flush()
        } catch {
          case NonFatal(e) =>
            println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
            Try { sender.close() }.recover {
              case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
            }
        }
      }
    })
  }
}

Do you have any idea how I can solve this issue? Using a broadcast variable helps me keep a single socket open to the service on each executor.
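The post describes, but does not show, the driver-side setup. For reference, a minimal sketch of what that setup presumably looks like (streamingContext, stream, config, flow and the metric name are assumed names; only MyClassReporter and the broadcastedValue.value.send(...) call come from the post):

// Assumed driver-side setup, reconstructed from the description above.
val reporter = new MyClassReporter(config, flow)
val broadcastedValue = streamingContext.sparkContext.broadcast(reporter)

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    partition.foreach { record =>
      // Works on a fresh start; after recovery from checkpoint this is where
      // the ClassCastException above is thrown.
      broadcastedValue.value.send("records.seen", record.toString, System.currentTimeMillis())
    }
  }
}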