Great, glad it worked out! Just keep an eye on memory usage as you roll it out. Like I said before, if you'll be running this 24/7, consider cleaning up sessions by returning None after some sort of timeout. The two sketches below show roughly what I mean.
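
First, your update function with that cleanup folded in. This is just a sketch I haven't run against your code; the 30-minute timeout, and using each triple's endTime (epoch millis, going by your sample data) as the last-seen time, are my assumptions:

// Sketch: merge new events into per-IP state, expire idle IPs.
val SESSION_TIMEOUT_MS = 30 * 60 * 1000L  // assumed, tune to your sessions

def updateGroupByKey(
    newValues: Seq[(String, Long, Long)],
    currentValue: Option[Seq[(String, Long, Long)]]
): Option[Seq[(String, Long, Long)]] = {
  // getOrElse covers the first batch, when there is no previous state yet
  val merged = currentValue.getOrElse(Seq.empty) ++ newValues
  if (newValues.nonEmpty) {
    Some(merged)
  } else {
    // No new events for this IP this batch: drop the state once it has
    // been idle past the timeout (endTime assumed to be epoch millis).
    val lastSeen = merged.map(_._3).foldLeft(0L)(math.max)
    if (System.currentTimeMillis() - lastSeen > SESSION_TIMEOUT_MS) None
    else Some(merged)
  }
}

val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)

Returning None drops the key from the state RDD entirely. Two caveats: updateStateByKey needs a checkpoint directory set (ssc.checkpoint(...)), and this compares event timestamps against the wall clock, so replaying logs with old timestamps would expire everything immediately.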
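
And since you'd asked to see the sessionization function I mentioned, here's its rough shape from memory. It's not my production code verbatim: Event, keyedEvents, the UUID-based session ids, and the timeout are all placeholders:

import java.util.UUID

val SESSION_TIMEOUT_MS = 30 * 60 * 1000L  // same assumed timeout as above

case class Event(page: String, timestamp: Long)  // placeholder event type

// State per key is (sessionId, lastTimestamp).
def trackSessions(
    events: Seq[Event],
    state: Option[(String, Long)]
): Option[(String, Long)] = {
  if (events.nonEmpty) {
    val latest = events.map(_.timestamp).max
    state match {
      // Previous session still inside the timeout window: keep its id.
      case Some((id, lastSeen)) if latest - lastSeen <= SESSION_TIMEOUT_MS =>
        Some((id, latest))
      // Timed out, or first time we've seen this key: new session id.
      case _ =>
        Some((UUID.randomUUID().toString, latest))
    }
  } else {
    // No events this batch: keep the session only until it goes stale.
    state.filter { case (_, lastSeen) =>
      System.currentTimeMillis() - lastSeen <= SESSION_TIMEOUT_MS
    }
  }
}

// keyedEvents: DStream[(String, Event)], keyed the same way as the state.
val sessions = keyedEvents.updateStateByKey(trackSessions)
val withSessions = keyedEvents.join(sessions)  // (key, (Event, (sessionId, ts)))

The join tags every event with its session id, so the downstream sessionization logic can group on it.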
On 12/18/14, 8:25 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:

>This produces the expected output, thank you!
>
>On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
><silvio.fior...@granturing.com> wrote:
>>
>> Ok, I have a better idea of what you're trying to do now.
>>
>> I think the problem might be the map. The first time the function runs,
>> currentValue will be None. Using map on None returns None.
>>
>> Instead, try:
>>
>> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>>
>> I think that should give you the expected result.
>>
>>
>> From: Pierce Lamb <richard.pierce.l...@gmail.com>
>> Date: Thursday, December 18, 2014 at 2:31 PM
>> To: Silvio Fiorito <silvio.fior...@granturing.com>
>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: Help with updateStateByKey
>>
>> Hi Silvio,
>>
>> This is a great suggestion (I wanted to get rid of groupByKey). I have
>> been trying to implement it this morning but am having some trouble. I
>> would love to see your code for the function that goes inside
>> updateStateByKey.
>>
>> Here is my current code:
>>
>> def updateGroupByKey(newValues: Seq[(String, Long, Long)],
>>                      currentValue: Option[Seq[(String, Long, Long)]]
>>                     ): Option[Seq[(String, Long, Long)]] = {
>>   currentValue.map { case (v) => v ++ newValues }
>> }
>>
>> val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>>
>> However, when I run it the grouped DStream doesn't get populated with
>> anything. The issue is probably that currentValue is not actually an
>> Option[Seq[triple]] but rather an Option[triple]. However, if I change
>> it to an Option[triple] then I also have to return an Option[triple]
>> for updateStateByKey to compile, but I want that return value to be an
>> Option[Seq[triple]] because ultimately I want the data to look like
>> (IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
>> startTime, endTime)...]) and have that Seq build up over time.
>>
>> Am I thinking about this wrong?
>>
>> Thank you
>>
>> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
>> <silvio.fior...@granturing.com> wrote:
>>>
>>> Hi Pierce,
>>>
>>> You shouldn't have to use groupByKey because updateStateByKey will
>>> get a Seq of all the values for that key already.
>>>
>>> I used that for realtime sessionization as well. What I did was key
>>> my incoming events, then send them to updateStateByKey. The
>>> updateStateByKey function then received a Seq of the events and the
>>> Option of the previous state for that key. The sessionization code
>>> then did its thing to check if the incoming events were part of the
>>> same session, based on a configured timeout. If a session was already
>>> active (from the previous state) and it hadn't exceeded the timeout,
>>> it used that value. Otherwise it generated a new session id. The
>>> return value for the updateStateByKey function was then a tuple of
>>> session id and last timestamp.
>>>
>>> Then I joined the DStream with the session ids, which were both keyed
>>> off the same id, and continued my processing. Your requirements may
>>> be different, but that's what worked well for me.
>>>
>>> Another thing to consider is cleaning up old sessions by returning
>>> None in the updateStateByKey function. This will help with
>>> long-running apps and minimize memory usage (and checkpoint size).
>>>
>>> I was using something similar to the method above on a live
>>> production stream with very little CPU and memory footprint, running
>>> for weeks at a time, processing up to 15M events per day with
>>> fluctuating traffic.
>>>
>>> Thanks,
>>> Silvio
>>>
>>>
>>>
>>> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
>>> wrote:
>>>
>>> >I am trying to run stateful Spark Streaming computations over (fake)
>>> >apache web server logs read from Kafka. The goal is to "sessionize"
>>> >the web traffic similar to this blog post:
>>> >
>>> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>>> >
>>> >The only difference is that I want to "sessionize" each page the IP
>>> >hits, instead of the entire session. I was able to do this reading
>>> >from a file of fake web traffic using Spark in batch mode, but now I
>>> >want to do it in a streaming context.
>>> >
>>> >Log files are read from Kafka and parsed into K/V pairs of
>>> >
>>> >(String, (String, Long, Long)) or
>>> >
>>> >(IP, (requestPage, time, time))
>>> >
>>> >I then call "groupByKey()" on this K/V pair. In batch mode, this
>>> >would produce a:
>>> >
>>> >(String, CollectionBuffer((String, Long, Long), ...)) or
>>> >
>>> >(IP, CollectionBuffer((requestPage, time, time), ...))
>>> >
>>> >In a StreamingContext, it produces a:
>>> >
>>> >(String, ArrayBuffer((String, Long, Long), ...)) like so:
>>> >
>>> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>>> >
>>> >However, as the next microbatch (DStream) arrives, this information
>>> >is discarded. Ultimately what I want is for that ArrayBuffer to fill
>>> >up over time as a given IP continues to interact, and to run some
>>> >computations on its data to "sessionize" the page time. I believe the
>>> >operator to make that happen is "updateStateByKey." I'm having some
>>> >trouble with this operator (I'm new to both Spark & Scala); any help
>>> >is appreciated.
>>> >
>>> >Thus far:
>>> >
>>> >val grouped =
>>> >  ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>>> >
>>> >def updateGroupByKey(
>>> >    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>>> >    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>>> >): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>>> >
>>> >}
>>> >
>>> >---------------------------------------------------------------------
>>> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> >For additional commands, e-mail: user-h...@spark.apache.org
>>> >