[2/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey

2015-11-10 Thread tdas
[SPARK-11290][STREAMING] Basic implementation of trackStateByKey

The current updateStateByKey provides stateful processing in Spark Streaming. It
allows the user to maintain per-key state and manage that state using an
updateFunction. The updateFunction is called for each key, and it uses the new
data and the existing state of the key to generate an updated state. However,
based on community feedback, we have learnt the following lessons:
* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases: (a) timeout of idle
data, (b) returning items other than state
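
For reference, this is the pattern the existing API requires (a minimal
sketch, mirroring the example removed in the diff below; wordDstream is
assumed to be a DStream[(String, Int)] of (word, 1) pairs):

    // Existing updateStateByKey pattern: the update function is invoked for
    // every key with all of its new values and its previous state, and must
    // return the new state (returning None removes the key).
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

Note that the update function is applied to every key that has state, whether
or not new data arrived for it, which is what makes the full-scan cost
unavoidable with this API.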

The high-level idea of this PR:
* Introduce a new API, trackStateByKey, that allows the user to update per-key
state and emit arbitrary records. The new API is necessary because it will have
significantly different semantics than the existing updateStateByKey API. It
will have direct support for timeouts (see the usage sketch below).
* Internally, the system will keep the state data as a map/list within the
partitions of the state RDDs. The new data RDDs will be partitioned
appropriately, and for all the key-value data, it will look up the map/list in
the state RDD partition and create a new list/map of updated state data. The
new state RDD partition will be created from the updated data and, if
necessary, the old data (a toy model of this follows the design-doc link).
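
A sketch of the intended usage, adapted from the updated
StatefulNetworkWordCount example in this patch (the API is experimental, so
names and signatures may still change):

    // The tracking function sees one (key, new value) pair at a time plus a
    // State handle; it updates the state in place and returns an arbitrary
    // emitted record (here, the running count for the word).
    val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
        state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)    // write the new per-key state
      Some((word, sum))    // emit something other than the state itself
    }

    val stateDstream = wordDstream.trackStateByKey(
      StateSpec.function(trackStateFunc))

Timeouts and an initial state RDD are likewise configured on the StateSpec,
e.g. StateSpec.function(trackStateFunc).initialState(initialRDD).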
Here is the detailed design doc. Please take a look and provide feedback as 
comments.
https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em
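
To make the internal design concrete, here is a toy model of the per-partition
update (illustrative only; the actual implementation uses the TrackStateRDD and
StateMap classes added in this patch, not a plain immutable Map):

    // Each state-RDD partition carries a map of key -> state. For a batch,
    // only the keys that received new data are looked up and rewritten; all
    // other entries are carried over unchanged, so the per-batch cost scales
    // with the amount of new data rather than the total number of keys.
    def updatePartition(
        oldStates: Map[String, Int],
        newData: Iterator[(String, Int)]): Map[String, Int] = {
      newData.foldLeft(oldStates) { case (states, (key, value)) =>
        states.updated(key, states.getOrElse(key, 0) + value)
      }
    }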

This is still WIP. Major things left to be done:
- [x] Implement basic functionality of state tracking, with initial RDD and 
timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
   - [x] state creating, updating, removing
   - [ ] emitting
   - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags

Author: Tathagata Das 

Closes #9256 from tdas/trackStateByKey.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99f5f988
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99f5f988
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99f5f988

Branch: refs/heads/master
Commit: 99f5f988612b3093d73d9ce98819767e822fcbff
Parents: bd70244
Author: Tathagata Das 
Authored: Tue Nov 10 23:16:18 2015 -0800
Committer: Tathagata Das 
Committed: Tue Nov 10 23:16:18 2015 -0800

--
 .../streaming/StatefulNetworkWordCount.scala|  25 +-
 .../org/apache/spark/streaming/State.scala  | 193 
 .../org/apache/spark/streaming/StateSpec.scala  | 212 
 .../dstream/PairDStreamFunctions.scala  |  46 +-
 .../streaming/dstream/TrackStateDStream.scala   | 142 ++
 .../spark/streaming/rdd/TrackStateRDD.scala | 188 +++
 .../apache/spark/streaming/util/StateMap.scala  | 337 +
 .../apache/spark/streaming/StateMapSuite.scala  | 314 
 .../spark/streaming/TrackStateByKeySuite.scala  | 494 +++
 .../streaming/rdd/TrackStateRDDSuite.scala  | 193 
 10 files changed, 2125 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99f5f988/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 02ba1c2..be2ae0b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -44,18 +44,6 @@ object StatefulNetworkWordCount {
 
 StreamingExamples.setStreamingLogLevels()
 
-val updateFunc = (values: Seq[Int], state: Option[Int]) => {
-  val currentCount = values.sum
-
-  val previousCount = state.getOrElse(0)
-
-  Some(currentCount + previousCount)
-}
-
-val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
-  iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
-}
-
 val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
 // Create the context with a 1 second batch size
 val ssc = new StreamingContext(sparkConf, Seconds(1))
@@ -71,9 +59,16 @@ object StatefulNetworkWordCount {
 val wordDstream = words.map(x => (x, 1))
 
 // Update the cumulative count using updateStateByKey
-// This will give a Dstream made of state (which is 

[2/2] spark git commit: [SPARK-11290][STREAMING] Basic implementation of trackStateByKey

2015-11-10 Thread tdas
[SPARK-11290][STREAMING] Basic implementation of trackStateByKey

The current updateStateByKey provides stateful processing in Spark Streaming. It
allows the user to maintain per-key state and manage that state using an
updateFunction. The updateFunction is called for each key, and it uses the new
data and the existing state of the key to generate an updated state. However,
based on community feedback, we have learnt the following lessons:
* Need for more optimized state management that does not scan every key
* Need to make it easier to implement common use cases: (a) timeout of idle
data, (b) returning items other than state

The high-level idea of this PR:
* Introduce a new API, trackStateByKey, that allows the user to update per-key
state and emit arbitrary records. The new API is necessary because it will have
significantly different semantics than the existing updateStateByKey API. It
will have direct support for timeouts.
* Internally, the system will keep the state data as a map/list within the
partitions of the state RDDs. The new data RDDs will be partitioned
appropriately, and for all the key-value data, it will look up the map/list in
the state RDD partition and create a new list/map of updated state data. The
new state RDD partition will be created from the updated data and, if
necessary, the old data.
Here is the detailed design doc. Please take a look and provide feedback as 
comments.
https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

This is still WIP. Major things left to be done:
- [x] Implement basic functionality of state tracking, with initial RDD and 
timeouts
- [x] Unit tests for state tracking
- [x] Unit tests for initial RDD and timeout
- [ ] Unit tests for TrackStateRDD
   - [x] state creating, updating, removing
   - [ ] emitting
   - [ ] checkpointing
- [x] Misc unit tests for State, TrackStateSpec, etc.
- [x] Update docs and experimental tags

Author: Tathagata Das 

Closes #9256 from tdas/trackStateByKey.

(cherry picked from commit 99f5f988612b3093d73d9ce98819767e822fcbff)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/daa74be6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/daa74be6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/daa74be6

Branch: refs/heads/branch-1.6
Commit: daa74be6f863061221bb0c2f94e70672e6fcbeaa
Parents: 85bc729
Author: Tathagata Das 
Authored: Tue Nov 10 23:16:18 2015 -0800
Committer: Tathagata Das 
Committed: Tue Nov 10 23:16:37 2015 -0800

--
 .../streaming/StatefulNetworkWordCount.scala|  25 +-
 .../org/apache/spark/streaming/State.scala  | 193 
 .../org/apache/spark/streaming/StateSpec.scala  | 212 
 .../dstream/PairDStreamFunctions.scala  |  46 +-
 .../streaming/dstream/TrackStateDStream.scala   | 142 ++
 .../spark/streaming/rdd/TrackStateRDD.scala | 188 +++
 .../apache/spark/streaming/util/StateMap.scala  | 337 +
 .../apache/spark/streaming/StateMapSuite.scala  | 314 
 .../spark/streaming/TrackStateByKeySuite.scala  | 494 +++
 .../streaming/rdd/TrackStateRDDSuite.scala  | 193 
 10 files changed, 2125 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/daa74be6/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 02ba1c2..be2ae0b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -44,18 +44,6 @@ object StatefulNetworkWordCount {
 
 StreamingExamples.setStreamingLogLevels()
 
-val updateFunc = (values: Seq[Int], state: Option[Int]) => {
-  val currentCount = values.sum
-
-  val previousCount = state.getOrElse(0)
-
-  Some(currentCount + previousCount)
-}
-
-val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
-  iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
-}
-
 val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
 // Create the context with a 1 second batch size
 val ssc = new StreamingContext(sparkConf, Seconds(1))
@@ -71,9 +59,16 @@ object StatefulNetworkWordCount {
 val wordDstream =