Stateful RDDs?

2014-07-10 Thread Sargun Dhillon
So, one portion of our Spark Streaming application requires some state.
Our application takes a stream of application events (e.g.
user_session_started, user_session_ended, etc.), calculates metrics from
them, and writes those metrics to a serving layer (see: Lambda
Architecture). Two related events can be ingested into the streaming
context moments apart, or an indeterminate amount of time apart. Given
this, and the fact that our normal windows emit data every 500-1000 ms
with a slide of 500 ms, you can end up with two related pieces of data
split across two windows. To work around this, we use updateStateByKey
to persist state rather than persisting intermediate per-key state in
some external system, since building a system that handles the
complexities of (concurrent, idempotent) updates and also scales is
non-trivial.
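
For illustration, here is a minimal sketch of this kind of
updateStateByKey usage; the event format, socket source, checkpoint path
and field names below are placeholders rather than our actual pipeline,
but the shape of the update function is the point.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

case class SessionState(startedAt: Option[Long] = None, endedAt: Option[Long] = None)

object StatefulSessions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stateful-sessions").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(500))
    ssc.checkpoint("/tmp/session-checkpoints") // checkpointing is required for stateful DStreams

    // Placeholder input: lines of the form "userId,eventType,timestampMillis", keyed by user.
    val events = ssc.socketTextStream("localhost", 9999).flatMap { line =>
      line.split(",") match {
        case Array(user, kind, ts) => Some((user, (kind, ts.toLong)))
        case _                     => None
      }
    }

    // Fold each key's newly arrived events into its existing state, so related
    // events that land in different batches still end up in one per-key state.
    val sessions = events.updateStateByKey[SessionState] {
      (newEvents: Seq[(String, Long)], state: Option[SessionState]) =>
        val merged = newEvents.foldLeft(state.getOrElse(SessionState())) {
          case (acc, ("user_session_started", ts)) => acc.copy(startedAt = Some(ts))
          case (acc, ("user_session_ended", ts))   => acc.copy(endedAt = Some(ts))
          case (acc, _)                            => acc
        }
        Some(merged)
    }

    sessions.print() // the real job writes these to the serving layer instead
    ssc.start()
    ssc.awaitTermination()
  }
}

The property we care about is that the "started" and "ended" events for
one session can arrive several batches apart and still be folded into
the same per-key state.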

The question I have around this is: even with a highly partitionable
dataset, what are the upper scalability limits of stateful DStreams? If
I have a dataset starting at around 10 million keys and growing at that
rate monthly, what are the complexities involved? Most of the data is
cold. I realize that I can remove data from the stateful DStream by
sending (key, null) to it, but there is not necessarily an easy way of
knowing when the last update for a key is coming in (unless there is
some way in Spark of saying "wait N windows, then send this tuple" or
"wait until all keys in the upstream DStreams smaller than M have been
processed before sending this tuple"). Additionally, given that my data
is partitionable by datetime, does it make sense to write a custom
datetime partitioner and just persist the DStream to disk, so that its
RDDs are only pulled off of disk (into memory) occasionally? What's the
cost of keeping a bunch of relatively large, stateful RDDs around on
disk? Does Spark have to load and deserialize the entire RDD to update
one key? A rough sketch of the partitioner I have in mind is below.
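
For concreteness, the datetime partitioner I have in mind would look
roughly like the following; it assumes the key (or part of a composite
key) is an event-time timestamp in milliseconds, and day-sized buckets
are just an example.

import org.apache.spark.Partitioner

// Buckets keys by the day of their event-time timestamp, so state for
// the same day lands in the same partition. Illustrative only.
class DailyPartitioner(numDays: Int) extends Partitioner {
  private val dayMillis = 24L * 60 * 60 * 1000

  override def numPartitions: Int = numDays

  override def getPartition(key: Any): Int = key match {
    case ts: Long      => ((ts / dayMillis) % numDays).toInt
    case (_, ts: Long) => ((ts / dayMillis) % numDays).toInt // (id, timestamp) keys
    case _             => 0 // unexpected key shapes fall into one bucket
  }
}

updateStateByKey has an overload that accepts a Partitioner, so
something like new DailyPartitioner(31) could be passed in directly;
whether that actually helps once the state is persisted to disk is part
of what I am asking.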


Re: Stateful RDDs?

2014-07-14 Thread Tathagata Das
Trying to answer your questions as concisely as possible:

1. In the current implementation, the entire state RDD needs to be loaded for
any update. It is a known limitation that we want to overcome in the future.
Therefore the state DStream should not be persisted to disk, as all the data
in the state RDDs is touched in every batch. Since Spark Streaming is not
really a dedicated data store, it is not designed to separate out hot data
from cold data.
2. For each key, you could maintain in the state a timestamp of when it was
last updated, and accordingly return None to filter that state out (a sketch
of this is below). Regarding filtering by the minimum key, there may be a way
to periodically figure out the minimum key at the driver, then propagate that
information out to the executors (e.g. by updating a static variable on the
executors) and use it to filter out the keys.
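
To make point 2 concrete, here is a rough sketch of that timestamp-based
expiry; the state classes, TTL, and event shape are assumptions for
illustration, not a Spark API.

case class SessionMetrics(startedAt: Option[Long] = None, endedAt: Option[Long] = None)
case class TimedState(metrics: SessionMetrics, lastUpdatedMs: Long)

val ttlMs = 6L * 60 * 60 * 1000 // e.g. drop keys idle for more than six hours

val expiringUpdate =
  (newEvents: Seq[(String, Long)], state: Option[TimedState]) => {
    val now = System.currentTimeMillis()
    if (newEvents.isEmpty) {
      // updateStateByKey invokes this for every existing key in every batch,
      // so an idle key can be expired here by returning None.
      state.filter(s => now - s.lastUpdatedMs < ttlMs)
    } else {
      val prev = state.map(_.metrics).getOrElse(SessionMetrics())
      val merged = newEvents.foldLeft(prev) {
        case (acc, ("user_session_started", ts)) => acc.copy(startedAt = Some(ts))
        case (acc, ("user_session_ended", ts))   => acc.copy(endedAt = Some(ts))
        case (acc, _)                            => acc
      }
      Some(TimedState(merged, now))
    }
  }

// keyedEvents.updateStateByKey(expiringUpdate), for a DStream[(String, (String, Long))]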

Hope this helps.

TD


