[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-02-01 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-178132065
  
I did not merge this into 1.6 or earlier branches for two reasons:
- It doesn't merge cleanly, and, more importantly,
- It changes internal semantics, and the issue is not technically a bug.
Let me know if you disagree.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-02-01 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51463360
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,75 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    //       v                  v                  v
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |                  |                  |
+    //       v                  v                  v
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var shouldCheckpointAllMarkedRDDs = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        /**
+         * Find all RDDs that are marked for checkpointing in the specified RDD and its ancestors.
+         */
+        def findAllMarkedRDDs(rdd: RDD[_]): List[RDD[_]] = {
--- End diff --

oh I see... that's unfortunate





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-02-01 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-178130645
  
Merged into master.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-02-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/10934





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177353166
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177353167
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50448/
Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-30 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51352293
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,75 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    //       v                  v                  v
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |                  |                  |
+    //       v                  v                  v
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var shouldCheckpointAllMarkedRDDs = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        /**
+         * Find all RDDs that are marked for checkpointing in the specified RDD and its ancestors.
+         */
+        def findAllMarkedRDDs(rdd: RDD[_]): List[RDD[_]] = {
--- End diff --

> I meant put this in a `private def` outside of this test actually. It would make the test body smaller.

But it would then refer to the CheckpointSuite class, which is not serializable.
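
To make the pitfall concrete, here is a rough, self-contained sketch (the `MySuite` class and helper names are made up for illustration, not taken from this PR): a helper `def` defined on a non-serializable class drags the enclosing instance into any RDD closure that calls it, while a `def` local to the closure does not.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative stand-in for a test suite; note it is NOT Serializable.
class MySuite {
  // Calling this from an RDD closure captures `this` (the whole MySuite instance).
  def double(x: Int): Int = x * 2

  def run(): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("closure-demo"))
    try {
      val rdd = sc.parallelize(1 to 10)

      // Would likely fail with "Task not serializable" (NotSerializableException),
      // because the closure references MySuite.this.double:
      // rdd.map(x => double(x)).count()

      // Fine: the helper lives inside the closure, so nothing outside it is captured.
      val n = rdd.map { x =>
        def doubleLocal(v: Int): Int = v * 2
        doubleLocal(x)
      }.count()
      println(s"count = $n")
    } finally {
      sc.stop()
    }
  }
}
```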





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177353119
  
**[Test build #50448 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50448/consoleFull)** for PR 10934 at commit [`20e4509`](https://github.com/apache/spark/commit/20e45095506067f3f5195470e3a390cd4872e531).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177327392
  
**[Test build #50448 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50448/consoleFull)** for PR 10934 at commit [`20e4509`](https://github.com/apache/spark/commit/20e45095506067f3f5195470e3a390cd4872e531).





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330625
  
--- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
@@ -512,6 +512,21 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
     assert(rdd.isCheckpointedAndMaterialized === true)
     assert(rdd.partitions.size === 0)
   }
+
+  runTest("checkpoint all marked RDDs") { reliableCheckpoint: Boolean =>
--- End diff --

can you add another test to show that if we don't set this flag then the 
parent RDD is not checkpointed?





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51334790
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,70 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var checkpointAllMarkedRDDsEnable = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        checkpointAllMarkedRDDsEnable =
+          Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)).
+            map(_.toBoolean).getOrElse(false)
+
+        val stateRDDs = {
+          def findAllMarkedRDDs(_rdd: RDD[_], buffer: ArrayBuffer[RDD[_]]): Unit = {
+            if (_rdd.checkpointData.isDefined) {
+              buffer += _rdd
+            }
+            _rdd.dependencies.foreach(dep => findAllMarkedRDDs(dep.rdd, buffer))
+          }
+
+          val buffer = new ArrayBuffer[RDD[_]]
+          findAllMarkedRDDs(rdd, buffer)
+          buffer.toSeq
+        }
--- End diff --

I kept it here to avoid a NotSerializableException. I just simplified the code to make it easier to read.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177032299
  
**[Test build #50421 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50421/consoleFull)** for PR 10934 at commit [`97e39c0`](https://github.com/apache/spark/commit/97e39c045f3ee16713b2015150ba12a0815d7fc4).





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330972
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,70 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var checkpointAllMarkedRDDsEnable = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        checkpointAllMarkedRDDsEnable =
+          Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED)).
+            map(_.toBoolean).getOrElse(false)
+
+        val stateRDDs = {
+          def findAllMarkedRDDs(_rdd: RDD[_], buffer: ArrayBuffer[RDD[_]]): Unit = {
+            if (_rdd.checkpointData.isDefined) {
+              buffer += _rdd
+            }
+            _rdd.dependencies.foreach(dep => findAllMarkedRDDs(dep.rdd, buffer))
+          }
+
+          val buffer = new ArrayBuffer[RDD[_]]
+          findAllMarkedRDDs(rdd, buffer)
+          buffer.toSeq
+        }
--- End diff --

can you extract this to a helper method?





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330247
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1697,6 +1706,8 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  private[spark] val CHECKPOINT_ALL_MARKED = "spark.checkpoint.checkpointAllMarked"
--- End diff --

This sounds a little awkward. I would call this (and other similar 
variables) `CHECKPOINT_ALL_MARKED_ANCESTORS`





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330867
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,70 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var checkpointAllMarkedRDDsEnable = false
--- End diff --

Enabled





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51329576
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether checkpoint all RDDs that are marked with the checkpoint flag.
--- End diff --

We need to expand on this comment:
```
// Whether to checkpoint all RDDs that are marked for checkpointing. By default, we stop
// as soon as we find the first such RDD. This optimization allows us to write less data
// but is not safe for all workloads. E.g. in streaming we may checkpoint both an RDD
// and its parent every batch, in which case the parent may never be checkpointed
// and its lineage never truncated (SPARK-6847).
```
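
For context, this flag is read from a SparkContext local property rather than a regular SparkConf entry. A minimal sketch of that pattern, assuming the property key used in this revision of the PR (the surrounding object, app name, and master are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointAllMarkedDemo {
  // Property key as used in this revision of the PR.
  val CHECKPOINT_ALL_MARKED = "spark.checkpoint.checkpointAllMarked"

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("local-prop-demo"))
    try {
      // Local properties are scoped to the thread that submits jobs, which is how
      // the streaming scheduler can turn the behavior on without affecting other
      // users of the same SparkContext.
      sc.setLocalProperty(CHECKPOINT_ALL_MARKED, "true")

      // Reading it back mirrors the pattern in the diff above.
      val enabled =
        Option(sc.getLocalProperty(CHECKPOINT_ALL_MARKED)).map(_.toBoolean).getOrElse(false)
      println(s"checkpoint all marked RDDs enabled: $enabled")
    } finally {
      sc.stop()
    }
  }
}
```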





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330708
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
@@ -243,6 +244,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
     // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
     SparkEnv.set(ssc.env)
+
+    // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the
+    // checkpoint flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847
--- End diff --

No need to duplicate the name of the flag here again, just say:
```
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are truncated
// periodically. Otherwise, we may run into stack overflows (SPARK-6847).
```





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330811
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,70 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
--- End diff --

Beautiful!! One thing I would suggest is adding the direction of the down 
arrow if possible, i.e.:
```
 a
 |
 v
 b
```
instead of 
```
 a
 |
 b
```
so it's clearer which one is the parent





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330737
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
@@ -210,6 +210,9 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
           s"""Streaming job from $batchLinkText""")
         ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
         ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
+        // Enable "spark.checkpoint.checkpointAllMarked" to make sure that all RDDs marked with the
+        // checkpoint flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847
--- End diff --

same here





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51331064
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1578,6 +1582,11 @@ abstract class RDD[T: ClassTag](
   if (!doCheckpointCalled) {
 doCheckpointCalled = true
 if (checkpointData.isDefined) {
+  if (recursiveCheckpoint) {
--- End diff --

Yeah! Can we add a TODO?





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51331038
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag.
+  private val recursiveCheckpoint =
+    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
--- End diff --

This is a hard one... I think `checkpointAllMarkedAncestors` is least 
ambiguous





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177049310
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50421/
Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177049306
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177048766
  
**[Test build #50421 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50421/consoleFull)** for PR 10934 at commit [`97e39c0`](https://github.com/apache/spark/commit/97e39c045f3ee16713b2015150ba12a0815d7fc4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51341040
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,75 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    //       v                  v                  v
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |                  |                  |
+    //       v                  v                  v
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var shouldCheckpointAllMarkedRDDs = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        /**
+         * Find all RDDs that are marked for checkpointing in the specified RDD and its ancestors.
+         */
+        def findAllMarkedRDDs(rdd: RDD[_]): List[RDD[_]] = {
+          val markedRDDs = rdd.dependencies.flatMap(dep => findAllMarkedRDDs(dep.rdd)).toList
+          if (rdd.checkpointData.isDefined) {
+            rdd :: markedRDDs
+          } else {
+            markedRDDs
+          }
+        }
+
+        shouldCheckpointAllMarkedRDDs =
+          Option(rdd.sparkContext.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).
+            map(_.toBoolean).getOrElse(false)
+
+        val stateRDDs = findAllMarkedRDDs(rdd)
+          rdd.count()
+          // Check the two state RDDs are both checkpointed
+          rddsCheckpointed = stateRDDs.size == 2 && stateRDDs.forall(_.isCheckpointed)
+        }
--- End diff --

hm indentation is weird here?





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51341044
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,75 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
+    //
+    //     batch 1            batch 2            batch 3     ...
+    //
+    // 1) input rdd          input rdd          input rdd
+    //       |                  |                  |
+    //       v                  v                  v
+    // 2) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 3)  map rdd ---        map rdd ---        map rdd      ...
+    //       |                  |                  |
+    //       v                  v                  v
+    // 4) cogroup rdd   ---> cogroup rdd   ---> cogroup rdd   ...
+    //       |         /        |         /        |
+    //       v        /         v        /         v
+    // 5)  map rdd ---        map rdd ---        map rdd      ...
+    //
+    // Every batch depends on its previous batch, so "updateStateByKey" needs to do checkpoint to
+    // break the RDD chain. However, before SPARK-6847, when the state RDD (layer 5) of the second
+    // "updateStateByKey" does checkpoint, it won't checkpoint the state RDD (layer 3) of the first
+    // "updateStateByKey" (Note: "updateStateByKey" has already marked that its state RDD (layer 3)
+    // should be checkpointed). Hence, the connections between layer 2 and layer 3 won't be broken
+    // and the RDD chain will grow infinitely and cause StackOverflow.
+    //
+    // Therefore SPARK-6847 introduces "spark.checkpoint.checkpointAllMarked" to force checkpointing
+    // all marked RDDs in the DAG to resolve this issue. (For the previous example, it will break
+    // connections between layer 2 and layer 3)
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var shouldCheckpointAllMarkedRDDs = false
+    @volatile var rddsCheckpointed = false
+    inputDStream.map(i => (i, i))
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .updateStateByKey(updateFunc).checkpoint(batchDuration)
+      .foreachRDD { rdd =>
+        /**
+         * Find all RDDs that are marked for checkpointing in the specified RDD and its ancestors.
+         */
+        def findAllMarkedRDDs(rdd: RDD[_]): List[RDD[_]] = {
--- End diff --

I meant put this in a `private def` outside of this test actually





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177081591
  
LGTM, I'll merge this once you address the minor comments





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51328868
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether checkpoint all RDDs that are marked with the checkpoint flag.
+  private val checkpointAllMarked =
--- End diff --

I would call this `checkpointAncestors`





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r51330500
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1578,6 +1582,11 @@ abstract class RDD[T: ClassTag](
     if (!doCheckpointCalled) {
       doCheckpointCalled = true
       if (checkpointData.isDefined) {
+        if (checkpointAllMarked) {
+          // Checkpoint dependencies first because dependencies will be set to
+          // ReliableCheckpointRDD after checkpointing.
--- End diff --

```
// Checkpoint parents first because our lineage would be truncated after
// we checkpoint ourselves
```
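
In other words, the traversal has to visit ancestors before checkpointing the current RDD. A tiny standalone sketch of that ordering (the `Node` type and helper below are illustrative stand-ins, not Spark's RDD API):

```scala
// Illustrative stand-in for a lineage graph node; `marked` plays the role of
// "checkpointData is defined" on a real RDD.
final case class Node(name: String, parents: Seq[Node], marked: Boolean)

// Visit parents first: once a node is checkpointed its lineage is truncated,
// so a child-first order could never reach the marked ancestors.
def checkpointAllMarked(node: Node, doCheckpoint: Node => Unit): Unit = {
  node.parents.foreach(p => checkpointAllMarked(p, doCheckpoint))
  if (node.marked) doCheckpoint(node)
}
```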





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-29 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-177020382
  
Looks great! I only have documentation and test suggestions.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175406784
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175414765
  
**[Test build #50173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50173/consoleFull)** for PR 10934 at commit [`ef3983b`](https://github.com/apache/spark/commit/ef3983ba07ddb3c14f2946ef85f0445d491ce840).





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175464973
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50173/
Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175464971
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175408940
  
retest this please





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175406790
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50171/
Test FAILed.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175464840
  
**[Test build #50173 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50173/consoleFull)** for PR 10934 at commit [`ef3983b`](https://github.com/apache/spark/commit/ef3983ba07ddb3c14f2946ef85f0445d491ce840).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/10934

[SPARK-6847][Core][Streaming] Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream

Add a local property to indicate whether to checkpoint all RDDs that are marked with the checkpoint flag, and enable it in Streaming.
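
For readers skimming the thread, the job shape that hits the stack overflow is roughly the following (a condensed sketch based on the test added in this PR; the socket source, port, and checkpoint path are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StackOverflowRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SPARK-6847-repro")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/spark-6847-checkpoint")  // illustrative path

    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    // Two chained stateful operators, each with its own checkpoint interval.
    // Before this fix, checkpointing the second state RDD did not checkpoint
    // the first one, so its lineage kept growing batch after batch.
    val lines = ssc.socketTextStream("localhost", 9999)  // illustrative source
    lines.map(word => (word, 1))
      .updateStateByKey(updateFunc).checkpoint(Seconds(2))
      .updateStateByKey(updateFunc).checkpoint(Seconds(2))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```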

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark recursive-checkpoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/10934.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #10934


commit 36cba8ca763e7df5a20d5ab015812354ade365c9
Author: Shixiong Zhu 
Date:   2016-01-27T00:20:33Z

Fix stack overflow issue when updateStateByKey is followed by a checkpointed dstream

Add a local property to indicate whether to checkpoint all RDDs that are marked with the checkpoint flag, and enable it in Streaming.







[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread zsxwing
Github user zsxwing commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175310767
  
/cc @andrewor14 @tdas 





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175314567
  
**[Test build #50141 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50141/consoleFull)** for PR 10934 at commit [`36cba8c`](https://github.com/apache/spark/commit/36cba8ca763e7df5a20d5ab015812354ade365c9).





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50929760
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag.
+  private val recursiveCheckpoint =
+    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
--- End diff --

Well, it's always recursive. The difference is whether we checkpoint all RDDs that have been marked or not.





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50929833
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1578,6 +1582,11 @@ abstract class RDD[T: ClassTag](
   if (!doCheckpointCalled) {
 doCheckpointCalled = true
 if (checkpointData.isDefined) {
+  if (recursiveCheckpoint) {
--- End diff --

I wonder whether we can collect all the RDDs that need to be checkpointed, and then checkpoint them in parallel.
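
A rough sketch of that idea (purely illustrative, not what this PR implements): collect the marked RDDs up front, then checkpoint them concurrently. Collecting first also sidesteps walking a lineage that is being truncated underneath us; the real ordering and failure-handling trade-offs would still need to be worked out.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Illustrative stand-in for a lineage node; `marked` means "has checkpointData defined".
final case class Node(name: String, parents: Seq[Node], marked: Boolean)

def collectMarked(node: Node): Seq[Node] =
  node.parents.flatMap(collectMarked) ++ (if (node.marked) Seq(node) else Nil)

// Launch one checkpoint action per marked node and wait for all of them.
def checkpointInParallel(root: Node, doCheckpoint: Node => Unit): Unit = {
  val jobs = collectMarked(root).distinct.map(n => Future(doCheckpoint(n)))
  jobs.foreach(f => Await.result(f, Duration.Inf))
}
```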





[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r5092
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
@@ -243,6 +243,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
     // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
     SparkEnv.set(ssc.env)
+
+    // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint
+    // flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847
+    ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true")
--- End diff --

Why is this in "JobGenerator" only, and not also in the JobScheduler that 
is actually running the jobs?




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50929995
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag.
+  private val recursiveCheckpoint =
+    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
--- End diff --

Any suggestions for a better name for this one?




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50930243
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
@@ -243,6 +243,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
     // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
     SparkEnv.set(ssc.env)
+
+    // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint
+    // flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847
+    ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true")
--- End diff --

`localProperties` uses `InheritableThreadLocal`, so they will be inherited by child threads.

However, it looks like it's better not to rely on this implicit behavior and just set it explicitly.
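For reference, a minimal Spark-free sketch of that inheritance behavior: a value set in an `InheritableThreadLocal` by a parent thread is copied into a child thread when the child is constructed, which is exactly the implicit propagation being discussed.

    object InheritableLocalDemo {
      private val flag = new InheritableThreadLocal[String]

      def main(args: Array[String]): Unit = {
        flag.set("true")                 // set in the parent thread
        val child = new Thread(new Runnable {
          override def run(): Unit =
            println(s"child sees: ${flag.get}")   // prints "child sees: true"
        })
        child.start()                    // the value was copied when the Thread was constructed
        child.join()
      }
    }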




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50933356
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1535,6 +1535,10 @@ abstract class RDD[T: ClassTag](
 
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
+  // Whether recursively checkpoint all RDDs that are marked with the checkpoint flag.
+  private val recursiveCheckpoint =
+    Option(sc.getLocalProperty("spark.checkpoint.recursive")).map(_.toBoolean).getOrElse(false)
--- End diff --

"spark.checkpoint.checkpointAllMarked" ?? @andrewor14 thoughts.

Btw, shouldnt this be a constant variable in some object? 
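One way the "constant in some object" suggestion could look; the object name here is made up for illustration and is not necessarily what the PR ends up with.

    // Hypothetical holder for the property key, so the string literal is defined once.
    private[spark] object RDDCheckpointProperties {
      val CHECKPOINT_ALL_MARKED = "spark.checkpoint.checkpointAllMarked"
    }

    // The RDD-side read would then reference the constant instead of a raw string:
    //   Option(sc.getLocalProperty(RDDCheckpointProperties.CHECKPOINT_ALL_MARKED))
    //     .map(_.toBoolean).getOrElse(false)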




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50933676
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
@@ -243,6 +243,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
     // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
     SparkEnv.set(ssc.env)
+
+    // Enable "spark.checkpoint.recursive" to make sure that all RDDs marked with the checkpoint
+    // flag are all checkpointed to avoid the stack overflow issue. See SPARK-6847
+    ssc.sparkContext.setLocalProperty("spark.checkpoint.recursive", "true")
--- End diff --

YES. Please set it explicitly in both JobGenerator and JobScheduler.
There are other variables like this that are set in the JobScheduler; please put this one in the same place.
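A minimal sketch of "set it explicitly on the thread that runs the job" rather than relying on thread-local inheritance; the executor and wrapper below stand in for the JobScheduler's job-handler thread and are illustrative, not Spark's actual wiring.

    import java.util.concurrent.ExecutorService

    import org.apache.spark.SparkContext

    // Each submitted job sets the local property on its own thread, so nothing
    // depends on InheritableThreadLocal copying it from the creating thread.
    def runJobWithFlag(sc: SparkContext, pool: ExecutorService)(job: => Unit): Unit = {
      pool.execute(new Runnable {
        override def run(): Unit = {
          sc.setLocalProperty("spark.checkpoint.recursive", "true")
          job
        }
      })
    }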




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50933818
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val batchCounter = new BatchCounter(ssc)
+    ssc.checkpoint(checkpointDir)
+    val inputDStream = new CheckpointInputDStream(ssc)
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      Some(values.sum + state.getOrElse(0))
+    }
+    @volatile var recursiveCheckpoint = false
+    @volatile var rddsBothCheckpointed = false
+    inputDStream.map(i => (i, i)).
+      updateStateByKey[Int](updateFunc).checkpoint(batchDuration).
+      map(i => i).checkpoint(batchDuration).
--- End diff --

Would you test this using two updateStateByKey operators?

Also, this code style is weird and hard to read.
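On the style point, the same chain reads more clearly with the dot leading each continuation line rather than trailing it; a sketch of the reflow (the rest of the chain is unchanged):

    inputDStream.map(i => (i, i))
      .updateStateByKey[Int](updateFunc).checkpoint(batchDuration)
      .map(i => i).checkpoint(batchDuration)
      // ... rest of the chain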




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/10934#discussion_r50934112
  
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -821,6 +821,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
+    ssc = new StreamingContext(master, framework, batchDuration)
--- End diff --

Would be nice to add a bit of context in the test, maybe with some ASCII art showing the DAG structure.




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175360587
  
**[Test build #50141 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50141/consoleFull)** for PR 10934 at commit [`36cba8c`](https://github.com/apache/spark/commit/36cba8ca763e7df5a20d5ab015812354ade365c9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175360743
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50141/
Test PASSed.




[GitHub] spark pull request: [SPARK-6847][Core][Streaming]Fix stack overflo...

2016-01-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/10934#issuecomment-175360742
  
Merged build finished. Test PASSed.

