Hi,

Can we share Spark Streaming state between two DStreams?

Basically, I want to create state using the first stream and enrich the second stream using that state.

Example: I have modified the StatefulNetworkWordCount example. I am creating state using the first stream and enriching the second stream with the count from the first stream.

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

// First stream: adds the incoming count to the stored count and updates the state.
val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  Some(output)
}

// Second stream: only reads the stored count and never updates the state.
val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)
  Some(output)
}

// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r => r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)

// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r => r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)

In the checkpoint directory, I can see two different state RDDs.

I am using spark-1.6.1 and kafka-0.8.2.1.

Regards,
Shekhar