Re: spark streaming - saving kafka DStream into hadoop throws exception
Hm, now I am also seeing this problem. The essence of my code is:

    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
    JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
      }
    };
    streamingContext = JavaStreamingContext.getOrCreate(
        checkpointDirString, sparkContext.hadoopConfiguration(),
        streamingContextFactory, false);
    streamingContext.checkpoint(checkpointDirString);

yields:

    2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66 org.apache.hadoop.conf.Configuration
        - field (class org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9, name: conf$2, type: class org.apache.hadoop.conf.Configuration)
        - object (class org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9, <function2>)
        - field (class org.apache.spark.streaming.dstream.ForEachDStream, name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc, type: interface scala.Function2)
        - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
    ...

This looks like it's due to PairDStreamFunctions, as this saveFunc seems to be
org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:

    def saveAsNewAPIHadoopFiles(
        prefix: String,
        suffix: String,
        keyClass: Class[_],
        valueClass: Class[_],
        outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
        conf: Configuration = new Configuration
      ) {
      val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
        val file = rddToFileName(prefix, suffix, time)
        rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
      }
      self.foreachRDD(saveFunc)
    }

conf is indeed serialized to make it part of saveFunc, no? But it can't be serialized. But surely this doesn't fail all the time, or someone would have noticed by now... It could be a particular closure problem again.
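For what it's worth, the failure mode is reproducible outside Spark with plain Java serialization. In this sketch, FakeConf is a made-up stand-in for org.apache.hadoop.conf.Configuration, and SerFn plays the role of the Function2 that ForEachDStream stores and that checkpointing tries to serialize:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class ClosureCaptureDemo {
    // Stand-in for org.apache.hadoop.conf.Configuration: a class that
    // does NOT implement java.io.Serializable.
    static class FakeConf {
        String value = "fs.defaultFS=hdfs://nn:8020";
    }

    // A serializable function type, like the Function2 that Spark stores
    // inside ForEachDStream and serializes during checkpointing.
    interface SerFn extends Function<String, String>, Serializable {}

    public static void main(String[] args) throws Exception {
        FakeConf conf = new FakeConf();

        // The lambda captures 'conf', so 'conf' becomes part of the
        // serialized closure, just like the conf$2 field in the error.
        SerFn saveFunc = path -> path + " with " + conf.value;

        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(saveFunc);
            System.out.println("serialized ok");
        } catch (NotSerializableException e) {
            // Fails because the captured FakeConf is not serializable.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

The captured field becomes part of the serialized closure, which is exactly what the conf$2 entry in the error output is pointing at.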
Any ideas on whether this is a problem, or if there's a workaround? Checkpointing does not work at all for me as a result.

On Fri, Aug 15, 2014 at 10:37 PM, salemi <alireza.sal...@udo.edu> wrote:

Hi All,

I am just trying to save the kafka dstream to hadoop as follows:

    val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    dStream.saveAsHadoopFiles(hdfsDataUrl, "data")

It throws the following exception. What am I doing wrong?

    14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
    java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
Re: spark streaming - saving kafka DStream into hadoop throws exception
Somewhere, your function has a reference to the Hadoop JobConf object and is trying to send that to the workers. It's not in this code you pasted, so it must be from something slightly different? It shouldn't need to send that around, and in fact it can't be serialized, as you see. If you need a Hadoop Configuration object, you can get it from the SparkContext, which you can get from the StreamingContext.

On Fri, Aug 15, 2014 at 9:37 PM, salemi <alireza.sal...@udo.edu> wrote:

Hi All,

I am just trying to save the kafka dstream to hadoop as follows:

    val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
    dStream.saveAsHadoopFiles(hdfsDataUrl, "data")

It throws the following exception. What am I doing wrong?

    14/08/15 14:30:09 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
    java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:440)
        at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:168)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:185)
        at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:259)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    ^C14/08/15 14:30:10 ERROR OneForOneStrategy: org.apache.hadoop.mapred.JobConf
    java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
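The pattern Sean is suggesting, getting the Configuration on demand instead of capturing it in the function, can be sketched generically: keep the non-serializable object out of the serialized state (transient) and recreate it where it's needed. FakeConf and Writer below are made-up illustration classes, not Spark API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientConfDemo {
    // Stand-in for a non-serializable Hadoop JobConf/Configuration.
    static class FakeConf {
        String value = "hdfs://nn:8020";
    }

    static class Writer implements Serializable {
        // transient: excluded from serialization, so it is never
        // shipped in the closure; it is recreated on the remote side.
        private transient FakeConf conf;

        FakeConf conf() {
            if (conf == null) conf = new FakeConf();  // lazy re-init
            return conf;
        }

        String save(String path) {
            return path + " -> " + conf().value;
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip the Writer through Java serialization: this is
        // what shipping it to a worker (or checkpointing) would do.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new Writer());  // succeeds: conf is transient
        }
        try (ObjectInputStream in = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            Writer w = (Writer) in.readObject();
            System.out.println(w.save("/data/part-0000"));
        }
    }
}
```

Only what a function actually captures gets serialized with it, so keeping the JobConf out of the captured state avoids the NotSerializableException entirely.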
Re: spark streaming - saving kafka DStream into hadoop throws exception
Look, this is the whole program. I am not trying to serialize the JobConf.

    def main(args: Array[String]) {
      try {
        val properties = getProperties("settings.properties")
        StreamingExamples.setStreamingLogLevels()
        val zkQuorum = properties.get("zookeeper.list").toString()
        val topic = properties.get("topic.name").toString()
        val group = properties.get("group.name").toString()
        val threads = properties.get("consumer.threads").toString()
        val topicpMap = Map(topic -> threads.toInt)
        val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
        val hdfsCheckPointUrl = hdfsNameNodeUrl + properties.get("hdfs.checkpoint.path").toString()
        val hdfsDataUrl = hdfsNameNodeUrl + properties.get("hdfs.data.path").toString()
        val checkPointInterval = properties.get("spark.streaming.checkpoint.interval").toString().toInt
        val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
        println("===")
        println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic +
          "; group: " + group + "; threads: " + threads)
        println("===")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint(hdfsCheckPointUrl)
        val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
        dStream.checkpoint(Seconds(checkPointInterval))
        dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String], classOf[String],
          classOf[TextOutputFormat[String, String]], ssc.sparkContext.hadoopConfiguration)
        val eventData = dStream.map(_._2).map(_.split(","))
          .map(data => DataObject(data(0), data(1), data(2), data(3), data(4), data(5),
            data(6), data(7), data(8).toLong, data(9), data(10), data(11), data(12).toLong,
            data(13), data(14)))
        val count = eventData.filter(_.state == "COMPLETE").countByWindow(Minutes(15), Seconds(1))
        count.map(cnt => "the Total count of calls in complete state in the last 15 minutes is: " + cnt).print()
        ssc.start()
        ssc.awaitTermination()
      } catch {
        case e: Exception => println("exception caught: " + e)
      }
    }

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-saving-kafka-DStream-into-hadoop-throws-exception-tp12202p12207.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
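Another workaround, if the Configuration really has to travel with the save function: wrap it in a small holder that implements writeObject/readObject by hand and serializes only the settings. (I believe Spark itself ships a SerializableWritable wrapper built on the same idea, since Hadoop's Configuration is a Writable.) The sketch below uses java.util.Properties as a stand-in for the Configuration, and the class name is made up:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class SerializableConfWrapper implements Serializable {
    // Stand-in for a Hadoop Configuration: carries key/value settings.
    // transient so default serialization never touches it directly.
    private transient Properties conf;

    public SerializableConfWrapper(Properties conf) { this.conf = conf; }

    public Properties get() { return conf; }

    // Write the settings out by hand instead of serializing the object.
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeInt(conf.size());
        for (String name : conf.stringPropertyNames()) {
            out.writeUTF(name);
            out.writeUTF(conf.getProperty(name));
        }
    }

    // Rebuild the settings object on the receiving side.
    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new Properties();
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
            conf.setProperty(in.readUTF(), in.readUTF());
        }
    }

    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.setProperty("fs.defaultFS", "hdfs://nn:8020");

        // Round-trip the wrapper, as checkpointing the graph would.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new SerializableConfWrapper(p));
        }
        try (ObjectInputStream in = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            SerializableConfWrapper w = (SerializableConfWrapper) in.readObject();
            System.out.println(w.get().getProperty("fs.defaultFS"));
        }
    }
}
```

A closure could then capture the wrapper instead of the raw Configuration and unwrap it on the worker.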
Re: spark streaming - saving kafka DStream into hadoop throws exception
If I reduce the app to the following code, then I don't see the exception. It creates the hadoop files, but they are empty! The DStream never gets written out to the files!

    def main(args: Array[String]) {
      try {
        val properties = getProperties("settings.properties")
        StreamingExamples.setStreamingLogLevels()
        val zkQuorum = properties.get("zookeeper.list").toString()
        val topic = properties.get("topic.name").toString()
        val group = properties.get("group.name").toString()
        val threads = properties.get("consumer.threads").toString()
        val topicpMap = Map(topic -> threads.toInt)
        val hdfsNameNodeUrl = properties.get("hdfs.namenode.url").toString()
        val hdfsCheckPointUrl = hdfsNameNodeUrl + properties.get("hdfs.checkpoint.path").toString()
        val hdfsDataUrl = hdfsNameNodeUrl + properties.get("hdfs.data.path").toString()
        val checkPointInterval = properties.get("spark.streaming.checkpoint.interval").toString().toInt
        val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
        println("===")
        println("kafka configuration: zk: " + zkQuorum + "; topic: " + topic +
          "; group: " + group + "; threads: " + threads)
        println("===")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
        dStream.saveAsNewAPIHadoopFiles(hdfsDataUrl, "csv", classOf[String], classOf[String],
          classOf[TextOutputFormat[String, String]], ssc.sparkContext.hadoopConfiguration)
        ssc.start()
        ssc.awaitTermination()
      } catch {
        case e: Exception => println("exception caught: " + e)
      }
    }