Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190158513
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
           // If not, add the epoch being currently processed to epochs waiting to be committed,
           // otherwise commit it.
           if (lastCommittedEpoch != epoch - 1) {
    -        logDebug(s"Epoch $epoch has received commits from all partitions " +
    -          s"and is waiting for epoch ${epoch - 1} to be committed first.")
    -        epochsWaitingToBeCommitted.add(epoch)
    +        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
    +          maxEpochBacklogExceeded = true
    +        } else {
    +          logDebug(s"Epoch $epoch has received commits from all partitions " +
    +            s"and is waiting for epoch ${epoch - 1} to be committed first.")
    +          epochsWaitingToBeCommitted.add(epoch)
    --- End diff --
    
    Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there is no reason to set the flag back to false later. At least that's how I see it.
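
    To make the intent concrete, here is a minimal, self-contained sketch of how I read it. It is not the actual EpochCoordinator: the class name `EpochBacklogSketch`, the `onEpochReady` method, and the `committed` accessor are hypothetical, and the sketch only assumes that exceeding `maxEpochBacklog` is fatal, so the flag is set once and never cleared.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the backlog logic in the diff above.
// It is NOT the real EpochCoordinator; names other than maxEpochBacklog,
// maxEpochBacklogExceeded, lastCommittedEpoch and epochsWaitingToBeCommitted
// are made up for illustration.
class EpochBacklogSketch(maxEpochBacklog: Int) {
  private val epochsWaitingToBeCommitted = mutable.HashSet.empty[Long]
  private var lastCommittedEpoch: Long = 0L

  // Assumption: once the backlog limit is hit the stream is going to be
  // killed, so this flag is set once and never reset to false.
  private var backlogExceeded = false

  def maxEpochBacklogExceeded: Boolean = backlogExceeded
  def committed: Long = lastCommittedEpoch

  /** Called once `epoch` has received commits from all partitions. */
  def onEpochReady(epoch: Long): Unit = {
    if (backlogExceeded) {
      // The stream is considered failed; ignore further epochs.
      return
    }
    if (lastCommittedEpoch != epoch - 1) {
      // The previous epoch is not committed yet, so this one has to wait.
      if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
        backlogExceeded = true
      } else {
        epochsWaitingToBeCommitted.add(epoch)
      }
    } else {
      // Commit this epoch, then any directly following epochs that were waiting.
      lastCommittedEpoch = epoch
      var next = epoch + 1
      while (epochsWaitingToBeCommitted.remove(next)) {
        lastCommittedEpoch = next
        next += 1
      }
    }
  }
}
```

    Whoever drives the query would then check `maxEpochBacklogExceeded` and stop the stream with an error when it is true, instead of trying to recover and clearing the flag.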

