[Spark Streaming] Help with updateStateByKey()
Hi everybody, I think I could use some help with the /updateStateByKey()/ Java method in Spark Streaming.

*Context:* I have a /JavaReceiverInputDStream<DataUpdate> du/ DStream, where the /DataUpdate/ object has two fields of interest in my case, namely du.personId (an Integer) and du.cell.hashCode() (an Integer again). I am processing many /DataUpdate/ objects (coming from a log file read in microbatches), and every /personId/ will be associated with several /du.cell.hashCode()/ values.

What I need to do is statefully count, for every /personId/, how many times it appears with a particular /du.cell.hashCode()/, possibly partitioning by the /personId/ key. (Long story short: an area is split into cells, and I want to know how many times every person appears in every cell.)

In a very naive way, I guess the state should look like a /HashMap<personId, HashMap<cell.hashCode(), count>>/, but I am not quite sure how to partition by /personId/ and increment the count. It looks like the /updateStateByKey()/ method should do the trick (I am new to Spark Streaming), yet I can't figure out in which way.

Any suggestions? Feel free to ask anything in case I was unclear or more information is needed. :) Thank you!
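A minimal sketch of one way this can look, in Scala for brevity (the Java API is analogous via JavaPairDStream.updateStateByKey). The helper name countCellsPerPerson and the (personId, cellHash) pair extraction are assumptions for illustration, not code from the post:

    import org.apache.spark.streaming.dstream.DStream

    // personCells: DStream[(Int, Int)] of (personId, cellHash) pairs, e.g.
    // du.map(d => (d.personId, d.cell.hashCode)). Note that updateStateByKey
    // requires a checkpoint directory: ssc.checkpoint(dir).
    def countCellsPerPerson(personCells: DStream[(Int, Int)]): DStream[(Int, Map[Int, Int])] = {
      // Per personId, fold this batch's cell hashes into the running
      // Map[cellHash -> count]; Spark passes the previous state as an Option.
      val update: (Seq[Int], Option[Map[Int, Int]]) => Option[Map[Int, Int]] =
        (newCells, state) => {
          val counts = state.getOrElse(Map.empty[Int, Int])
          Some(newCells.foldLeft(counts) { (m, cell) =>
            m.updated(cell, m.getOrElse(cell, 0) + 1)
          })
        }
      personCells.updateStateByKey(update)
    }

Because the stream is keyed by personId, updateStateByKey handles the per-person partitioning for you; each person's state is just the inner map of cell counts.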
Re: Help with updateStateByKey
Another good place to start playing with updateStateByKey is the StatefulNetworkWordCount example; see the streaming examples directory in the Spark repository.

TD

On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb <richard.pierce.l...@gmail.com> wrote:
> I am trying to run stateful Spark Streaming computations over (fake)
> apache web server logs read from Kafka. [...]
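For reference, the heart of that example is a small update function that adds each batch's counts for a word to its running total. A self-contained sketch along the example's lines (paraphrased, so check the repository for the exact code):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Add this batch's counts (values) to the word's running total (state).
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    val ssc = new StreamingContext("local[2]", "StatefulWordCount", Seconds(1))
    ssc.checkpoint(".")  // updateStateByKey requires a checkpoint directory

    val wordDstream = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Running word counts across all batches so far
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc.start()
    ssc.awaitTermination()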
Re: Help with updateStateByKey
Hi Pierce,

You shouldn’t have to use groupByKey, because updateStateByKey will get a Seq of all the values for that key already. I used it for realtime sessionization as well.

What I did was key my incoming events, then send them to updateStateByKey. The updateStateByKey function then received a Seq of the events and the Option of the previous state for that key. The sessionization code then did its thing to check whether the incoming events were part of the same session, based on a configured timeout. If a session was already active (from the previous state) and it hadn’t exceeded the timeout, it used that value; otherwise it generated a new session id. The return value of the updateStateByKey function was a tuple of session id and last timestamp. Then I joined the DStream with the session ids, which were both keyed off the same id, and continued my processing. Your requirements may be different, but that’s what worked well for me.

Another thing to consider is cleaning up old sessions by returning None in the updateStateByKey function. This will help with long-running apps and minimize memory usage (and checkpoint size). I was using something similar to the method above on a live production stream with very little CPU and memory footprint, running for weeks at a time, processing up to 15M events per day with fluctuating traffic.

Thanks,
Silvio

On 12/17/14, 10:07 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:
> I am trying to run stateful Spark Streaming computations over (fake)
> apache web server logs read from Kafka. [...]
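A hedged sketch of the pattern Silvio describes; his actual code isn't shown in the thread, so the names (Event, SessionState) and the 30-minute timeout here are assumptions:

    import java.util.UUID

    case class Event(time: Long)
    case class SessionState(sessionId: String, lastSeen: Long)

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout, configure to taste

    // Called per key each batch: events holds that batch's values for the key,
    // state the previous SessionState (if any). Returning None drops the key.
    def updateSession(events: Seq[Event], state: Option[SessionState]): Option[SessionState] = {
      if (events.isEmpty) {
        // No new activity: keep the session only while it is within the timeout
        // (wall clock here for simplicity; event time may fit some cases better).
        state.filter(s => System.currentTimeMillis() - s.lastSeen < sessionTimeoutMs)
      } else {
        val latest = events.map(_.time).max
        state match {
          // Still within the timeout: same session id, refreshed timestamp
          case Some(s) if latest - s.lastSeen < sessionTimeoutMs =>
            Some(s.copy(lastSeen = latest))
          // Expired or brand new: start a fresh session
          case _ =>
            Some(SessionState(UUID.randomUUID().toString, latest))
        }
      }
    }

    // Usage, given keyedEvents: DStream[(String, Event)] keyed by user/IP:
    //   val sessions = keyedEvents.updateStateByKey(updateSession)
    //   keyedEvents.join(sessions)  // attach session ids, as Silvio describes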
Re: Help with updateStateByKey
This produces the expected output, thank you!

On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:
> Ok, I have a better idea of what you’re trying to do now. I think the
> problem might be the map. The first time the function runs, currentValue
> will be None, and using map on None returns None. Instead, try:
>
> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>
> I think that should give you the expected result.
>
> From: Pierce Lamb <richard.pierce.l...@gmail.com>
> Date: Thursday, December 18, 2014 at 2:31 PM
> To: Silvio Fiorito <silvio.fior...@granturing.com>
> Cc: user@spark.apache.org
> Subject: Re: Help with updateStateByKey
>
> Hi Silvio,
>
> This is a great suggestion (I wanted to get rid of groupByKey); I have
> been trying to implement it this morning, but I'm having some trouble. I
> would love to see your code for the function that goes inside
> updateStateByKey. Here is my current code:
>
> def updateGroupByKey(
>     newValues: Seq[(String, Long, Long)],
>     currentValue: Option[Seq[(String, Long, Long)]]
>   ): Option[Seq[(String, Long, Long)]] = {
>   currentValue.map { case (v) => v ++ newValues }
> }
>
> val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)
>
> However, when I run it, the grouped DStream doesn't get populated with
> anything. The issue is probably that currentValue is not actually an
> Option[Seq[triple]] but rather an Option[triple]. However, if I change it
> to an Option[triple], then I also have to return an Option[triple] for
> updateStateByKey to compile, but I want the return value to be an
> Option[Seq[triple]], because ultimately I want the data to look like
> (IPaddress, Seq[(pageRequested, startTime, endTime), (pageRequested,
> startTime, endTime), ...]) and have that Seq build over time.
>
> Am I thinking about this wrong?
>
> Thank you
>
> On Thu, Dec 18, 2014 at 6:05 AM, Silvio Fiorito
> <silvio.fior...@granturing.com> wrote:
>> Hi Pierce,
>>
>> You shouldn’t have to use groupByKey because updateStateByKey will get a
>> Seq of all the values for that key already. [...]
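Putting the thread's resolution in one place: Pierce's function with Silvio's fix applied (a sketch, assuming ipTimeStamp is the DStream[(String, (String, Long, Long))] described in the original post):

    def updateGroupByKey(
        newValues: Seq[(String, Long, Long)],
        currentValue: Option[Seq[(String, Long, Long)]]
      ): Option[Seq[(String, Long, Long)]] = {
      // On the first batch currentValue is None, and mapping over None stays
      // None, so the state never seeds; getOrElse starts from an empty Seq.
      Some(currentValue.getOrElse(Seq.empty) ++ newValues)
    }

    // No groupByKey needed: updateStateByKey already collects each key's values.
    val grouped = ipTimeStamp.updateStateByKey(updateGroupByKey)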
Re: Help with updateStateByKey
Great, glad it worked out! Just keep an eye on memory usage as you roll it out. Like I said before, if you’ll be running this 24/7, consider cleaning up old sessions by returning None after some sort of timeout.

On 12/18/14, 8:25 PM, "Pierce Lamb" <richard.pierce.l...@gmail.com> wrote:
> This produces the expected output, thank you!
>
> On Thu, Dec 18, 2014 at 12:11 PM, Silvio Fiorito
> <silvio.fior...@granturing.com> wrote:
>> Ok, I have a better idea of what you’re trying to do now. I think the
>> problem might be the map. The first time the function runs, currentValue
>> will be None, and using map on None returns None. Instead, try:
>>
>> Some(currentValue.getOrElse(Seq.empty) ++ newValues)
>>
>> I think that should give you the expected result. [...]
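One way to bolt that cleanup onto the accumulating function above; the 30-minute window and the use of the triple's end time as "last seen" are illustrative assumptions, not from the thread:

    val sessionTimeoutMs = 30 * 60 * 1000L  // assumed timeout

    def updateWithCleanup(
        newValues: Seq[(String, Long, Long)],
        currentValue: Option[Seq[(String, Long, Long)]]
      ): Option[Seq[(String, Long, Long)]] = {
      val merged = currentValue.getOrElse(Seq.empty) ++ newValues
      val lastSeen = merged.map(_._3).max  // latest end time seen for this IP
      if (newValues.isEmpty && System.currentTimeMillis() - lastSeen > sessionTimeoutMs)
        None  // drop the key entirely: frees memory and shrinks the checkpoint
      else
        Some(merged)
    }

This works because updateStateByKey invokes the function for every existing key on each batch, with an empty Seq when no new values arrived, which is what gives the function a chance to expire idle keys.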
Help with updateStateByKey
I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/

The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context.

Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or (IP, (requestPage, time, time)). I then call groupByKey() on this K/V pair. In batch mode, this would produce a:

(String, CollectionBuffer((String, Long, Long), ...)) or (IP, CollectionBuffer((requestPage, time, time), ...))

In a StreamingContext, it produces a:

(String, ArrayBuffer((String, Long, Long), ...))

like so:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated.

Thus far:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)

def updateGroupByKey(
    a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
    b: Option[(String, ArrayBuffer[(String, Long, Long)])]
  ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}