Please disregard the "window" functions... it turns out that was development code. Everything else is correct.
val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT).window(Seconds(30))
val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT).window(Seconds(30))

should be

val rawLEFT: DStream[String] = ssc.textFileStream(dirLEFT)
val rawRIGHT: DStream[String] = ssc.textFileStream(dirRIGHT)

On Wed, Dec 2, 2015 at 2:12 PM, Aris <arisofala...@gmail.com> wrote:
> Hello folks,
>
> I'm on the newest Spark 1.6.0-SNAPSHOT, using Spark Streaming's new
> trackStateByKey API. I'm trying to do something fairly simple that
> requires keeping state across mini-batches, so I am trying to see if it
> can be done.
>
> I basically have two kinds of data to join, left-side and right-side,
> which arrive in batches. I want to accumulate the left-side and
> right-side data across mini-batches, and on each mini-batch I want to
> JOIN left-side and right-side over ALL the history I have so far (up to
> the point of my timeout). When I tried trackStateByKey, it works if I
> dump all my data at once (an unrealistic case), so that all the
> left-side and right-side data land in the same mini-batch and everything
> is available for the join within that single mini-batch. However, when I
> let the left-side and right-side data come in chunks over different
> mini-batches, only the left and right data in THAT mini-batch are
> actually "available" to me for the JOIN operation! The history is NOT
> being used in the join, because those "keys" were not touched by the
> input chunk of data for that mini-batch.
>
> Maybe I'm doing something wrong? I thought that trackStateByKey would
> let me accumulate my input data (subject to a timeout) across separate
> mini-batches and let me do my big join.
>
> Here's sample code of what I'm doing. I'm keeping state in stateLEFT and
> stateRIGHT, and the join key is a tuple of (JoinKey1, JoinKey2). The
> strange part is that my history of state is NOT available when I do the
> join in the code. I am NOT using timeouts right now.
>
> val trackStateFn =
>   (batchTime: Time,
>    stringJson: RawJSON,
>    keyWithParsedJson: Option[((JoinKey1, JoinKey2), JValue)],
>    state: State[((JoinKey1, JoinKey2), JValue)]) => {
>     if (state.isTimingOut) None
>     else {
>       keyWithParsedJson.foreach(parsed => state.update(parsed))
>       keyWithParsedJson // Just return the data, ready for joining
>     }
>   }
>
> val sparkConf = new SparkConf().setAppName("Denormalizer")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
> ssc.checkpoint("/tmp/denormStateful/")
>
> val rawLEFT: DStream[String] =
>   ssc.textFileStream(dirLEFT).window(Seconds(30))
> val rawRIGHT: DStream[String] =
>   ssc.textFileStream(dirRIGHT).window(Seconds(30))
>
> implicit val formats = DefaultFormats // ++ JodaTimeSerializers.all
>
> val jsonLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
>   rawLEFT.map(transformParseJoinKeys)
> val jsonRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
>   rawRIGHT.map(transformParseJoinKeys)
>
> val stateLEFT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
>     ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
>   jsonLEFT.trackStateByKey(StateSpec.function(trackStateFn)) // TODO add timeout, maybe initial state?
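>   // Sketch for the TODO above (unverified against this snapshot build;
>   // the timeout value and initialRDD are placeholders, not code I ran):
>   //   jsonLEFT.trackStateByKey(
>   //     StateSpec.function(trackStateFn)
>   //       .timeout(Minutes(10))       // expire keys idle for 10 minutes
>   //       .initialState(initialRDD))  // optional RDD of (key, state) pairs
>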
> val stateRIGHT: TrackStateDStream[RawJSON, ((JoinKey1, JoinKey2), JValue),
>     ((JoinKey1, JoinKey2), JValue), ((JoinKey1, JoinKey2), JValue)] =
>   jsonRIGHT.trackStateByKey(StateSpec.function(trackStateFn))
>
> val jsonJoinedBoth: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] =
>   stateLEFT.join(stateRIGHT)
>
> val jsonPretty = jsonJoinedBoth.map { tuple =>
>   mergeLEFTandRIGHT(tuple._2._1, tuple._2._2)
> }
>
> jsonPretty.foreachRDD { rdd =>
>   val count = rdd.count()
>   if (count > 0) {
>     val timestamp = (System.currentTimeMillis() % 100000000).toString
>     val fileName = s"denorm-stateful-$timestamp.out"
>     rdd.saveAsTextFile(fileName)
>     println(s"Output $count lines of data to $fileName")
>   }
> }
>
> ssc.start()
> ssc.awaitTermination()
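
P.S. To make "join ALL the history" concrete: what I want is a join over every
(key, state) pair tracked so far, not only the records emitted in the current
mini-batch. If TrackStateDStream exposes a stateSnapshots() method in this
snapshot build (I haven't verified the exact name or signature), a rough,
untested sketch of that join would be:

// Full state so far, as (key, state) pairs (assumes stateSnapshots() exists)
val allLEFT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
  stateLEFT.stateSnapshots()
val allRIGHT: DStream[(RawJSON, ((JoinKey1, JoinKey2), JValue))] =
  stateRIGHT.stateSnapshots()

// Re-key by (JoinKey1, JoinKey2), since the tracked key is the raw JSON string
val joinedAll: DStream[((JoinKey1, JoinKey2), (JValue, JValue))] =
  allLEFT.map(_._2).join(allRIGHT.map(_._2))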