Re: Sessionization using updateStateByKey
On 15 Jul 2015, at 17:38, Cody Koeninger wrote:

> An in-memory hash key data structure of some kind so that you're close to
> linear on the number of items in a batch, not the number of outstanding keys.
> That's more complex, because you have to deal with expiration for keys that
> never get hit, and for unusually long sessions you have to either drop them
> or hit durable storage.
>
> Maybe someone has a better idea, I'd like to hear it.

Thanks, yes. I already do the expiration check to terminate 'active' sessions and flush them to durable storage afterwards.

Excuse my newbie state: when doing this with my own data structure (e.g. such a hash), where should I execute the code that periodically checks the hash? Right now I am doing that in updateStateByKey - should I rather use foreachRDD?

And if I understand you correctly, you are saying that updateStateByKey is more suitable for updating 'entities' of which only a limited number exists (e.g. the users behind the visits, or the products sold). Yes?

Jan
Re: Sessionization using updateStateByKey
Don't get me wrong, we've been able to use updateStateByKey for some jobs, and it's certainly convenient. At a certain point, though, iterating through every key on every batch becomes a less viable solution.

On Wed, Jul 15, 2015 at 12:38 PM, Sean McNamara wrote:

> I would just like to add that we do the very same/similar thing at
> Webtrends; updateStateByKey has been a life-saver for our sessionization
> use-cases.
>
> Cheers,
>
> Sean
Re: Sessionization using updateStateByKey
I would just like to add that we do the very same/similar thing at Webtrends; updateStateByKey has been a life-saver for our sessionization use-cases.

Cheers,

Sean

On Jul 15, 2015, at 11:20 AM, Silvio Fiorito wrote:

> I’ve had success using updateStateByKey for real-time sessionization by
> aging off timed-out sessions (returning None in the update function). This
> was on a large commercial website with millions of hits per day.
Re: Sessionization using updateStateByKey
Hi Cody,

I’ve had success using updateStateByKey for real-time sessionization by aging off timed-out sessions (returning None in the update function). This was on a large commercial website with millions of hits per day. This was over a year ago, so unfortunately I no longer have access to the stats on session length, but I seem to remember sessions were around 10-30 minutes long. Even with peaks in volume, Spark managed to keep up very well.

Thanks,
Silvio

On Wednesday, July 15, 2015 at 5:38 PM, Cody Koeninger wrote:

> An in-memory hash key data structure of some kind so that you're close to
> linear on the number of items in a batch, not the number of outstanding keys.
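Silvio's "returning None in the update function" technique can be modeled in plain Python. This is a hedged sketch of the per-key update logic only, not the actual Spark API; the timeout value, the state shape, and the function name are all illustrative:

```python
import time

SESSION_TIMEOUT_S = 30 * 60  # illustrative 30-minute timeout

def update_session(new_events, state, now=None):
    """Pure-Python stand-in for an updateStateByKey update function.

    `new_events` is the list of events for one key in the current batch;
    `state` is the previous session state (or None for a new key).
    Returning None drops the key from the state, which is how timed-out
    sessions are aged off.
    """
    now = time.time() if now is None else now
    if new_events:
        # Activity in this batch: refresh the session.
        prev = state["events"] if state else []
        return {"events": prev + list(new_events), "last_seen": now}
    # No activity: keep the session until it times out, then drop it.
    if state and now - state["last_seen"] < SESSION_TIMEOUT_S:
        return state
    return None  # session expired -> removed from state
```

In a real job this function would be passed to updateStateByKey, with Spark supplying the batch's values and the previous state for each key; the None return is what removes the timed-out session from the state RDD.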
Re: Sessionization using updateStateByKey
An in-memory hash key data structure of some kind, so that you're close to linear on the number of items in a batch, not the number of outstanding keys. That's more complex, because you have to deal with expiration for keys that never get hit, and for unusually long sessions you have to either drop them or hit durable storage.

Maybe someone has a better idea, I'd like to hear it.

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971 wrote:

> Hi Cody,
>
> oh ... I thought that was one of *the* use cases for it. Do you have a
> suggestion / best practice for how to achieve the same thing with better
> scaling characteristics?
>
> Jan
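The in-memory hash Cody describes could look something like the following plain-Python sketch. Hedged: the class, timeout, and state shape are illustrative; a production version would write expired sessions to durable storage and likely replace the full-scan sweep with a heap or timer wheel keyed by deadline:

```python
import time

class SessionStore:
    """Sketch of the in-memory hash approach: each batch costs
    O(items in the batch), not O(outstanding keys). The expiration
    sweep is the extra complexity Cody mentions; here it is a full
    scan intended to run only occasionally."""

    def __init__(self, timeout_s=1800):
        self.timeout_s = timeout_s
        self.sessions = {}  # key -> {"events": [...], "last_seen": t}

    def update_batch(self, batch, now=None):
        # Touch only the keys that appear in this batch.
        now = time.time() if now is None else now
        for key, event in batch:
            sess = self.sessions.setdefault(key, {"events": [], "last_seen": now})
            sess["events"].append(event)
            sess["last_seen"] = now

    def expire(self, now=None):
        # Remove timed-out sessions and return them; in a real job
        # the returned sessions would be flushed to durable storage.
        now = time.time() if now is None else now
        expired = [k for k, s in self.sessions.items()
                   if now - s["last_seen"] >= self.timeout_s]
        return [(k, self.sessions.pop(k)) for k in expired]
```

A batch update followed by a sweep would be `store.update_batch(batch)` then, on whatever cadence fits, `store.expire()`.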
Re: Sessionization using updateStateByKey
Hi Cody,

oh ... I thought that was one of *the* use cases for it. Do you have a suggestion / best practice for how to achieve the same thing with better scaling characteristics?

Jan

On 15 Jul 2015, at 15:33, Cody Koeninger wrote:

> I personally would try to avoid updateStateByKey for sessionization when
> you have long sessions / a lot of keys, because it's linear on the number
> of keys.
Re: Sessionization using updateStateByKey
I personally would try to avoid updateStateByKey for sessionization when you have long sessions / a lot of keys, because it's linear on the number of keys.

On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das wrote:

> 1. When you set ssc.checkpoint(checkpointDir), Spark Streaming periodically
> saves the state RDD (which is a snapshot of all the state data) to HDFS
> using RDD checkpointing. In fact, a streaming app with updateStateByKey
> will not start until you set a checkpoint directory.
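The "linear on the number of keys" cost can be made concrete with a toy count of per-key update invocations (illustrative numbers): updateStateByKey visits every outstanding key on every batch, while a hash-based store touches only the keys present in each batch.

```python
def updatestatebykey_calls(n_state_keys, batches):
    # updateStateByKey applies the update function to every
    # outstanding key on every batch.
    return sum(n_state_keys for _ in batches)

def hash_calls(batches):
    # A hash-based store touches only the items in each batch.
    return sum(len(batch) for batch in batches)

# 1M outstanding sessions, 10 small batches of 2 events each:
batches = [["k1", "k2"]] * 10
print(updatestatebykey_calls(1_000_000, batches))  # 10000000
print(hash_calls(batches))                         # 20
```

With many idle sessions and small batches, the per-batch work of the two approaches differs by orders of magnitude, which is the scaling concern raised in this thread.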
Re: Sessionization using updateStateByKey
[Apologies for the repost, for those who have already seen this response on the dev mailing list]

1. When you set ssc.checkpoint(checkpointDir), Spark Streaming periodically saves the state RDD (which is a snapshot of all the state data) to HDFS using RDD checkpointing. In fact, a streaming app with updateStateByKey will not start until you set a checkpoint directory.

2. The updateStateByKey performance is largely independent of the source being used, whether receiver-based or direct Kafka. The absolute performance obviously depends on a LOT of variables: the size of the cluster, parallelization, etc. The key thing is that you must ensure sufficient parallelization at every stage: receiving, shuffles (updateStateByKey included), and output.

Some more discussion in my talk - https://www.youtube.com/watch?v=d5UJonrruHk

On Tue, Jul 14, 2015 at 4:13 PM, swetha wrote:

> I have a question regarding sessionization using updateStateByKey. If near
> real time state needs to be maintained in a Streaming application, what
> happens when the number of RDDs to maintain the state becomes very large?
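TD's point 1, as a minimal PySpark driver sketch. This is hedged: the app name, batch interval, checkpoint path, and update function are placeholders, and the Kafka source wiring is left commented out since it depends on the deployment; `run_driver` requires a Spark installation and is shown, not invoked.

```python
def update_fn(new_events, state):
    """Per-key update: merge this batch's events into the session state."""
    return (state or []) + list(new_events)

def run_driver():
    """Driver wiring (not invoked here; needs a Spark installation)."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="sessionization-sketch")  # illustrative name
    ssc = StreamingContext(sc, batchDuration=10)        # 10-second batches

    # Required for updateStateByKey: state RDDs (snapshots of all the
    # state data) are periodically checkpointed to this directory.
    ssc.checkpoint("hdfs:///tmp/session-checkpoints")   # placeholder path

    # `events` would come from e.g. KafkaUtils.createDirectStream(...):
    # sessions = events.updateStateByKey(update_fn)
    # sessions.foreachRDD(lambda rdd: ...)  # flush or inspect sessions

    ssc.start()
    ssc.awaitTermination()
```

Without the `ssc.checkpoint(...)` call, a job using updateStateByKey fails at startup, which is the behavior TD describes above.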
Sessionization using updateStateByKey
Hi,

I have a question regarding sessionization using updateStateByKey. If near real-time state needs to be maintained in a Streaming application, what happens when the number of RDDs to maintain the state becomes very large? Does it automatically get saved to HDFS and reloaded when needed, or do I have to use code like ssc.checkpoint(checkpointDir)? Also, how is the performance if I use both DStream checkpointing for maintaining the state and the Kafka direct approach for exactly-once semantics?

Thanks,
Swetha

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org