Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint
Hey all,

When my streaming application restarts from a failure (from a checkpoint), I get a strange error:

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter

An instance of MyClassReporter is created on the driver side (with the proper config passed as a constructor argument) and broadcast to the executors, so that each worker holds only a single instance. Everything works fine up to the point where I read the value of the broadcast variable and call a method on it, i.e. broadcastedValue.value.send(...).

Below is the definition of MyClassReporter (with its trait):

trait Reporter {
  def send(name: String, value: String, timestamp: Long): Unit
  def flush(): Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"
  var counter = 0

  @transient private lazy val sender: GraphiteSender = initialize()
  @transient private lazy val threadPool =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
    val sender = new Sender(
      new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
    )
    sys.addShutdownHook { sender.close() }
    sender
  }

  override def send(name: String, value: String, timestamp: Long): Unit = {
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected) sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          if (counter % config.senderConfig.batchSize == 0) sender.flush()
        } catch {
          case NonFatal(e) =>
            println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
            Try { sender.close() }.recover {
              case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
            }
        }
      }
    })
  }
}

Do you have any idea how I can solve this issue? Using a broadcast variable lets me keep a single open socket to the service on each executor.
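One workaround I have seen for broadcast values that do not survive a restart from a checkpoint is to avoid shipping the instance at all, and instead build it lazily once per executor JVM from the (small, serializable) config. A minimal sketch, assuming a MyClassReporter-like class; the names MyClassConfig/MyReporter/ReporterHolder here are illustrative stand-ins, not from the original code:

```scala
// Hypothetical stand-ins for the classes in the question.
case class MyClassConfig(hostname: String, port: Int) extends Serializable

class MyReporter(config: MyClassConfig) {
  def send(name: String, value: String, timestamp: Long): Unit = ()
}

// Per-JVM lazy singleton: closures capture only the config, so nothing
// depends on broadcast state being restored from the checkpoint.
object ReporterHolder {
  private var instance: MyReporter = _
  def getOrCreate(config: MyClassConfig): MyReporter = synchronized {
    if (instance == null) instance = new MyReporter(config)
    instance
  }
}
```

Inside foreachPartition/mapPartitions you would then call ReporterHolder.getOrCreate(config).send(...); each executor JVM still ends up with a single instance (and a single socket), but without broadcasting the reporter itself.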
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Ensuring eager evaluation inside mapPartitions
Hi all,

I am wondering whether there is a way to ensure that two consecutive maps inside mapPartitions will not be chained together. To illustrate the question, here is a short example:

rdd.mapPartitions { it =>
  it.map(x => foo(x)).map(y => y.getResult)
}

I would like to ensure that the foo method is applied to all records of the partition first, and only after that is getResult invoked on each record. This would be beneficial when foo is some kind of time-consuming IO operation, e.g. a request to an external service for data that cannot be prefetched. I know that converting the iterator into a list will do the job, but maybe there is a cleverer way of doing it.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ensuring-eager-evaluation-inside-mapPartitions-tp25085.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
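The chaining comes from Iterator being lazy, so it can be shown with plain Scala iterators, no Spark needed (foo/getResult below are made-up stand-ins): chained map calls interleave per element, while materializing the iterator between the two stages runs the whole first stage before the second starts:

```scala
import scala.collection.mutable.ArrayBuffer

val log = ArrayBuffer.empty[String]
def foo(x: Int): Int = { log += s"foo($x)"; x * 10 }
def getResult(y: Int): Int = { log += s"get($y)"; y + 1 }

// Lazy: foo and getResult alternate per element.
val lazyOut = Iterator(1, 2).map(foo).map(getResult).toList
val lazyLog = log.toList // foo(1), get(10), foo(2), get(20)

log.clear()

// Eager: toVector materializes the partition after foo,
// then getResult runs over the already-computed values.
val eagerOut = Iterator(1, 2).map(foo).toVector.map(getResult).iterator.toList
val eagerLog = log.toList // foo(1), foo(2), get(10), get(20)
```

Inside mapPartitions the same trick applies, e.g. it.map(foo).toVector.map(getResult).iterator, with the caveat that the whole partition is then held in memory at once.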
Issue with the class generated from avro schema
Hi all,

I have a piece of Spark code that loads data from HDFS into Java classes generated from an Avro IDL. On an RDD created that way I execute a simple operation whose result depends on whether I cache the RDD beforehand. If I run the code below

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 20

the program prints 20. On the other hand, executing the next snippet

val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1

results in 1 printed to stdout. From inspecting the field values after reading the cached data, I am pretty sure that the root cause of the problem is an issue with serialization of the classes generated from the Avro IDL, but I do not know how to resolve it. I tried using Kryo, registering the generated class (Data), and registering different serializers from chill_avro for the class (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc.), but none of these ideas helped. I posted exactly the same question on Stack Overflow but did not receive any response: link <http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema> What is more, I created a minimal working example that makes the problem easy to reproduce: link <https://github.com/alberskib/spark-avro-serialization-issue>

How can I solve this problem?

Thanks,
Bartek

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
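One cause worth ruling out before blaming Kryo: Hadoop record readers (and Avro readers on top of them) typically reuse a single mutable record object for every record, so caching the raw RDD can leave every cached element pointing at one object holding the last record's values, which would explain distinct().count() collapsing to 1. A sketch of the effect and the fix with plain collections (MutableRecord/Row are illustrative names, not from the original code):

```scala
// Simulates a Hadoop/Avro reader that reuses one mutable record object.
class MutableRecord { var userId: String = _; var date: String = _ }

val reused = new MutableRecord
val rawInput = List(("u1", "d1"), ("u2", "d2"))

// "Caching" the raw records keeps references to the single reused object...
val cachedRaw = rawInput.map { case (u, d) =>
  reused.userId = u; reused.date = d; reused
}
// ...so every cached element now shows the last record read.
val distinctRaw = cachedRaw.map(r => r.userId + r.date).distinct.size // 1

// Copying the fields into an immutable value before "caching" fixes it.
case class Row(userId: String, date: String)
val cachedCopied = rawInput.map { case (u, d) =>
  reused.userId = u; reused.date = d
  Row(reused.userId, reused.date)
}
val distinctCopied = cachedCopied.map(r => r.userId + r.date).distinct.size // 2
```

In Spark terms this corresponds to mapping each record to a copy (e.g. extracting the needed fields, or deep-copying the generated Avro record) immediately after loadFromHDFS and before cache().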