Re: Spark Streaming share state between two streams

2016-04-08 Thread Rishi Mishra
Hi Shekhar,
As both of your state functions do essentially the same thing, can't you do a
union of the DStreams before applying mapWithState()? It becomes difficult
only if one state function depends on the other's state; that would require a
named state which can be accessed from other state functions. I have not gone
through the details, but the PR (https://github.com/apache/spark/pull/11645)
from Tathagata seems to be headed in that direction.
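
For illustration, here is a rough, untested sketch of the union approach
against the Spark 1.6 mapWithState API. It assumes your two Kafka direct
streams (after the flatMap/map steps) are bound to hypothetical vals
firstStream and secondStream of (String, Int) pairs, and reuses your
initialRDD; each record is tagged with its source so that one shared state
function updates the state for the first stream and only reads it for the
second:

// Tag each record with its source stream: true = build/update state,
// false = only read state. firstStream and secondStream stand in for
// the two Kafka direct streams after the flatMap/map steps.
val taggedFirst  = firstStream.map { case (word, count) => (word, (count, true)) }
val taggedSecond = secondStream.map { case (word, count) => (word, (count, false)) }

// A single state function for the unioned stream; the flag decides whether
// to update the running count or just look it up.
val sharedMappingFunc: (Time, String, Option[(Int, Boolean)], State[Int]) => Option[(String, Int)] =
  (batchTime, word, value, state) => {
    val (count, fromFirstStream) = value.getOrElse((0, false))
    val sum =
      if (fromFirstStream) {
        val updated = count + state.getOption.getOrElse(0)
        state.update(updated)
        updated
      } else {
        state.getOption.getOrElse(0)
      }
    Some((word, sum))
  }

taggedFirst.union(taggedSecond)
  .mapWithState(StateSpec.function(sharedMappingFunc).initialState(initialRDD).timeout(Minutes(10)))
  .print()

Since both sources now go through the same mapWithState call they share a
single state RDD, so only one state checkpoint should show up. One caveat:
within a batch there is no ordering guarantee between records of the two
sources for the same key, so the lookup from the second stream may see the
state before or after that batch's updates.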

Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Fri, Apr 8, 2016 at 3:53 PM, Shekhar Bansal <
shekhar0...@yahoo.com.invalid> wrote:

> Hi
> Can we share Spark Streaming state between two DStreams?
> Basically I want to create state using the first stream and enrich the
> second stream using that state.
> Example: I have modified the StatefulNetworkWordCount example. I am
> creating state using the first stream and enriching the second stream
> with the counts from the first stream.
>
> val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
>
>
> val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>   val output = (word, sum)
>   state.update(sum)
>
>   Some(output)
> }
>
> val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
>   val sum = state.getOption.getOrElse(0)
>   val output = (word, sum)
>
>   Some(output)
> }
>
>
>
> // first stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(1)
>
>
>
> // second stream
> KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
>   .flatMap(r=>r._2.split(" "))
>   .map(x => (x, 1))
>   .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
>   .print(50)
>
>
> In the checkpointing directory, I can see two different state RDDs.
> I am using spark-1.6.1 and kafka-0.8.2.1.
>
> Regards
> Shekhar
>


Spark Streaming share state between two streams

2016-04-08 Thread Shekhar Bansal
Hi
Can we share Spark Streaming state between two DStreams?
Basically I want to create state using the first stream and enrich the second
stream using that state.
Example: I have modified the StatefulNetworkWordCount example. I am creating
state using the first stream and enriching the second stream with the counts
from the first stream.

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)

  Some(output)
}

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
  val sum = state.getOption.getOrElse(0)
  val output = (word, sum)

  Some(output)
}



// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(1)



// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
  .flatMap(r=>r._2.split(" "))
  .map(x => (x, 1))
  .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
  .print(50)
In the checkpointing directory, I can see two different state RDDs.
I am using spark-1.6.1 and kafka-0.8.2.1.

Regards
Shekhar