[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-11-20 Thread efimpoberezkin
Github user efimpoberezkin closed the pull request at:

https://github.com/apache/spark/pull/21392


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-24 Thread efimpoberezkin
Github user efimpoberezkin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190473131
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

Okay, I thought about something like this but wasn't sure if it was fine to do so for the sake of this change. Thanks


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-23 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190316039
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.
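For illustration, a minimal sketch of what such a hook could look like. The method name comes from the suggestion above, but the surrounding class and the error-recording mechanism are assumptions, not code from this PR or from Spark:

import java.util.concurrent.atomic.AtomicReference

// Minimal sketch, assuming a ContinuousExecution-like query class.
// In Spark the method would be private[streaming] on ContinuousExecution;
// everything here is illustrative, not an actual Spark API.
class QueryWithFailureHook {
  // First fatal error reported by the EpochCoordinator, if any.
  private val fatalError = new AtomicReference[Throwable](null)

  def stop(): Unit = {
    // Stand-in for ContinuousExecution.stop(), which interrupts the query
    // thread and shuts down the continuous sources and writer.
    println("stopping query")
  }

  def stopWithException(e: Throwable): Unit = {
    fatalError.compareAndSet(null, e) // keep only the first reported cause
    stop()
  }

  // The run loop would call this after shutdown to surface the failure.
  def rethrowIfFailed(): Unit = {
    val e = fatalError.get()
    if (e != null) throw e
  }
}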


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-23 Thread efimpoberezkin
Github user efimpoberezkin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190160821
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

Do you mean making the query fail right from EpochCoordinator? If so, I wanted to do that, but didn't figure out how to terminate the query with an exception. EpochCoordinator has query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest is stop(), I guess. Or am I looking in a completely wrong direction? Please give me a hint.


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-23 Thread efimpoberezkin
Github user efimpoberezkin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190158513
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
   // If not, add the epoch being currently processed to epochs waiting to be committed,
   // otherwise commit it.
   if (lastCommittedEpoch != epoch - 1) {
-logDebug(s"Epoch $epoch has received commits from all partitions " +
-  s"and is waiting for epoch ${epoch - 1} to be committed first.")
-epochsWaitingToBeCommitted.add(epoch)
+if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+  maxEpochBacklogExceeded = true
+} else {
+  logDebug(s"Epoch $epoch has received commits from all partitions " +
+    s"and is waiting for epoch ${epoch - 1} to be committed first.")
+  epochsWaitingToBeCommitted.add(epoch)
--- End diff --

Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there's no reason to ever set it back to false. At least that's how I see it.


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-22 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190112873
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -233,9 +235,15 @@ class ContinuousExecution(
   }
   false
 } else if (isActive) {
-  currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-  logInfo(s"New epoch $currentBatchId is starting.")
-  true
+  val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+  if (maxBacklogExceeded) {
+    throw new IllegalStateException(
+      "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
--- End diff --

Agreed that the code as written won't shut down the stream. But I think it 
does make sense to kill the stream rather than waiting for old epochs. If we 
end up with a large backlog it's almost surely because some partition isn't 
making any progress, so I wouldn't expect Spark to ever be able to catch up.
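A minimal sketch of that fail-fast behavior, assuming the stopWithException-style hook discussed above (all names here are illustrative, not the PR's actual code):

import scala.collection.mutable

// Sketch: fail the query as soon as the backlog threshold is hit, instead
// of queueing the epoch and waiting. failQuery stands in for the
// hypothetical query.stopWithException(e) hook.
object BacklogPolicy {
  def onEpochReadyToCommit(
      epochsWaitingToBeCommitted: mutable.Set[Long],
      epoch: Long,
      maxEpochBacklog: Int,
      failQuery: Throwable => Unit): Unit = {
    if (epochsWaitingToBeCommitted.size >= maxEpochBacklog) {
      failQuery(new IllegalStateException(
        s"Epoch backlog exceeded $maxEpochBacklog epochs; " +
          "a partition is likely not making progress."))
    } else {
      epochsWaitingToBeCommitted += epoch
    }
  }
}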


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-22 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190112179
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

I'm not sure we need to make a side-channel in the RPC handler for this. 
I'd try to just make the query fail when the condition is reached in the first 
place.


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-22 Thread yanlin-Lynn
Github user yanlin-Lynn commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190109933
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
   // If not, add the epoch being currently processed to epochs waiting to be committed,
   // otherwise commit it.
   if (lastCommittedEpoch != epoch - 1) {
-logDebug(s"Epoch $epoch has received commits from all partitions " +
-  s"and is waiting for epoch ${epoch - 1} to be committed first.")
-epochsWaitingToBeCommitted.add(epoch)
+if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+  maxEpochBacklogExceeded = true
+} else {
+  logDebug(s"Epoch $epoch has received commits from all partitions " +
+    s"and is waiting for epoch ${epoch - 1} to be committed first.")
+  epochsWaitingToBeCommitted.add(epoch)
--- End diff --

Once maxEpochBacklogExceeded is set to true, can it never be set back to false?


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-22 Thread yanlin-Lynn
Github user yanlin-Lynn commented on a diff in the pull request:

https://github.com/apache/spark/pull/21392#discussion_r190108352
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -233,9 +235,15 @@ class ContinuousExecution(
   }
   false
 } else if (isActive) {
-  currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-  logInfo(s"New epoch $currentBatchId is starting.")
-  true
+  val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+  if (maxBacklogExceeded) {
+    throw new IllegalStateException(
+      "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
--- End diff --

Throwing an exception will cause the application to fail.
I think it's better to block and wait for old epochs to be committed.


---




[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

2018-05-22 Thread efimpoberezkin
GitHub user efimpoberezkin opened a pull request:

https://github.com/apache/spark/pull/21392

[SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution

## What changes were proposed in this pull request?

This pull request adds a maxEpochBacklog SQL configuration option. EpochCoordinator tracks whether the length of the queue of waiting epochs has exceeded it; if so, the stream is stopped with an error indicating that too many epochs have stacked up.
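For reference, such an option would be declared in SQLConf roughly like the sketch below; the key name and default value are assumptions, since the PR text does not show the final choices:

// Sketch of a SQLConf entry (inside the SQLConf object); key and default
// are illustrative, not this PR's final values.
val CONTINUOUS_STREAMING_MAX_EPOCH_BACKLOG =
  buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
    .doc("The maximum number of epochs allowed to queue up waiting for " +
      "earlier epochs to be committed before a continuous query is failed.")
    .intConf
    .createWithDefault(10000)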

## How was this patch tested?

Existing unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/efimpoberezkin/spark pr/control-epoch-backlog

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21392.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21392


commit 1d2fc298284f6d553d78035f3095e5d2abe2a8a8
Author: Efim Poberezkin 
Date:   2018-04-25T13:01:29Z

Add max epoch backlog option to SQLConf

commit 0919b3f7542aa0a807b0ac56e0da1366f347bb54
Author: Efim Poberezkin 
Date:   2018-05-07T10:11:21Z

Replace logging an error with throwing an exception




---
