This produces the expected output, thank you!

On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
<silvio.fior...@granturing.com> wrote:
> Ok, I have a better idea of what you’re trying to do now.
>
> I think the problem might be the map. The first time the function runs,
> currentValue will be None. Calling map on None just returns None.
>
> Instead, try:
>
> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>
> I think that should give you the expected result.
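Putting that suggestion together with the function signature quoted below, a minimal sketch of the accumulate-per-key version might look like the following. It assumes ipTimeStamp is the keyed DStream[(String, (String, Long, Long))] described in the original post further down, and that the StreamingContext has a checkpoint directory set, which updateStateByKey requires.

import org.apache.spark.streaming.dstream.DStream

// Accumulate every (requestPage, startTime, endTime) triple seen for an IP
// across batches. On the first batch currentValue is None, so
// getOrElse(Seq.empty) starts from an empty Seq instead of short-circuiting
// the way map does on None.
def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                     currentValue: Option[Seq[(String, Long, Long)]]
                    ): Option[Seq[(String, Long, Long)]] = {
  Some(currentValue.getOrElse(Seq.empty) ++ newValues)
}

// ipTimeStamp is assumed to be the DStream[(String, (String, Long, Long))]
// from the original post; ssc.checkpoint(...) must be set for stateful ops.
val grouped: DStream[(String, Seq[(String, Long, Long)])] =
  ipTimeStamp.updateStateByKey(updateGroupByKey)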
>
> From: Pierce Lamb <richard.pierce.l...@gmail.com>
> Date: Thursday, December 18, 2014 at 2:31 PM
> To: Silvio Fiorito <silvio.fior...@granturing.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: Help with updateStateByKey
>
> Hi Silvio,
>
> This is a great suggestion (I wanted to get rid of groupByKey). I have been
> trying to implement it this morning but am having some trouble. I would
> love to see your code for the function that goes inside updateStateByKey.
>
> Here is my current code:
>
>     def updateGroupByKey( newValues: Seq[(String, Long, Long)],
>                           currentValue: Option[Seq[(String, Long, Long)]]
>                         ): Option[Seq[(String, Long, Long)]] = {
>
>       currentValue.map{ case (v) => v ++ newValues }
>     }
>
>     val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>
> However, when I run it, the grouped DStream doesn't get populated with
> anything. The issue is probably that currentValue is not actually an
> Option[Seq[triple]] but rather an Option[triple]. However, if I change it
> to an Option[triple], then I also have to return an Option[triple] for
> updateStateByKey to compile, but I want that return value to be an
> Option[Seq[triple]], because ultimately I want the data to look like
> (IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
> startTime, endTime), ...]) and have that Seq build up over time.
>
> Am I thinking about this wrong?
>
> Thank you
>
> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
> <silvio.fior...@granturing.com> wrote:
>>
>> Hi Pierce,
>>
>> You shouldn’t have to use groupByKey, because updateStateByKey will
>> already get a Seq of all the values for that key.
>>
>> I used that for realtime sessionization as well. What I did was key my
>> incoming events, then send them to updateStateByKey. The updateStateByKey
>> function then received a Seq of the events and the Option of the previous
>> state for that key. The sessionization code then did its thing to check
>> whether the incoming events were part of the same session, based on a
>> configured timeout. If a session was already active (from the previous
>> state) and it hadn’t exceeded the timeout, it used that value. Otherwise
>> it generated a new session id. The return value of the updateStateByKey
>> function was then a tuple of the session id and the last timestamp.
>>
>> Then I joined the DStream with the session ids, which were both keyed off
>> the same id, and continued my processing. Your requirements may be
>> different, but that’s what worked well for me.
>>
>> Another thing to consider is cleaning up old sessions by returning None
>> from the updateStateByKey function. This will help with long-running apps
>> and minimize memory usage (and checkpoint size).
>>
>> I was using something similar to the method above on a live production
>> stream with very little CPU and memory footprint, running for weeks at a
>> time, processing up to 15M events per day with fluctuating traffic.
>>
>> Thanks,
>> Silvio
>>
>>
>> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
>> wrote:
>>
>> > I am trying to run stateful Spark Streaming computations over (fake)
>> > Apache web server logs read from Kafka. The goal is to "sessionize"
>> > the web traffic similar to this blog post:
>> >
>> > http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>> >
>> > The only difference is that I want to "sessionize" each page the IP
>> > hits, instead of the entire session. I was able to do this reading
>> > from a file of fake web traffic using Spark in batch mode, but now I
>> > want to do it in a streaming context.
>> >
>> > Log files are read from Kafka and parsed into K/V pairs of
>> >
>> >     (String, (String, Long, Long)) or
>> >
>> >     (IP, (requestPage, time, time))
>> >
>> > I then call "groupByKey()" on this K/V pair. In batch mode, this would
>> > produce a
>> >
>> >     (String, CollectionBuffer((String, Long, Long), ...)) or
>> >
>> >     (IP, CollectionBuffer((requestPage, time, time), ...))
>> >
>> > In a StreamingContext, it produces a
>> >
>> >     (String, ArrayBuffer((String, Long, Long), ...)) like so:
>> >
>> >     (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>> >
>> > However, as the next microbatch (DStream) arrives, this information is
>> > discarded. Ultimately, what I want is for that ArrayBuffer to fill up
>> > over time as a given IP continues to interact, and to run some
>> > computations on its data to "sessionize" the page time. I believe the
>> > operator to make that happen is "updateStateByKey". I'm having some
>> > trouble with this operator (I'm new to both Spark and Scala); any help
>> > is appreciated.
>> >
>> > Thus far:
>> >
>> >     val grouped =
>> >       ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>> >
>> >     def updateGroupByKey(
>> >         a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>> >         b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>> >       ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>> >
>> >     }
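For completeness, a rough sketch of the session-tracking shape Silvio describes above (keyed events in, a tuple of session id and last timestamp as state, None returned to expire idle sessions) might look like the following. The timeout value, the updateSession and keyedEvents names, the use of the wall clock, and the UUID-based session ids are illustrative placeholders, not code from the thread.

import java.util.UUID

// Placeholder for the configured session timeout mentioned above.
val sessionTimeoutMs: Long = 30 * 60 * 1000L

// State kept per key: (sessionId, lastTimestamp). newTimestamps are the event
// times seen for this key in the current batch.
def updateSession(newTimestamps: Seq[Long],
                  state: Option[(String, Long)]): Option[(String, Long)] = {
  if (newTimestamps.isEmpty) {
    // No activity this batch: keep the session only while it is still fresh.
    // Returning None drops this key's state, keeping memory and checkpoints small.
    state.filter { case (_, last) => System.currentTimeMillis() - last < sessionTimeoutMs }
  } else {
    val latest = newTimestamps.max
    state match {
      // Previous session still within the timeout: reuse its id, bump the timestamp.
      case Some((id, last)) if latest - last < sessionTimeoutMs => Some((id, latest))
      // No previous session, or it expired: start a new one.
      case _ => Some((UUID.randomUUID().toString, latest))
    }
  }
}

// The session ids can then be joined back onto the keyed event stream,
// as described above:
//   val sessions = keyedEvents.updateStateByKey(updateSession)
//   val eventsWithSessionIds = keyedEvents.join(sessions)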
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org