[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...
Github user efimpoberezkin closed the pull request at:

    https://github.com/apache/spark/pull/21392

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190473131

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
+
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

Okay, thought about something like this but wasn't sure if it's fine to do so for the sake of this change. Thanks
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190316039

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
+
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.
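The helper Jose suggests could look roughly like the following. This is a hypothetical sketch, not code from the PR: the `failure` field and the reuse of `stop()` are assumptions about ContinuousExecution's internals.

```scala
// Hypothetical helper on ContinuousExecution, as suggested in the review.
// `failure` is an assumed field used to surface the error to callers of
// awaitTermination(); the real class may differ.
private[streaming] def stopWithException(e: Throwable): Unit = {
  // Remember the cause so the query terminates with `e` rather than
  // a generic "query stopped" state.
  failure = Some(e)
  // Reuse the ordinary shutdown path to stop sources and the epoch
  // coordinator endpoint.
  stop()
}
```

With such a method, EpochCoordinator could fail the query directly when the backlog threshold is hit, instead of exposing a polling message like CheckIfMaxBacklogIsExceeded.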
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190160821

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
+
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

Do you mean making the query fail right from EpochCoordinator? If so, I wanted to do that, but couldn't figure out how to terminate the query with an exception. EpochCoordinator takes query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest is stop(), I guess. Or am I looking in a completely wrong direction? Please give a hint.
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190158513

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
     // If not, add the epoch being currently processed to epochs waiting to be committed,
     // otherwise commit it.
     if (lastCommittedEpoch != epoch - 1) {
-      logDebug(s"Epoch $epoch has received commits from all partitions " +
-        s"and is waiting for epoch ${epoch - 1} to be committed first.")
-      epochsWaitingToBeCommitted.add(epoch)
+      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+        maxEpochBacklogExceeded = true
+      } else {
+        logDebug(s"Epoch $epoch has received commits from all partitions " +
+          s"and is waiting for epoch ${epoch - 1} to be committed first.")
+        epochsWaitingToBeCommitted.add(epoch)
+      }
--- End diff --

Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there is no reason to ever set it back to false. At least that's how I see it.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190112873

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -233,9 +235,15 @@ class ContinuousExecution(
       }
       false
     } else if (isActive) {
-      currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-      logInfo(s"New epoch $currentBatchId is starting.")
-      true
+      val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+      if (maxBacklogExceeded) {
+        throw new IllegalStateException(
+          "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
--- End diff --

Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than waiting for old epochs. If we end up with a large backlog it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark to ever be able to catch up.
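The failure mode described here can be seen in a self-contained toy model of the queue logic under discussion: once one epoch never commits, every later epoch lands in the waiting set, so the set only grows until it hits the threshold. This is an illustration written for this thread, not the PR's actual EpochCoordinator code; names mirror the diff but the class is invented.

```scala
// Toy model of the backlog check: epoch 0 never commits, so epochs
// 2..5 queue up behind it until the threshold (3) is reached.
object BacklogCheckDemo {
  val maxEpochBacklog = 3
  var maxEpochBacklogExceeded = false
  val epochsWaitingToBeCommitted = scala.collection.mutable.Set[Long]()

  def handleCommitted(epoch: Long, lastCommittedEpoch: Long): Unit = {
    if (lastCommittedEpoch != epoch - 1) {
      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
        // A stalled partition keeps every later epoch queued, so the
        // queue can only grow; flagging here lets the driver fail fast.
        maxEpochBacklogExceeded = true
      } else {
        epochsWaitingToBeCommitted.add(epoch)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    Seq(2L, 3L, 4L, 5L).foreach(handleCommitted(_, lastCommittedEpoch = 0L))
    println(maxEpochBacklogExceeded) // true once three epochs are queued
  }
}
```

Because the stuck epoch blocks all successors permanently, waiting longer never shrinks the set, which is why failing the query is preferred over blocking.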
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190112179

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
+
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
--- End diff --

I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.
Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190109933

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
     // If not, add the epoch being currently processed to epochs waiting to be committed,
     // otherwise commit it.
    if (lastCommittedEpoch != epoch - 1) {
-      logDebug(s"Epoch $epoch has received commits from all partitions " +
-        s"and is waiting for epoch ${epoch - 1} to be committed first.")
-      epochsWaitingToBeCommitted.add(epoch)
+      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+        maxEpochBacklogExceeded = true
+      } else {
+        logDebug(s"Epoch $epoch has received commits from all partitions " +
+          s"and is waiting for epoch ${epoch - 1} to be committed first.")
+        epochsWaitingToBeCommitted.add(epoch)
+      }
--- End diff --

Once maxEpochBacklogExceeded is set to true, can it never be set back to false?
Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190108352

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -233,9 +235,15 @@ class ContinuousExecution(
       }
       false
     } else if (isActive) {
-      currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-      logInfo(s"New epoch $currentBatchId is starting.")
-      true
+      val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+      if (maxBacklogExceeded) {
+        throw new IllegalStateException(
+          "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
--- End diff --

Throwing an exception will cause the application to fail. I think it's better to block and wait for old epochs to be committed.
GitHub user efimpoberezkin opened a pull request:

    https://github.com/apache/spark/pull/21392

[SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution

## What changes were proposed in this pull request?

This pull request adds a maxEpochBacklog SQL configuration option. EpochCoordinator tracks whether the length of the queue of waiting epochs has exceeded it. If so, the stream is stopped with an error indicating that too many epochs have stacked up.

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/efimpoberezkin/spark pr/control-epoch-backlog

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21392.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21392

commit 1d2fc298284f6d553d78035f3095e5d2abe2a8a8
Author: Efim Poberezkin
Date: 2018-04-25T13:01:29Z

    Add max epoch backlog option to SQLConf

commit 0919b3f7542aa0a807b0ac56e0da1366f347bb54
Author: Efim Poberezkin
Date: 2018-05-07T10:11:21Z

    Replace logging an error with throwing an exception
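The configuration option the description mentions would plausibly be declared in SQLConf along these lines. This is a hedged sketch only: the entry name, config key, doc string, and default value are assumptions for illustration and are not taken from the PR's actual commits.

```scala
// Hypothetical SQLConf entry for the max epoch backlog; the key and the
// default of 10000 are assumptions, not values from the PR.
val CONTINUOUS_STREAMING_EPOCH_BACKLOG_QUEUE_SIZE =
  buildConf("spark.sql.streaming.continuous.epochBacklogQueueSize")
    .doc("The max number of epochs the continuous execution engine will " +
      "queue up waiting to be committed before failing the query.")
    .intConf
    .createWithDefault(10000)
```

EpochCoordinator would then read this value when it is constructed and compare it against epochsWaitingToBeCommitted.size, as shown in the diffs discussed earlier in the thread.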