[ https://issues.apache.org/jira/browse/SPARK-11064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14954675#comment-14954675 ]
jason kim commented on SPARK-11064:
-----------------------------------

I haven't been able to find the right solution here. My code:

def main(args: Array[String]) {
  if (args.length < 2) {
    System.err.println("Usage: <kafkaURL> <topics>")
    System.exit(1)
  }

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  StreamingExamples.setStreamingLogLevels()

  val Array(brokers, topics) = args

  // Create context with 2 second batch interval
  val sparkConf = new SparkConf().setAppName("KafkaWordCountTest").setMaster("local[2]")
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ssc.checkpoint("checkpoint")

  // Create direct kafka stream with brokers and topics
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" , "group.id" -> "spark-kafka-consumer")

  //update zookeeper offset
  val km = new KafkaManager(kafkaParams)

  val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet)

  //example 2:
  messages.transform(rdd=>{
    km.updateZKOffsets(rdd)
    rdd}).map(x => (66, 1L)).reduceByKeyAndWindow(_+_, _-_,Seconds(10),Seconds(4)).print()

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}

> spark streaming checkpoint question
> -----------------------------------
>
> Key: SPARK-11064
> URL: https://issues.apache.org/jira/browse/SPARK-11064
> Project: Spark
> Issue Type: Question
> Affects Versions: 1.4.1
> Reporter: jason kim
> Fix For: 2+
>
>
> java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
> Serialization stack:
>     at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>     at com.bj58.spark.streaming.KafkaWordCountTest$.main(KafkaWordCountTest.scala:70)
>     at com.bj58.spark.streaming.KafkaWordCountTest.main(KafkaWordCountTest.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:597)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org