Please disregard the window() calls below; it turns out that was leftover
development code. Everything else is correct.

 val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT).window(Seconds(30))
 val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT).window(Seconds(30))

should be

 val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT)
 val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT)
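
For the record, the reason the window was wrong (given the 10-second batch
interval in the code below): with the default slide of one batch,
window(Seconds(30)) makes each mini-batch the union of the last three
batches, so every record would reach trackStateByKey three times, and the
tracked state is what should be carrying the history anyway. A small sketch
of the difference (windowed is just an illustrative name):

 // Sketch only, reusing ssc/dirLEFT from the code below. Default slide =
 // batch interval (10s), so a 30s window re-delivers each record in three
 // consecutive mini-batches.
 val windowed: DStream[String] = ssc.textFileStream(dirLEFT).window(Seconds(30))
 // Intended: deliver each record once; trackStateByKey holds the history.
 val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT)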

On Wed, Dec 2, 2015 at 2:12 PM, Aris <arisofala...@gmail.com> wrote:

> Hello folks,
>
> I'm on the newest Spark 1.6.0-SNAPSHOT, using Spark Streaming with the new
> trackStateByKey API. I'm trying to do something fairly simple that requires
> keeping state across mini-batches, so I am trying to see if it can be done.
>
> I basically have two kinds of data to join, left-side and right-side, which
> arrive in batches. I want to accumulate the left-side and right-side data
> across mini-batches, and on each mini-batch I want to JOIN left-side and
> right-side over ALL the history I have so far (up to the point of my
> timeout). When I tried trackStateByKey, it works if I dump all my data at
> once (an unrealistic case): all the left-side and right-side data land in
> the same mini-batch, so everything is available for the join in that single
> mini-batch. However, when I let the left-side and right-side data arrive in
> chunks over different mini-batches, only the left and right data in THAT
> mini-batch are actually "available" to me for the JOIN operation! The
> history is NOT being used in the join, because those keys were not touched
> by the input chunk of data for that mini-batch.
>
> Maybe I'm doing something wrong? I thought that trackStateByKey would let
> me accumulate my input data (subject to a timeout) across separate
> mini-batches and let me do my big join.
>
> Here's sample code of what I'm doing. I'm keeping state in stateLEFT and
> stateRIGHT, and the join key is a tuple of (JoinKey1, JoinKey2). The strange
> part is that my accumulated state is NOT available when I do the join in the
> code. I am NOT using timeouts right now.
>
>
> val trackStateFn =
>   (batchTime: Time,
>    stringJson: RawJSON,
>    keyWithParsedJson: Option[((JoinKey1, JoinKey2), JValue)],
>    state: State[((JoinKey1, JoinKey2), JValue)]) => {
>     if (state.isTimingOut) None
>     else {
>       keyWithParsedJson.foreach(parsed => state.update(parsed))
>       keyWithParsedJson // Just return the data ready for joining
>     }
>
>   }
>
>  val sparkConf = new SparkConf().setAppName("Denormalizer")
>
>  val ssc = new StreamingContext(sparkConf, Seconds(10))
>  ssc.checkpoint("/tmp/denormStateful/")
>
>  val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT).window(Seconds(30))
>  val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT).window(Seconds(30))
>
>  implicit val formats = DefaultFormats //++ JodaTimeSerializers.all
>
>  val jsonLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
>    rawLEFT.map(transformParseJoinKeys)
>  val jsonRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
>    rawRIGHT.map(transformParseJoinKeys)
>
>  val stateLEFT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
>      ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
>    jsonLEFT.trackStateByKey(StateSpec.function(trackStateFn)) // TODO add timeout, maybe initial state?
>  val stateRIGHT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
>      ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
>    jsonRIGHT.trackStateByKey(StateSpec.function(trackStateFn))
>
>  val jsonJoinedBoth: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] =
>    stateLEFT.join(stateRIGHT)
>
>  val jsonPretty = jsonJoinedBoth.map { case (_, (leftJson, rightJson)) =>
>    mergeLEFTandRIGHT(leftJson, rightJson)
>  }
>
>  jsonPretty.foreachRDD { rdd =>
>    val count = rdd.count() // count once; each count() re-runs the job on the RDD
>    if (count > 0) {
>      val timestamp = (System.currentTimeMillis() % 100000000).toString
>      val fileName = s"denorm-stateful-$timestamp.out"
>      rdd.saveAsTextFile(fileName)
>      println(s"Output $count lines of data to $fileName")
>    }
>  }
>
>  ssc.start()
>  ssc.awaitTermination()
>
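
On the TODO about timeouts in the code above: the StateSpec builder appears
to take an idle timeout in this snapshot (treat the exact method name as an
assumption, since the API is still marked experimental). Roughly, with
illustrative names:

 // Assumption: StateSpec exposes a timeout(...) builder here, as the
 // finalized 1.6 API does. Keys idle longer than 10 minutes would then get
 // one last call to trackStateFn with state.isTimingOut == true before
 // being dropped from the state.
 val specWithTimeout = StateSpec.function(trackStateFn).timeout(Minutes(10))
 val stateLEFTWithTimeout = jsonLEFT.trackStateByKey(specWithTimeout)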
