This produces the expected output, thank you!

On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
<silvio.fior...@granturing.com> wrote:
> Ok, I have a better idea of what you’re trying to do now.
>
> I think the prob might be the map. The first time the function runs,
> currentValue will be None. Using map on None returns None.
>
> Instead, try:
>
> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>
> I think that should give you the expected result.
>
>
> From: Pierce Lamb <richard.pierce.l...@gmail.com>
> Date: Thursday, December 18, 2014 at 2:31 PM
> To: Silvio Fiorito <silvio.fior...@granturing.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: Help with updateStateByKey
>
> Hi Silvio,
>
> This is a great suggestion (I wanted to get rid of groupByKey), I have been
> trying to implement it this morning, but having some trouble. I would love
> to see your code for the function that goes inside updateStateByKey
>
> Here is my current code:
>
>  def updateGroupByKey( newValues: Seq[(String, Long, Long)],
>                           currentValue: Option[Seq[(String, Long, Long)]]
>                           ): Option[Seq[(String, Long, Long)]] = {
>
>       currentValue.map{ case (v) => v ++ newValues
>       }
>     }
>
>     val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>
>
> However, when I run it the grouped DStream doesn't get populated with
> anything. The issue is probably that currentValue is not actually an
> Option[Seq[triple]] but rather an Option[triple]. However if I change it to
> an Option[triple] then I have to also return an Option[triple] for
> updateStateByKey to compile, but I want that return value to be an
> Option[Seq[triple]] because ultimately i want the data to look like
> (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested,
> startTime, EndTime)...]) and have that Seq build over time
>
> Am I thinking about this wrong?
>
> Thank you
>
> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
> <silvio.fior...@granturing.com> wrote:
>>
>> Hi Pierce,
>>
>> You shouldn’t have to use groupByKey because updateStateByKey will get a
>> Seq of all the values for that key already.
>>
>> I used that for realtime sessionization as well. What I did was key my
>> incoming events, then send them to udpateStateByKey. The updateStateByKey
>> function then received a Seq of the events and the Option of the previous
>> state for that key. The sessionization code then did its thing to check if
>> the incoming events were part of the same session, based on a configured
>> timeout. If a session already was active (from the previous state) and it
>> hadn’t exceeded the timeout, it used that value. Otherwise it generated a
>> new session id. Then the return value for the updateStateByKey function
>> was a Tuple of session id and last timestamp.
>>
>> Then I joined the DStream with the session ids, which were both keyed off
>> the same id and continued my processing. Your requirements may be
>> different, but that’s what worked well for me.
>>
>> Another thing to consider is cleaning up old sessions by returning None in
>> the updateStateByKey function. This will help with long running apps and
>> minimize memory usage (and checkpoint size).
>>
>> I was using something similar to the method above on a live production
>> stream with very little CPU and memory footprint, running for weeks at a
>> time, processing up to 15M events per day with fluctuating traffic.
>>
>> Thanks,
>> Silvio
>>
>>
>>
>> On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com>
>> wrote:
>>
>> >I am trying to run stateful Spark Streaming computations over (fake)
>> >apache web server logs read from Kafka. The goal is to "sessionize"
>> >the web traffic similar to this blog post:
>>
>> > >http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionizat
>> >ion-with-spark-streaming-and-apache-hadoop/
>> >
>> >The only difference is that I want to "sessionize" each page the IP
>> >hits, instead of the entire session. I was able to do this reading
>> >from a file of fake web traffic using Spark in batch mode, but now I
>> >want to do it in a streaming context.
>> >
>> >Log files are read from Kafka and parsed into K/V pairs of
>> >
>> >(String, (String, Long, Long)) or
>> >
>> >(IP, (requestPage, time, time))
>> >
>> >I then call "groupByKey()" on this K/V pair. In batch mode, this would
>> >produce a:
>> >
>> >(String, CollectionBuffer((String, Long, Long), ...) or
>> >
>> >(IP, CollectionBuffer((requestPage, time, time), ...)
>> >
>> >In a StreamingContext, it produces a:
>> >
>> >(String, ArrayBuffer((String, Long, Long), ...) like so:
>> >
>> >(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>> >
>> >However, as the next microbatch (DStream) arrives, this information is
>> >discarded. Ultimately what I want is for that ArrayBuffer to fill up
>> >over time as a given IP continues to interact and to run some
>> >computations on its data to "sessionize" the page time. I believe the
>> >operator to make that happen is "updateStateByKey." I'm having some
>> >trouble with this operator (I'm new to both Spark & Scala); any help
>> >is appreciated.
>> >
>> >Thus far:
>> >
>> >    val grouped =
>> >ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>> >
>> >
>> >        def updateGroupByKey(
>> >                              a: Seq[(String, ArrayBuffer[(String,
>> >Long, Long)])],
>> >                              b: Option[(String, ArrayBuffer[(String,
>> >Long, Long)])]
>> >                              ): Option[(String, ArrayBuffer[(String,
>> >Long, Long)])] = {
>> >
>> >      }
>> >
>> >---------------------------------------------------------------------
>> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >For additional commands, e-mail: user-h...@spark.apache.org
>> >

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to