GitHub user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190112873
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -233,9 +235,15 @@ class ContinuousExecution(
                   }
                   false
                 } else if (isActive) {
    -              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
    -              logInfo(s"New epoch $currentBatchId is starting.")
    -              true
    +              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
    +              if (maxBacklogExceeded) {
    +                throw new IllegalStateException(
    +                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
    --- End diff --
    
    Agreed that the code as written won't shut down the stream. But I think
    it does make sense to kill the stream rather than waiting for old epochs.
    If we end up with a large backlog, it's almost surely because some
    partition isn't making any progress, so I wouldn't expect Spark to ever
    be able to catch up.
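
To illustrate the reasoning, here is a minimal, self-contained sketch of a backlog check that fails the query instead of waiting. It is a toy model, not Spark's implementation: `EpochBacklog`, `BacklogDemo`, `run`, and the `stalledAfter` parameter are hypothetical names invented for this example, and it assumes epochs commit strictly in order.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the epoch bookkeeping; not Spark's actual API.
final class EpochBacklog(maxBacklog: Int) {
  // Epochs that have started but have not yet been committed.
  private val pending = mutable.Queue.empty[Long]
  private var currentEpoch = 0L

  def startNextEpoch(): Long = {
    currentEpoch += 1
    pending.enqueue(currentEpoch)
    currentEpoch
  }

  // Assumes in-order commits: committing an epoch clears it and all earlier ones.
  def commitUpTo(epoch: Long): Unit =
    while (pending.nonEmpty && pending.head <= epoch) pending.dequeue()

  def maxBacklogExceeded: Boolean = pending.size > maxBacklog
}

object BacklogDemo {
  // Simulates the epoch-trigger loop. Returns Left(message) if the query was
  // failed because of the backlog, or Right(last epoch started) otherwise.
  def run(epochs: Int, maxBacklog: Int, stalledAfter: Long): Either[String, Long] = {
    val backlog = new EpochBacklog(maxBacklog)
    var last = 0L
    var i = 0
    while (i < epochs) {
      if (backlog.maxBacklogExceeded) {
        return Left(s"epoch backlog exceeded $maxBacklog after epoch $last; failing the query")
      }
      last = backlog.startNextEpoch()
      // A healthy stream commits every epoch; once a partition stalls
      // (epochs after stalledAfter), nothing commits and the queue only grows.
      if (last <= stalledAfter) backlog.commitUpTo(last)
      i += 1
    }
    Right(last)
  }
}
```

In this model a healthy run never trips the check, while a run that stalls after epoch 2 accumulates uncommitted epochs until the limit is crossed. Waiting longer would not change that outcome, which is the argument for failing fast.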


