Re: updateStateByKey performance & API
Hi Nikos,

We experienced something similar in a setting where the Spark app was supposed to write the final state changes to a Redis instance. Over time the delay caused by re-writing the entire dataset in each iteration exceeded the Spark Streaming batch interval. In our case the solution was to avoid updateStateByKey and persist the state directly to Redis. That of course means the join of the new keys against the old state needs to be done explicitly.

I think the solution to this problem would be to extend the Spark Streaming API with an alternative state update that performs something like an inner join instead of a cogroup. That should be quite straightforward. I went ahead and filed an issue: https://issues.apache.org/jira/browse/SPARK-6462

Note that this does not solve the problem of the state growing so large that merging in new keys becomes the bottleneck (that would require something like IndexedRDD). But since you mention serialization overhead as your bottleneck, maybe you could try filtering out unchanged keys before persisting the data? Just an idea.

Andre
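P.S. In case it helps, a bare-bones sketch of the delta-write idea. This is not our actual code: Jedis as the client, made-up key names, and a per-user action count standing in for the real state object. Only users that appear in the current batch are read, merged and written back, so unchanged keys are skipped entirely:

    import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.2)
    import org.apache.spark.streaming.dstream.DStream
    import redis.clients.jedis.Jedis

    def persistDeltas(actions: DStream[(String, Int)]): Unit = {
      actions.foreachRDD { rdd =>
        // Collapse the batch to one delta per user before touching Redis.
        rdd.mapValues(_ => 1L).reduceByKey(_ + _).foreachPartition { users =>
          val jedis = new Jedis("redis-host", 6379) // one connection per partition
          users.foreach { case (userId, delta) =>
            // The explicit "join" of new keys against old state: read the
            // old value, merge, write back. Unlike the cogroup inside
            // updateStateByKey, keys with no new data are never visited.
            val old = Option(jedis.get("user:" + userId)).map(_.toLong).getOrElse(0L)
            jedis.set("user:" + userId, (old + delta).toString)
          }
          jedis.close()
        }
      }
    }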
Re: updateStateByKey performance & API
Hi Akhil,

Yes, that's what we are planning on doing at the end of the day. At the moment I am doing performance testing before the job hits production, testing on 4 cores to get baseline figures, and deduced that in order to grow to 10-15 million keys we'll need a batch interval of ~20 secs if we don't want to allocate more than 8 cores to this job. The thing is that since we have a big "silent" window in the user interactions, during which the stream carries very little data, we would like to use those cores for batch processing during that window, but we can't the way updateStateByKey currently works.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das wrote:

> You can always throw more machines at this and see if the performance
> increases, since you haven't mentioned anything regarding your # of
> cores etc.
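P.S. For anyone following the thread, the update step is roughly the shape below. The state object and all names are heavily simplified, so treat it as a sketch rather than our production code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    // Simplified stand-in for the real per-user state object.
    case class UserState(lastActionCode: Int, actionCount: Long)

    val ssc = new StreamingContext(new SparkConf().setAppName("user-state"), Seconds(10))
    ssc.checkpoint("hdfs:///checkpoints/user-state") // required by updateStateByKey

    // (user_id, action_code) pairs from Kafka; stream creation omitted here.
    val actions: DStream[(String, Int)] = ???

    // The update function runs for EVERY key in the state on EVERY batch,
    // even for users with no new actions -- which is where the time goes.
    val state = actions.updateStateByKey[UserState] {
      (newActions: Seq[Int], old: Option[UserState]) =>
        if (newActions.isEmpty) old // silent user, but still visited
        else {
          val prev = old.getOrElse(UserState(0, 0L))
          Some(UserState(newActions.last, prev.actionCount + newActions.size))
        }
    }

    ssc.start()
    ssc.awaitTermination()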
Re: updateStateByKey performance & API
You can always throw more machines at this and see whether the performance improves; you haven't mentioned anything regarding your # of cores etc.

Thanks
Best Regards

On Wed, Mar 18, 2015 at 11:42 AM, nvrs wrote:

> Hi all,
>
> We are having a few issues with the performance of the updateStateByKey
> operation in Spark Streaming (1.2.1 at the moment) and any advice would
> be greatly appreciated. Specifically, on each tick of the system (which
> is set at 10 secs) we need to update a state tuple where the key is the
> user_id and the value is an object with some state about the user. The
> problem is that with Kryo serialization for 5M users this gets so slow
> that we have to increase the period to more than 10 seconds so as not
> to fall behind.
> The input for the streaming job is a Kafka stream which consists of
> key-value pairs of user_ids with some sort of action codes; we join
> this to our checkpointed state and update the state.
> I understand that the reason for iterating over the whole state set is
> to evict items or to update everyone's state for time-dependent
> computations, but neither applies to our situation and it hurts
> performance really badly.
> Is there a possibility of implementing in the future an extra call in
> the API for updating only a specific subset of keys?
>
> p.s. I will try setting the DStream to non-serialized as soon as
> possible, but then I am worried about GC and checkpointing performance.