Can you show the complete stack trace for the ClassCastException?

Please see the following thread:
http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
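The thread above covers the usual cause: broadcast variables cannot be restored from a streaming checkpoint, so after a restart the broadcast slot can come back holding the wrong object. The commonly recommended workaround is to ship only a small serializable config with the closure and build the stateful reporter lazily, once per executor JVM. A minimal sketch of that pattern (ReporterConfig, ConsoleReporter, and ReporterSingleton are hypothetical stand-ins for your MyClassConfig / MyClassReporter, not your actual classes):

```scala
// Sketch of the lazily-instantiated-singleton pattern: instead of
// broadcasting the reporter itself, capture only a plain serializable
// config and construct the reporter on first use in each JVM (i.e. once
// per executor). Nothing broadcast has to survive the checkpoint restore.

trait Reporter {
  def send(name: String, value: String, timestamp: Long): Unit
}

// Plain serializable config: safe to capture in a closure or checkpoint.
case class ReporterConfig(hostname: String, port: Int) extends Serializable

// Hypothetical stand-in for the real sender-backed reporter.
class ConsoleReporter(config: ReporterConfig) extends Reporter {
  override def send(name: String, value: String, timestamp: Long): Unit =
    println(s"${config.hostname}:${config.port} $name=$value @$timestamp")
}

object ReporterSingleton {
  @volatile private var instance: Reporter = _

  // Double-checked locking: at most one Reporter per JVM.
  def getOrCreate(config: ReporterConfig): Reporter = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new ConsoleReporter(config)
        }
      }
    }
    instance
  }
}
```

Inside foreachRDD / foreachPartition you would then call ReporterSingleton.getOrCreate(config) instead of reading a broadcast value; only the config is serialized with the closure, and you still get a single socket per executor.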

Cheers

On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:

> Hey all,
>
> When my streaming application restarts from a failure (from a checkpoint),
> I am receiving a strange error:
>
> java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to
> com.example.sender.MyClassReporter.
>
> An instance of MyClassReporter is created on the driver side (with the
> proper config passed as a constructor arg) and broadcast to the executors
> in order to ensure that there will be only a single instance on each
> worker. Everything goes well up to the point where I get the value of the
> broadcast field and call a function on it, i.e.
> broadcastedValue.value.send(...)
>
> Below you can find the definition of MyClassReporter (with its trait):
>
> trait Reporter {
>   def send(name: String, value: String, timestamp: Long): Unit
>   def flush(): Unit
> }
>
> class MyClassReporter(config: MyClassConfig, flow: String)
>     extends Reporter with Serializable {
>
>   val prefix = s"${config.senderConfig.prefix}.$flow"
>
>   var counter = 0
>
>   @transient
>   private lazy val sender: GraphiteSender = initialize()
>
>   @transient
>   private lazy val threadPool =
>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>
>   private def initialize() = {
>     val sender = new Sender(
>       new InetSocketAddress(config.senderConfig.hostname,
>         config.senderConfig.port)
>     )
>     sys.addShutdownHook {
>       sender.close()
>     }
>     sender
>   }
>
>   override def send(name: String, value: String, timestamp: Long): Unit = {
>     threadPool.submit(new Runnable {
>       override def run(): Unit = {
>         try {
>           counter += 1
>           if (!sender.isConnected)
>             sender.connect()
>           sender.send(s"$prefix.$name", value, timestamp)
>           if (counter % graphiteConfig.batchSize == 0)
>             sender.flush()
>         } catch {
>           case NonFatal(e) =>
>             println(s"Problem with sending metric to graphite " +
>               s"$prefix.$name: $value at $timestamp: ${e.getMessage}", e)
>             Try { sender.close() }.recover {
>               case NonFatal(e) =>
>                 println(s"Error closing graphite: ${e.getMessage}", e)
>             }
>         }
>       }
>     })
>   }
> }
>
> Do you have any idea how I can solve this issue? Using a broadcast
> variable helps me keep a single socket open to the service on each
> executor.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
