Great, glad it worked out! Just keep an eye on memory usage as you roll it 
out. Like I said before, if you’ll be running this 24/7 consider cleaning 
up sessions by returning None after some sort of timeout.
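
For example, a rough sketch of what I mean (untested; the 30-minute timeout and
the use of wall-clock time here are just assumptions for illustration):

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout

    def updateWithTimeout(newValues: Seq[(String, Long, Long)],
                          state: Option[Seq[(String, Long, Long)]]
                         ): Option[Seq[(String, Long, Long)]] = {
      // Merge the previous state (if any) with this batch's values
      val merged = state.getOrElse(Seq.empty) ++ newValues
      val lastSeen = if (merged.isEmpty) 0L else merged.map(_._3).max
      if (System.currentTimeMillis() - lastSeen > sessionTimeoutMs)
        None   // returning None drops the key's state from memory and checkpoints
      else
        Some(merged)
    }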




On 12/18/14, 8:25 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:

>This produces the expected output, thank you!
>
>On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
><silvio.fior...@granturing.com> wrote:
>> Ok, I have a better idea of what you’re trying to do now.
>>
>> I think the problem is the map call. The first time the function runs,
>> currentValue will be None, and mapping over None just returns None.
>>
>> Instead, try:
>>
>> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>>
>> I think that should give you the expected result.
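>>
>> In context, your update function would look roughly like this (untested
>> sketch, keeping your existing names and types):
>>
>>   def updateGroupByKey(newValues: Seq[(String, Long, Long)],
>>                        currentValue: Option[Seq[(String, Long, Long)]]
>>                       ): Option[Seq[(String, Long, Long)]] = {
>>     // Start from the previous Seq if there is one, otherwise an empty Seq,
>>     // then append this batch's values. Always returns Some, so state is kept.
>>     Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>>   }
>>
>>   val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)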
>>
>>
>> From: Pierce Lamb <richard.pierce.l...@gmail.com>
>> Date: Thursday, December 18, 2014 at 2:31 PM
>> To: Silvio Fiorito <silvio.fior...@granturing.com>
>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: Re: Help with updateStateByKey
>>
>> Hi Silvio,
>>
>> This is a great suggestion (I wanted to get rid of groupByKey). I have been
>> trying to implement it this morning, but I'm having some trouble. I would
>> love to see your code for the function that goes inside updateStateByKey.
>>
>> Here is my current code:
>>
>>   def updateGroupByKey(newValues: Seq[(String, Long, Long)],
>>                        currentValue: Option[Seq[(String, Long, Long)]]
>>                       ): Option[Seq[(String, Long, Long)]] = {
>>
>>     currentValue.map { case (v) => v ++ newValues }
>>   }
>>
>>     val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>>
>>
>> However, when I run it, the grouped DStream doesn't get populated with
>> anything. The issue is probably that currentValue is not actually an
>> Option[Seq[triple]] but rather an Option[triple]. However, if I change it
>> to an Option[triple], then I also have to return an Option[triple] for
>> updateStateByKey to compile. But I want the return value to be an
>> Option[Seq[triple]], because ultimately I want the data to look like
>> (IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
>> startTime, endTime), ...]) and have that Seq build up over time.
>>
>> Am I thinking about this wrong?
>>
>> Thank you
>>
>> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
>> <silvio.fior...@granturing.com> wrote:
>>>
>>> Hi Pierce,
>>>
>>> You shouldn’t have to use groupByKey because updateStateByKey will get a
>>> Seq of all the values for that key already.
>>>
>>> I used that for realtime sessionization as well. What I did was key my
>>> incoming events, then send them to updateStateByKey. The updateStateByKey
>>> function then received a Seq of the events and the Option of the previous
>>> state for that key. The sessionization code then did its thing to check
>>> whether the incoming events were part of the same session, based on a
>>> configured timeout. If a session was already active (from the previous
>>> state) and it hadn't exceeded the timeout, it used that value. Otherwise
>>> it generated a new session id. The return value of the updateStateByKey
>>> function was then a Tuple of session id and last timestamp.
>>>
>>> Then I joined the DStream with the session ids, which were both keyed off
>>> the same id, and continued my processing. Your requirements may be
>>> different, but that’s what worked well for me.
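>>>
>>> Roughly, the shape was something like this (untested sketch; the event
>>> type, timeout value, session-id generation, and the keyedEvents name are
>>> just placeholders for illustration, not my actual code):
>>>
>>>   import java.util.UUID
>>>
>>>   val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout
>>>
>>>   // State per key: (sessionId, lastTimestamp)
>>>   def updateSession(events: Seq[(String, Long, Long)],
>>>                     state: Option[(String, Long)]): Option[(String, Long)] = {
>>>     val latest = if (events.isEmpty) None else Some(events.map(_._3).max)
>>>     (state, latest) match {
>>>       // Active session that hasn't timed out: keep its id, bump the timestamp
>>>       case (Some((id, last)), Some(ts)) if ts - last <= sessionTimeoutMs =>
>>>         Some((id, ts))
>>>       // No previous session, or it timed out: start a new one
>>>       case (_, Some(ts)) => Some((UUID.randomUUID().toString, ts))
>>>       // No new events this batch: keep the old state untouched
>>>       case (prev, None) => prev
>>>     }
>>>   }
>>>
>>>   val sessions = keyedEvents.updateStateByKey(updateSession)
>>>   // Both DStreams are keyed by the same id, so they can be joined
>>>   val withSessionIds = keyedEvents.join(sessions)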
>>>
>>> Another thing to consider is cleaning up old sessions by returning None
>>> in the updateStateByKey function. This will help with long-running apps
>>> and minimize memory usage (and checkpoint size).
>>>
>>> I was using something similar to the method above on a live production
>>> stream with very little CPU and memory footprint, running for weeks at a
>>> time, processing up to 15M events per day with fluctuating traffic.
>>>
>>> Thanks,
>>> Silvio
>>>
>>>
>>>
>>> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
>>> wrote:
>>>
>>> >I am trying to run stateful Spark Streaming computations over (fake)
>>> >apache web server logs read from Kafka. The goal is to "sessionize"
>>> >the web traffic similar to this blog post:
>>> >
>>> >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>>> >
>>> >The only difference is that I want to "sessionize" each page the IP
>>> >hits, instead of the entire session. I was able to do this reading
>>> >from a file of fake web traffic using Spark in batch mode, but now I
>>> >want to do it in a streaming context.
>>> >
>>> >Log files are read from Kafka and parsed into K/V pairs of
>>> >
>>> >(String, (String, Long, Long)) or
>>> >
>>> >(IP, (requestPage, time, time))
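>>> >
>>> >For reference, the parse step looks roughly like this (simplified sketch;
>>> >the kafkaStream name and the whitespace-delimited log format are just
>>> >placeholders, not the real parsing code):
>>> >
>>> >    val ipTimeStamp = kafkaStream.map { case (_, line) =>
>>> >      // fields(0) = IP, fields(1) = requested page, fields(2) = timestamp
>>> >      val fields = line.split(" ")
>>> >      (fields(0), (fields(1), fields(2).toLong, fields(2).toLong))
>>> >    }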
>>> >
>>> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
>>> >produce a:
>>> >
>>> >(String, CollectionBuffer((String, Long, Long), ...) or
>>> >
>>> >(IP, CollectionBuffer((requestPage, time, time), ...)
>>> >
>>> >In a StreamingContext, it produces a:
>>> >
>>> >(String, ArrayBuffer((String, Long, Long), ...) like so:
>>> >
>>> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>>> >
>>> >However, as the next microbatch (DStream) arrives, this information is
>>> >discarded. Ultimately what I want is for that ArrayBuffer to fill up
>>> >over time as a given IP continues to interact and to run some
>>> >computations on its data to "sessionize" the page time. I believe the
>>> >operator to make that happen is "updateStateByKey." I'm having some
>>> >trouble with this operator (I'm new to both Spark & Scala); any help
>>> >is appreciated.
>>> >
>>> >Thus far:
>>> >
>>> >    val grouped =
>>> >      ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>>> >
>>> >    def updateGroupByKey(
>>> >        a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>>> >        b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>>> >      ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>>> >
>>> >    }
>>> >