Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190158513

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
@@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
     // If not, add the epoch being currently processed to epochs waiting to be committed,
     // otherwise commit it.
     if (lastCommittedEpoch != epoch - 1) {
-      logDebug(s"Epoch $epoch has received commits from all partitions " +
-        s"and is waiting for epoch ${epoch - 1} to be committed first.")
-      epochsWaitingToBeCommitted.add(epoch)
+      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+        maxEpochBacklogExceeded = true
+      } else {
+        logDebug(s"Epoch $epoch has received commits from all partitions " +
+          s"and is waiting for epoch ${epoch - 1} to be committed first.")
+        epochsWaitingToBeCommitted.add(epoch)
--- End diff --

Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there's no reason to set the flag back to false later. At least that's how I see it.
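A minimal standalone sketch of the behavior being discussed: once the number of epochs waiting to be committed reaches the configured limit, the coordinator flags the condition and stops accepting new epochs instead of growing the queue. The names maxEpochBacklog and epochsWaitingToBeCommitted mirror the diff; EpochBacklogSketch, enqueue, and streamKilled are hypothetical stand-ins for the coordinator's actual state and its stop path.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the backlog check in the diff above;
// not the real EpochCoordinator.
class EpochBacklogSketch(maxEpochBacklog: Int) {
  private val epochsWaitingToBeCommitted = mutable.Set.empty[Long]

  // Stand-in for maxEpochBacklogExceeded leading to the stream being killed.
  var streamKilled: Boolean = false

  // Called when an epoch has commits from all partitions but its
  // predecessor has not been committed yet.
  def enqueue(epoch: Long): Unit = {
    if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
      // Backlog limit reached: kill the stream rather than queue more epochs.
      // The flag is never reset, matching the comment's point.
      streamKilled = true
    } else {
      epochsWaitingToBeCommitted.add(epoch)
    }
  }

  def backlogSize: Int = epochsWaitingToBeCommitted.size
}
```

Under this reading, once `streamKilled` is set there is no path back to normal operation, which is why resetting the flag to false later would serve no purpose.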