Can you show the complete stack trace for the ClassCastException ? Please see the following thread: http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
Cheers On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote: > Hey all, > > When my streaming application is restarting from failure (from checkpoint) > I > am receiving strange error: > > java.lang.ClassCastException: > org.apache.spark.util.SerializableConfiguration cannot be cast to > com.example.sender.MyClassReporter. > > Instance of B class is created on driver side (with proper config passed as > constructor arg) and broadcasted to the executors in order to ensure that > on > each worker there will be only single instance. Everything is going well up > to place where I am getting value of broadcasted field and executing > function on it i.e. > broadcastedValue.value.send(...) > > Below you can find definition of MyClassReporter (with trait): > > trait Reporter{ > def send(name: String, value: String, timestamp: Long) : Unit > def flush() : Unit > } > > class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter > with Serializable { > > val prefix = s"${config.senderConfig.prefix}.$flow" > > var counter = 0 > > @transient > private lazy val sender : GraphiteSender = initialize() > > @transient > private lazy val threadPool = > ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor()) > > private def initialize() = { > val sender = new Sender( > new InetSocketAddress(config.senderConfig.hostname, > config.senderConfig.port) > ) > sys.addShutdownHook{ > sender.close() > } > sender > } > > override def send(name: String, value: String, timestamp: Long) : Unit = > { > threadPool.submit(new Runnable { > override def run(): Unit = { > try { > counter += 1 > if (!sender.isConnected) > sender.connect() > sender.send(s"$prefix.$name", value, timestamp) > if (counter % graphiteConfig.batchSize == 0) > sender.flush() > }catch { > case NonFatal(e) => { > println(s"Problem with sending metric to graphite > $prefix.$name: > $value at $timestamp: ${e.getMessage}", e) > Try{sender.close()}.recover{ > case NonFatal(e) => println(s"Error closing graphite > ${e.getMessage}", e) > } > } > } > } > }) > } > > Do you have any idea how I can solve this issue? Using broadcasted variable > helps me keeping single socket open to the service on executor. > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >