Hi Pierce,

You shouldn’t have to use groupByKey, because updateStateByKey already 
gets a Seq of all the new values for that key in each batch.
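
For example (a minimal sketch with made-up names, not your exact types), a 
running count per key works directly on the keyed DStream, no groupByKey 
needed:

  import org.apache.spark.streaming.StreamingContext._  // pair DStream ops (pre-1.3)
  import org.apache.spark.streaming.dstream.DStream

  // The update function is handed the Seq of new values for each key in
  // the current batch plus the previous state for that key.
  // Note: updateStateByKey requires checkpointing to be enabled on the
  // StreamingContext.
  def countPerKey(keyed: DStream[(String, Long)]): DStream[(String, Long)] =
    keyed.updateStateByKey[Long] { (newValues: Seq[Long], count: Option[Long]) =>
      Some(count.getOrElse(0L) + newValues.size)
    }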

I used it for real-time sessionization as well. What I did was key my 
incoming events, then send them through updateStateByKey. The update 
function then received a Seq of the events and the Option of the previous 
state for that key. The sessionization code then did its thing to check 
whether the incoming events were part of the same session, based on a 
configured timeout. If a session was already active (from the previous 
state) and it hadn’t exceeded the timeout, it kept that session id. 
Otherwise it generated a new one. The return value of the update function 
was then a tuple of session id and last timestamp.
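
Roughly, the update function looked something like the sketch below; the 
names, the (sessionId, lastTimestamp) tuple layout, and the 30-minute 
timeout are illustrative, not the exact production code:

  import java.util.UUID

  // Illustrative timeout; the real value came from configuration.
  val sessionTimeout = 30 * 60 * 1000L  // 30 minutes, in ms

  // events: (requestPage, timestamp) pairs for this key in the current
  // batch; state: the previous (sessionId, lastTimestamp), if any.
  def updateSession(
      events: Seq[(String, Long)],
      state: Option[(String, Long)]): Option[(String, Long)] = {

    val newest =
      if (events.nonEmpty) events.map(_._2).max
      else state.map(_._2).getOrElse(System.currentTimeMillis())

    state match {
      // A session is already active and hasn't exceeded the timeout:
      // keep its id, just move the last-seen timestamp forward.
      case Some((sessionId, lastSeen)) if newest - lastSeen <= sessionTimeout =>
        Some((sessionId, newest))
      // No previous session, or the old one timed out: new session id.
      case _ =>
        Some((UUID.randomUUID().toString, newest))
    }
  }

  // Applied to a DStream keyed the same way as the events, e.g.
  // val sessions = keyedEvents.updateStateByKey(updateSession _)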

Then I joined the original event DStream with the DStream of session ids 
(both keyed off the same key) and continued my processing. Your 
requirements may be different, but that’s what worked well for me.
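
In code that step was basically a join on the common key; again just a 
sketch, continuing the made-up names from above:

  import org.apache.spark.streaming.StreamingContext._  // pair DStream ops (pre-1.3)
  import org.apache.spark.streaming.dstream.DStream

  // keyedEvents: (IP, (requestPage, timestamp))
  // sessions:    (IP, (sessionId, lastTimestamp))
  // The join tags every event with its current session id, so downstream
  // processing can work per (IP, sessionId).
  def tagWithSession(
      keyedEvents: DStream[(String, (String, Long))],
      sessions: DStream[(String, (String, Long))]
  ): DStream[(String, ((String, Long), (String, Long)))] =
    keyedEvents.join(sessions)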

Another thing to consider is cleaning up old sessions by returning None in 
the updateStateByKey function. This helps with long-running apps and 
minimizes memory usage (and checkpoint size).
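
With the same hypothetical (sessionId, lastTimestamp) state as in the 
sketches above, that just means returning None once a key has been idle 
past the timeout:

  // Wraps the earlier updateSession sketch: when no new events arrived
  // and the session has been idle longer than the timeout, return None
  // so the key's state stops taking up memory and checkpoint space.
  def updateSessionWithCleanup(
      events: Seq[(String, Long)],
      state: Option[(String, Long)]): Option[(String, Long)] = {
    val now = System.currentTimeMillis()
    if (events.isEmpty && state.exists { case (_, lastSeen) => now - lastSeen > sessionTimeout })
      None
    else
      updateSession(events, state)
  }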

I was using something similar to the method above on a live production 
stream with a very small CPU and memory footprint, running for weeks at a 
time and processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio



On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:

>I am trying to run stateful Spark Streaming computations over (fake)
>apache web server logs read from Kafka. The goal is to "sessionize"
>the web traffic similar to this blog post:
>http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>
>The only difference is that I want to "sessionize" each page the IP
>hits, instead of the entire session. I was able to do this reading
>from a file of fake web traffic using Spark in batch mode, but now I
>want to do it in a streaming context.
>
>Log files are read from Kafka and parsed into K/V pairs of
>
>(String, (String, Long, Long)) or
>
>(IP, (requestPage, time, time))
>
>I then call "groupByKey()" on this K/V pair. In batch mode, this would
>produce a:
>
>(String, CollectionBuffer((String, Long, Long), ...)) or
>
>(IP, CollectionBuffer((requestPage, time, time), ...))
>
>In a StreamingContext, it produces a:
>
>(String, ArrayBuffer((String, Long, Long), ...)) like so:
>
>(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
>However, as the next microbatch (DStream) arrives, this information is
>discarded. Ultimately what I want is for that ArrayBuffer to fill up
>over time as a given IP continues to interact and to run some
>computations on its data to "sessionize" the page time. I believe the
>operator to make that happen is "updateStateByKey." I'm having some
>trouble with this operator (I'm new to both Spark & Scala); any help
>is appreciated.
>
>Thus far:
>
>    val grouped =
>      ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>    def updateGroupByKey(
>        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>      ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>
>    }
>
