spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 151d7c2ba -> 216988688


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

This fixes the following exception, thrown when an empty state RDD is checkpointed and then recovered. The root cause is that an empty OpenHashMapBasedStateMap cannot be deserialized, because its initialCapacity is set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.<init>(StateMap.scala:96)
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.<init>(StateMap.scala:86)
    at org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
```
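For context, here is a minimal, self-contained Scala sketch of the failure mode. `TinyStateMap` and `EmptyMapRoundTrip` are hypothetical names, not Spark classes; the `math.max` guard only illustrates the zero-capacity problem, and the actual patch in StateMap.scala may differ in detail.

```
import java.io._

// TinyStateMap is a hypothetical stand-in for OpenHashMapBasedStateMap,
// reduced to the two details that matter here: the constructor rejects a
// capacity below 1, and readObject() rebuilds internal storage from the
// serialized element count, which is 0 for an empty map.
class TinyStateMap(initialCapacity: Int) extends Serializable {
  require(initialCapacity >= 1, "Invalid initial capacity")

  @transient private var entries = new Array[String](initialCapacity)
  private var count = 0

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()                       // writes `count`
    (0 until count).foreach(i => out.writeUTF(entries(i)))
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()                         // restores `count`
    // Bug shape: sizing the rebuilt storage directly from `count` is the
    // analogue of passing initialCapacity = 0 to the constructor when the
    // map is empty. Clamping to at least 1 keeps empty maps deserializable.
    entries = new Array[String](math.max(count, 1))
    (0 until count).foreach(i => entries(i) = in.readUTF())
  }
}

object EmptyMapRoundTrip {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(new TinyStateMap(64))          // an empty map
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = ois.readObject().asInstanceOf[TinyStateMap]
    println("empty map round-tripped: " + restored)
  }
}
```

Without the clamp, deserializing the empty map would derive a zero capacity and trip the same `requirement failed: Invalid initial capacity` seen in the trace above.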

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21698868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21698868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21698868

Branch: refs/heads/master
Commit: 2169886883d33b33acf378ac42a626576b342df1
Parents: 151d7c2
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:01 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -
 .../apache/spark/streaming/StateMapSuite.scala  | 30 +---
 .../streaming/rdd/TrackStateRDDSuite.scala  | 10 +++
 3 files changed, 42 insertions(+), 17 deletions(-)

spark git commit: [SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68bcb9b33 -> 7f030aa42


[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and recovered from checkpoint file

(Same commit message and exception stack trace as the master commit above.)

Author: Tathagata Das 

Closes #9958 from tdas/SPARK-11979.

(cherry picked from commit 2169886883d33b33acf378ac42a626576b342df1)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f030aa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f030aa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f030aa4

Branch: refs/heads/branch-1.6
Commit: 7f030aa422802a8e7077e1c74a59ab9a5fe54488
Parents: 68bcb9b
Author: Tathagata Das 
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 23:13:29 2015 -0800

--
 .../apache/spark/streaming/util/StateMap.scala  | 19 -