Hi Shekhar,

As both of your state functions do the same thing, can't you do a union of the DStreams before applying mapWithState()? It might be difficult if one state function depends on another function's state; that would require a named state that can be accessed from other state functions. I have not gone through the details, but the PR (https://github.com/apache/spark/pull/11645) from Tathagata seems to be in that direction.
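Something along these lines might work (an untested sketch, reusing the names from your snippet; firstStream/secondStream stand for your two Kafka DStreams after the flatMap/map steps, and the Boolean tag for telling the two sources apart is just an illustration, not an existing API):

import org.apache.spark.streaming.{Minutes, State, StateSpec, Time}

// Untested sketch: tag each record with its origin before the union, so a
// single state function knows whether to update the count (first stream)
// or only read it (second stream).
val taggedFirst  = firstStream.map  { case (word, n) => (word, (n, true)) }   // true  = update state
val taggedSecond = secondStream.map { case (word, n) => (word, (n, false)) }  // false = read-only

val mappingFunc = (batchTime: Time, word: String,
                   value: Option[(Int, Boolean)], state: State[Int]) => {
  val (n, doUpdate) = value.getOrElse((0, false))
  val sum = state.getOption.getOrElse(0) + (if (doUpdate) n else 0)
  if (doUpdate) state.update(sum)        // only records from the first stream mutate state
  Some((word, sum))                      // both streams get enriched with the count
}

taggedFirst.union(taggedSecond)
  .mapWithState(StateSpec.function(mappingFunc)
    .initialState(initialRDD)
    .timeout(Minutes(10)))

With a single mapWithState() there is only one state RDD to checkpoint, and both streams read and write the same counts (though within one batch the relative order of records from the two sources is not guaranteed).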
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)
https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 3:53 PM, Shekhar Bansal <shekhar0...@yahoo.com.invalid> wrote:
> Hi
> Can we share spark streaming state between two DStreams??
> Basically I want to create state using first stream and enrich second stream using state.
> Example: I have modified StatefulNetworkWordCount example. I am creating state using first stream and enriching second stream with count of first stream.
>
> val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
>
> val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>   val output = (word, sum)
>   state.update(sum)
>   Some(output)
> }
>
> val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = state.getOption.getOrElse(0)
>   val output = (word, sum)
>   Some(output)
> }
>
> // first stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r => r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(1)
>
> // second stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
>   .flatMap(r => r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(50)
>
> In checkpointing directory, I can see two different state RDDs.
> I am using spark-1.6.1 and kafka-0.8.2.1
>
> Regards
> Shekhar