Re: updateStateByKey performance / API

2015-03-23 Thread Andre Schumacher

Hi Nikos,

We experienced something similar in our setting, where the Spark app was
supposed to write the final state changes to a Redis instance. Over time
the delay caused by re-writing the entire dataset in each iteration
exceeded the Spark Streaming batch interval.

In our case the solution was to avoid updateStateByKey and persist the
state directly to Redis. That of course means the join of the new keys
against the old state needs to be done explicitly. I think the solution to
this problem would be to extend the Spark Streaming API with an
alternative state update that, instead of a cogroup, does something like
an inner join. That should be quite straightforward.

I went ahead and added an issue:
https://issues.apache.org/jira/browse/SPARK-6462
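
For illustration, a rough sketch of this kind of explicit per-batch update
against Redis (assuming the Jedis client; the key layout and the merge
logic are placeholders, not our actual production code):

  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream
  import redis.clients.jedis.Jedis

  // Keep the state in Redis and touch only the keys that appear in the batch.
  def updateChangedKeysOnly(actions: DStream[(String, Int)]): Unit = {
    actions
      .groupByKey()                                // one record per active user per batch
      .foreachRDD { rdd =>
        rdd.foreachPartition { part =>
          val jedis = new Jedis("localhost", 6379) // assumed Redis endpoint
          part.foreach { case (userId, codes) =>
            // explicit "join" against the old state: read, merge, write back
            val old = Option(jedis.get(s"user:$userId")).map(_.toLong).getOrElse(0L)
            jedis.set(s"user:$userId", (old + codes.size).toString)
          }
          jedis.close()
        }
      }
  }

The point is that only the keys present in the current batch get read and
written, instead of the whole state being re-serialized on every interval.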


Note that this does not solve the problem when your state grows so large
that merging in new keys becomes a bottleneck (since that would require
something like an IndexedRDD). But in your case you mention serialization
overhead as the bottleneck, so maybe you could try filtering out
unchanged keys before persisting the data? Just an idea.
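
A rough sketch of that filtering idea, assuming the state object carries a
flag that marks whether the current batch actually changed it (the flag and
all names here are hypothetical):

  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream

  // Hypothetical state that remembers whether the current batch changed it.
  case class TrackedState(value: Long, changedThisBatch: Boolean)

  def persistOnlyChanged(events: DStream[(String, Int)]): Unit = {
    val updated = events.updateStateByKey[TrackedState] {
      (news: Seq[Int], old: Option[TrackedState]) =>
        if (news.isEmpty)
          old.map(_.copy(changedThisBatch = false))   // untouched key: mark it clean
        else
          Some(TrackedState(old.map(_.value).getOrElse(0L) + news.sum,
                            changedThisBatch = true))
    }

    // The full state still lives (and gets serialized) inside Spark, but the
    // external store only sees the keys that changed in this interval.
    updated
      .filter { case (_, s) => s.changedThisBatch }
      .foreachRDD(_.foreach { case (k, s) => println(s"$k -> ${s.value}") }) // stand-in sink
  }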

Andre



On 22/03/15 10:43, Andre Schumacher andre.sc...@gmail.com wrote:




-------- Forwarded Message --------
Subject: Re: updateStateByKey performance / API
Date: Wed, 18 Mar 2015 13:06:15 +0200
From: Nikos Viorres nvior...@gmail.com
To: Akhil Das ak...@sigmoidanalytics.com
CC: user@spark.apache.org user@spark.apache.org

Hi Akhil,

Yes, that's what we are planning on doing at the end of the day. At the
moment I am doing performance testing before the job hits production,
testing on 4 cores to get baseline figures, and deduced that in order to
grow to 10-15 million keys we'll need a batch interval of ~20 secs if we
don't want to allocate more than 8 cores to this job. The thing is that
since we have a big silent window in the user interactions, where the
stream will have very little data, we would like to be able to use these
cores for batch processing during that window, but we can't the way it
currently works.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can always throw more machines at this and see if the performance
 improves, since you haven't mentioned anything regarding your # of cores etc.

 Thanks
 Best Regards








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: updateStateByKey performance / API

2015-03-18 Thread Akhil Das
You can always throw more machines at this and see if the performance
improves, since you haven't mentioned anything regarding your # of cores etc.

Thanks
Best Regards





Re: updateStateByKey performance / API

2015-03-18 Thread Nikos Viorres
Hi Akhil,

Yes, that's what we are planning on doing at the end of the day. At the
moment I am doing performance testing before the job hits production,
testing on 4 cores to get baseline figures, and deduced that in order to
grow to 10-15 million keys we'll need a batch interval of ~20 secs if we
don't want to allocate more than 8 cores to this job. The thing is that
since we have a big silent window in the user interactions, where the
stream will have very little data, we would like to be able to use these
cores for batch processing during that window, but we can't the way it
currently works.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can always throw more machines at this and see if the performance
 improves, since you haven't mentioned anything regarding your # of cores etc.

 Thanks
 Best Regards






updateStateByKey performance / API

2015-03-18 Thread Nikos Viorres
Hi all,

We are having a few issues with the performance of the updateStateByKey
operation in Spark Streaming (1.2.1 at the moment) and any advice would be
greatly appreciated. Specifically, on each tick of the system (which is set
at 10 secs) we need to update a state tuple where the key is the user_id
and the value is an object with some state about the user. The problem is
that, using Kryo serialization for 5M users, this gets really slow, to the
point that we have to increase the period to more than 10 seconds so as
not to fall behind.
The input for the streaming job is a Kafka stream which consists of
key-value pairs of user_ids with some sort of action codes; we join this
to our checkpointed state by key and update the state.
I understand that the reason for iterating over the whole state set is to
allow evicting items or updating the state of every key for time-dependent
computations, but this does not apply to our situation and it hurts
performance really badly.
Is there a possibility of implementing, in the future, an extra call in
the API for updating only a specific subset of keys?

P.S. I will try as soon as possible to set the DStream to non-serialized
storage, but then I am worried about GC and checkpointing performance.
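
For illustration, a minimal self-contained sketch of the setup described
above (a socket source stands in for the Kafka stream, and UserState is a
placeholder for the real per-user state object):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  // Placeholder per-user state; the real job keeps a richer object here.
  case class UserState(lastAction: Int, count: Long)

  object UserStateSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("user-state-sketch")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batch interval
      ssc.checkpoint("/tmp/checkpoints")                // required by updateStateByKey

      // Stand-in for the Kafka stream: lines of "userId,actionCode".
      val actions = ssc.socketTextStream("localhost", 9999).map { line =>
        val Array(userId, code) = line.split(",")
        (userId, code.toInt)
      }

      // The update function is invoked for *every* key in the state on every
      // batch, which is exactly the cost described above.
      val state = actions.updateStateByKey[UserState] {
        (newActions: Seq[Int], old: Option[UserState]) =>
          if (newActions.isEmpty) old                   // unchanged key, state still rewritten
          else {
            val prev = old.getOrElse(UserState(0, 0L))
            Some(UserState(newActions.last, prev.count + newActions.size))
          }
      }

      state.print()
      ssc.start()
      ssc.awaitTermination()
    }
  }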



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org