Could you test serializing and deserializing the MyClassReporter class
separately?
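
Something like the following round trip should reproduce the problem
outside of Spark (a rough sketch, assuming plain Java serialization, which
Spark uses for closures by default; the config and flow arguments below are
placeholders, not from your code):

import java.io._

object SerializationCheck {
  // Serialize to a byte array and read the object back, mimicking what
  // happens when a task or broadcast value is shipped to an executor.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // someConfig: a MyClassConfig instance; placeholder here.
    val reporter = new MyClassReporter(someConfig, "some-flow")
    val copy = roundTrip(reporter)
    // Exercise the transient lazy fields on the deserialized copy.
    copy.send("test.metric", "1", System.currentTimeMillis())
  }
}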

On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <albers...@gmail.com>
wrote:

> Below is the full stack trace (real names of my classes were changed),
> with a short description of the entries that come from my code:
>
> rdd.mapPartitions { partition => // the line the second stack trace entry points to
>   val sender = broadcastedValue.value // the line the first stack trace entry points to
> }
>
> java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to
> com.example.sender.MyClassReporter
>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>         at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>
>> Can you show the complete stack trace for the ClassCastException?
>>
>> Please see the following thread:
>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>
>> Cheers
>>
>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> When my streaming application restarts after a failure (from a
>>> checkpoint), I receive a strange error:
>>>
>>> java.lang.ClassCastException:
>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>> com.example.sender.MyClassReporter
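>>>
>>> The application is restarted through the standard checkpoint-recovery
>>> pattern, roughly like this (simplified; checkpointDir and createContext
>>> are illustrative names):
>>>
>>> val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
>>> ssc.start()
>>> ssc.awaitTermination()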
>>>
>>> An instance of the MyClassReporter class is created on the driver side
>>> (with the proper config passed as a constructor arg) and broadcast to
>>> the executors in order to ensure that there is only a single instance
>>> on each worker. Everything goes well up to the point where I get the
>>> value of the broadcast variable and call a method on it, i.e.
>>> broadcastedValue.value.send(...)
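>>>
>>> The setup looks roughly like this (simplified; sc, rdd and the
>>> constructor arguments are illustrative):
>>>
>>> val reporter = new MyClassReporter(myConfig, "my-flow")
>>> val broadcastedValue = sc.broadcast(reporter)
>>> rdd.mapPartitions { partition =>
>>>   val sender = broadcastedValue.value
>>>   partition.map { record =>
>>>     sender.send("some.metric", record.toString, System.currentTimeMillis())
>>>     record
>>>   }
>>> }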
>>>
>>> Below you can find the definition of MyClassReporter (with its trait):
>>>
>>> trait Reporter {
>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>   def flush(): Unit
>>> }
>>>
>>> class MyClassReporter(config: MyClassConfig, flow: String)
>>>   extends Reporter with Serializable {
>>>
>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>
>>>   var counter = 0
>>>
>>>   @transient
>>>   private lazy val sender: GraphiteSender = initialize()
>>>
>>>   @transient
>>>   private lazy val threadPool =
>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>
>>>   private def initialize() = {
>>>     val sender = new Sender(
>>>       new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
>>>     )
>>>     sys.addShutdownHook {
>>>       sender.close()
>>>     }
>>>     sender
>>>   }
>>>
>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>     threadPool.submit(new Runnable {
>>>       override def run(): Unit = {
>>>         try {
>>>           counter += 1
>>>           if (!sender.isConnected)
>>>             sender.connect()
>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>           if (counter % graphiteConfig.batchSize == 0)
>>>             sender.flush()
>>>         } catch {
>>>           case NonFatal(e) =>
>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
>>>             Try { sender.close() }.recover {
>>>               case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
>>>             }
>>>         }
>>>       }
>>>     })
>>>   }
>>>
>>>   // flush() override and remaining members omitted here
>>> }
>>>
>>> Do you have any idea how I can solve this issue? Using a broadcast
>>> variable helps me keep a single socket open to the service on each
>>> executor.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
