[Spark Streaming] Help with updateStateByKey()

2015-04-23 Thread allonsy
Hi everybody,

I think I could use some help with the updateStateByKey() Java method in
Spark Streaming.

Context:

I have a JavaReceiverInputDStream<DataUpdate> du DStream, where the DataUpdate
object mainly has 2 fields of interest (in my case), namely
du.personId (an Integer) and du.cell.hashCode() (an Integer, again). Obviously,
I am processing several DataUpdate objects (coming from a log file read in
microbatches), and every personId will be 'associated' with several
du.cell.hashCode() values.

What I need to do is, for every personId, statefully count how many
times it appears with a particular du.cell.hashCode(), possibly
partitioning by the personId key.

(Long story short: an area is split into cells, and I want to know how many
times every person appears in every cell.)

In a very naive way, I guess the result should look like a
HashMap<personId, HashMap<cell.hashCode(), count>>, but I am not quite
sure how to partition by personId and increase the count.

It looks like the updateStateByKey() method should do the trick (I am new to
Spark Streaming), yet I can't figure out how.
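
To make the shape concrete, here is a rough sketch of what I have in mind, in
Scala rather than Java for brevity (in Java it would presumably be a Function2
passed to updateStateByKey); all the names are just placeholders, so this may
well be off:

// State per personId: a map from cell hash code to how many times that
// person has been seen in that cell so far.
def updateCellCounts(newCellHashes: Seq[Int],
                     state: Option[Map[Int, Long]]): Option[Map[Int, Long]] = {
  val counts = state.getOrElse(Map.empty[Int, Long])
  Some(newCellHashes.foldLeft(counts) { (acc, cell) =>
    acc + (cell -> (acc.getOrElse(cell, 0L) + 1L))
  })
}

// Assuming `updates` is the DStream of DataUpdate objects:
// val perPersonCounts =
//   updates.map(du => (du.personId.toInt, du.cell.hashCode()))
//          .updateStateByKey(updateCellCounts _)
// (updateStateByKey also needs a checkpoint directory, i.e. ssc.checkpoint(...))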

Any suggestions?

Feel free to ask anything in case I was unclear or more information is
needed. :)


Thank you!





Re: Help with updateStateByKey

2014-12-18 Thread Tathagata Das
Another good place to start playing with updateStateByKey is the
StatefulNetworkWordCount example. See the streaming examples directory in the
Spark repository.
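
The heart of that example is only a few lines; roughly (paraphrasing from
memory, check the repo for the exact code):

// words is a DStream[String], e.g. from ssc.socketTextStream(...).flatMap(_.split(" "))
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

val wordDstream = words.map(x => (x, 1))
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)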

TD





Re: Help with updateStateByKey

2014-12-18 Thread Silvio Fiorito
Hi Pierce,

You shouldn’t have to use groupByKey because updateStateByKey will get a 
Seq of all the values for that key already.
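For reference, the Scala signature is roughly

def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

so the per-key grouping of values is done for you.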

I used that for realtime sessionization as well. What I did was key my 
incoming events, then send them to updateStateByKey. The updateStateByKey 
function then received a Seq of the events and the Option of the previous 
state for that key. The sessionization code then did its thing to check if 
the incoming events were part of the same session, based on a configured 
timeout. If a session already was active (from the previous state) and it 
hadn’t exceeded the timeout, it used that value. Otherwise it generated a 
new session id. Then the return value for the updateStateByKey function 
was a Tuple of session id and last timestamp.

Then I joined the DStream with the session ids, which were both keyed off 
the same id and continued my processing. Your requirements may be 
different, but that’s what worked well for me.

Another thing to consider is cleaning up old sessions by returning None in 
the updateStateByKey function. This will help with long running apps and 
minimize memory usage (and checkpoint size).
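
A rough sketch of that shape (simplified, with made-up names and an example
timeout, so treat it as an illustration rather than my actual code):

val sessionTimeoutMs = 30 * 60 * 1000L  // e.g. 30 minutes

// Per key: the new event timestamps in this batch, plus the previous
// (sessionId, lastSeen) state if there was one.
def updateSession(events: Seq[Long],
                  state: Option[(String, Long)]): Option[(String, Long)] = {
  if (events.isEmpty) {
    // No new events: keep the session only if it hasn't timed out,
    // otherwise return None so the key gets cleaned up.
    state.filter { case (_, lastSeen) =>
      System.currentTimeMillis() - lastSeen < sessionTimeoutMs
    }
  } else {
    val latest = events.max
    state match {
      // Still within the timeout: keep the session id, bump the timestamp.
      case Some((sessionId, lastSeen)) if latest - lastSeen < sessionTimeoutMs =>
        Some((sessionId, latest))
      // Expired, or a brand new key: start a new session.
      case _ =>
        Some((java.util.UUID.randomUUID().toString, latest))
    }
  }
}

// keyedEvents: DStream[(key, eventTimestamp)]
// val sessions = keyedEvents.updateStateByKey(updateSession _)
// ...then join `sessions` back with the keyed DStream and continue processing.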

I was using something similar to the method above on a live production 
stream with very little CPU and memory footprint, running for weeks at a 
time, processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio





Re: Help with updateStateByKey

2014-12-18 Thread Pierce Lamb
This produces the expected output, thank you!

On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
silvio.fior...@granturing.com wrote:
 Ok, I have a better idea of what you’re trying to do now.

 I think the prob might be the map. The first time the function runs,
 currentValue will be None. Using map on None returns None.

 Instead, try:

 Some(currentValue.getOrElse(Seq.empty) ++ newValues)

 I think that should give you the expected result.
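
 Putting it into your function, the whole thing would read something like:

 def updateGroupByKey(newValues: Seq[(String, Long, Long)],
                      currentValue: Option[Seq[(String, Long, Long)]]
                     ): Option[Seq[(String, Long, Long)]] = {
   // getOrElse covers the first batch, when there is no previous state yet.
   Some(currentValue.getOrElse(Seq.empty) ++ newValues)
 }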


 From: Pierce Lamb richard.pierce.l...@gmail.com
 Date: Thursday, December 18, 2014 at 2:31 PM
 To: Silvio Fiorito silvio.fior...@granturing.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Help with updateStateByKey

 Hi Silvio,

 This is a great suggestion (I wanted to get rid of groupByKey). I have been
 trying to implement it this morning, but I'm having some trouble. I would love
 to see your code for the function that goes inside updateStateByKey.

 Here is my current code:

  def updateGroupByKey( newValues: Seq[(String, Long, Long)],
                        currentValue: Option[Seq[(String, Long, Long)]]
                      ): Option[Seq[(String, Long, Long)]] = {

    currentValue.map{ case (v) => v ++ newValues }
  }

 val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)


 However, when I run it, the grouped DStream doesn't get populated with
 anything. The issue is probably that currentValue is not actually an
 Option[Seq[triple]] but rather an Option[triple]. However, if I change it to
 an Option[triple] then I have to also return an Option[triple] for
 updateStateByKey to compile, but I want that return value to be an
 Option[Seq[triple]], because ultimately I want the data to look like
 (IPaddress, Seq[(pageRequested, startTime, EndTime), (pageRequested,
 startTime, EndTime)...]) and have that Seq build up over time.

 Am I thinking about this wrong?

 Thank you


Re: Help with updateStateByKey

2014-12-18 Thread Silvio Fiorito
Great, glad it worked out! Just keep an eye on memory usage as you roll it 
out. Like I said before, if you’ll be running this 24/7, consider cleaning 
up sessions by returning None after some sort of timeout.





Help with updateStateByKey

2014-12-17 Thread Pierce Lamb
I am trying to run stateful Spark Streaming computations over (fake)
Apache web server logs read from Kafka. The goal is to sessionize
the web traffic, similar to this blog post:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/

The only difference is that I want to sessionize each page the IP
hits, instead of the entire session. I was able to do this reading
from a file of fake web traffic using Spark in batch mode, but now I
want to do it in a streaming context.

Log files are read from Kafka and parsed into K/V pairs of

(String, (String, Long, Long)) or

(IP, (requestPage, time, time))

I then call groupByKey() on this K/V pair. In batch mode, this would
produce a:

(String, CollectionBuffer((String, Long, Long), ...)) or

(IP, CollectionBuffer((requestPage, time, time), ...))

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...)) like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is
discarded. Ultimately what I want is for that ArrayBuffer to fill up
over time as a given IP continues to interact, and to run some
computations on its data to sessionize the page time. I believe the
operator to make that happen is updateStateByKey. I'm having some
trouble with this operator (I'm new to both Spark & Scala); any help
is appreciated.

Thus far:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)


def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
  ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

}
