GitHub user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190109933
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
           // If not, add the epoch being currently processed to epochs waiting to be committed,
           // otherwise commit it.
           if (lastCommittedEpoch != epoch - 1) {
    -        logDebug(s"Epoch $epoch has received commits from all partitions " +
    -          s"and is waiting for epoch ${epoch - 1} to be committed first.")
    -        epochsWaitingToBeCommitted.add(epoch)
    +        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
    +          maxEpochBacklogExceeded = true
    +        } else {
    +          logDebug(s"Epoch $epoch has received commits from all partitions " +
    +            s"and is waiting for epoch ${epoch - 1} to be committed first.")
    +          epochsWaitingToBeCommitted.add(epoch)
    --- End diff --
    
    Once maxEpochBacklogExceeded is set to true, can it never be set back to false?
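
    To make the question concrete, here is a minimal sketch of a backlog flag that can recover, assuming that clearing it is actually desirable (the PR may intend the flag to stay set). The class EpochBacklog and its methods are hypothetical names for illustration only; they are not part of EpochCoordinator or any Spark API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the backlog bookkeeping in the diff above.
// This is NOT Spark's EpochCoordinator; all names here are assumptions.
final class EpochBacklog(maxEpochBacklog: Int) {
  // Epochs that have all partition commits but wait on an earlier epoch.
  private val waiting = mutable.TreeSet.empty[Long]
  private var exceeded = false

  /** Tries to queue an epoch; returns false and raises the flag when the backlog is full. */
  def enqueue(epoch: Long): Boolean = {
    if (waiting.size >= maxEpochBacklog) {
      exceeded = true
      false
    } else {
      waiting.add(epoch)
      true
    }
  }

  /** Removes a committed epoch and lowers the flag once there is room again. */
  def commit(epoch: Long): Unit = {
    waiting.remove(epoch)
    if (waiting.size < maxEpochBacklog) exceeded = false
  }

  /** True while the most recent overflow has not yet been drained. */
  def backlogExceeded: Boolean = exceeded
}
```

    In this sketch the flag is lowered inside commit, so it reflects the current backlog rather than any overflow that ever happened. Whether that is the right behaviour depends on how the caller reacts to the flag, which the diff above does not show.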

