[ https://issues.apache.org/jira/browse/SPARK-20436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979820#comment-15979820 ]
fangfengbin commented on SPARK-20436: ------------------------------------- There are other people who have this problem: http://stackoverflow.com/questions/39039157/check-pointing-several-filestreams-in-my-spark-streaming-context > NullPointerException when restart from checkpoint file > ------------------------------------------------------ > > Key: SPARK-20436 > URL: https://issues.apache.org/jira/browse/SPARK-20436 > Project: Spark > Issue Type: Bug > Components: DStreams > Affects Versions: 1.5.0 > Reporter: fangfengbin > > I have written a Spark Streaming application which have two DStreams. > Code is : > object KafkaTwoInkfk { > def main(args: Array[String]) { > val Array(checkPointDir, brokers, topic1, topic2, batchSize) = args > val ssc = StreamingContext.getOrCreate(checkPointDir, () => > createContext(args)) > ssc.start() > ssc.awaitTermination() > } > def createContext(args : Array[String]) : StreamingContext = { > val Array(checkPointDir, brokers, topic1, topic2, batchSize) = args > val sparkConf = new SparkConf().setAppName("KafkaWordCount") > val ssc = new StreamingContext(sparkConf, Seconds(batchSize.toLong)) > ssc.checkpoint(checkPointDir) > val topicArr1 = topic1.split(",") > val topicSet1 = topicArr1.toSet > val topicArr2 = topic2.split(",") > val topicSet2 = topicArr2.toSet > val kafkaParams = Map[String, String]( > "metadata.broker.list" -> brokers > ) > val lines1 = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topicSet1) > val words1 = lines1.map(_._2).flatMap(_.split(" ")) > val wordCounts1 = words1.map(x => { > (x, 1L)}).reduceByKey(_ + _) > wordCounts1.print() > val lines2 = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topicSet2) > val words2 = lines1.map(_._2).flatMap(_.split(" ")) > val wordCounts2 = words2.map(x => { > (x, 1L)}).reduceByKey(_ + _) > wordCounts2.print() > return ssc > } > } > when restart from checkpoint file, it throw 
NullPointerException: > java.lang.NullPointerException > at > org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:126) > at > org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124) > at > org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291) > at > org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:528) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:523) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:523) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291) > at > org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:523) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:190) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:185) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:185) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1291) > at > org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:185) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) > at > 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:145) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:145) > at > org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:145) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1325) > at > org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:146) > at > org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:524) > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:572) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571) > at com.spark.test.KafkaTwoInkfk$.main(KafkaTwoInkfk.scala:21) > at com.spark.test.KafkaTwoInkfk.main(KafkaTwoInkfk.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:760) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:190) > at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:215) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:129) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org