[ 
https://issues.apache.org/jira/browse/SPARK-11064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14954675#comment-14954675
 ] 

jason kim commented on SPARK-11064:
-----------------------------------

I have not been able to find the right solution here.

my code:
    /**
     * Entry point: reads messages from Kafka through a direct stream and prints a
     * windowed count of them.
     *
     * Exactly two arguments are required because they are destructured with
     * `Array(brokers, topics)`; any other count prints the usage message to stderr
     * and exits with status 1.
     *
     * @param args `<kafkaURL>` (the broker list) followed by `<topics>`
     *             (a comma-separated list of topic names)
     */
    def main(args: Array[String]): Unit = {
      // The destructuring below only matches a two-element array, so reject every
      // other length here instead of failing later with a MatchError.
      if (args.length != 2) {
        System.err.println("Usage:  <kafkaURL> <topics>")
        System.exit(1)
      }

      StreamingExamples.setStreamingLogLevels()

      val Array(brokers, topics) = args

      // Create context with 2 second batch interval
      val sparkConf = new SparkConf()
        .setAppName("KafkaWordCountTest")
        .setMaster("local[2]")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      // Checkpointing is enabled because the windowed reduce below is given an
      // inverse function (`_ - _`); see the Spark Streaming guide on window operations.
      ssc.checkpoint("checkpoint")

      // Create direct kafka stream with brokers and topics
      val topicsSet = topics.split(",").toSet
      // NOTE(review): "serializer.class" looks like a producer-side setting —
      // confirm whether the consumer actually needs it.
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> brokers,
        "serializer.class" -> "kafka.serializer.StringEncoder",
        "group.id" -> "spark-kafka-consumer")

      // KafkaManager creates the stream and is also used to update the ZooKeeper offsets
      val km = new KafkaManager(kafkaParams)
      val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      // Example 2: record the offsets of every batch, then count the messages over
      // a 10 second window that slides every 4 seconds.
      // NOTE(review): this closure captures `km`. With checkpointing enabled the
      // DStream functions must be serializable, so KafkaManager (and everything it
      // holds) presumably has to be Serializable — confirm against its definition.
      val withOffsetsSaved = messages.transform { rdd =>
        km.updateZKOffsets(rdd)
        rdd
      }
      withOffsetsSaved
        .map(_ => (66, 1L))
        .reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(4))
        .print()

      // Start the computation
      ssc.start()
      ssc.awaitTermination()
    }

> spark streaming checkpoint question
> -----------------------------------
>
>                 Key: SPARK-11064
>                 URL: https://issues.apache.org/jira/browse/SPARK-11064
>             Project: Spark
>          Issue Type: Question
>    Affects Versions: 1.4.1
>            Reporter: jason kim
>             Fix For: 2+
>
>
> java.io.NotSerializableException: DStream checkpointing has been enabled but 
> the DStreams with their functions are not serializable
> Serialization stack:
>       at 
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>       at 
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>       at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>       at 
> com.bj58.spark.streaming.KafkaWordCountTest$.main(KafkaWordCountTest.scala:70)
>       at 
> com.bj58.spark.streaming.KafkaWordCountTest.main(KafkaWordCountTest.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>       at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to