Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190108352
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -233,9 +235,15 @@ class ContinuousExecution(
                   }
                   false
                 } else if (isActive) {
     -              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
     -              logInfo(s"New epoch $currentBatchId is starting.")
     -              true
     +              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
     +              if (maxBacklogExceeded) {
     +                throw new IllegalStateException(
     +                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
    --- End diff --
    
    Throwing an exception will cause the application to fail.
    I think it would be better to block and wait for the old epochs to be committed.
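    A minimal sketch of the block-and-wait alternative being suggested. This is not the actual `ContinuousExecution` code: the backlog counter, the `commitOldestEpoch` helper, and the polling loop are all hypothetical stand-ins for the real RPC calls (`epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)` and `askSync[Long](IncrementAndGetEpoch)` from the diff); in the real engine, other threads would drain the backlog by committing epochs.

    ```scala
    // Hypothetical sketch: instead of throwing IllegalStateException when the
    // epoch backlog is full, poll until older epochs have been committed,
    // then start the new epoch.
    object BacklogWaitSketch {
      // Simulated backlog state: starts over the limit, drains as epochs commit.
      private var pendingEpochs = 3
      private val maxEpochBacklog = 2

      // Stand-in for epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded).
      private def checkIfMaxBacklogIsExceeded(): Boolean =
        pendingEpochs > maxEpochBacklog

      // Stand-in for the commit path that would normally run on other threads.
      private def commitOldestEpoch(): Unit = pendingEpochs -= 1

      def startNewEpoch(): Long = {
        // Block-and-wait alternative to throwing: back off and re-check
        // until the backlog has room for a new epoch.
        while (checkIfMaxBacklogIsExceeded()) {
          commitOldestEpoch()  // here, drain the simulated backlog ourselves
          Thread.sleep(10)     // brief back-off before re-checking
        }
        pendingEpochs += 1
        pendingEpochs          // stand-in for IncrementAndGetEpoch
      }

      def main(args: Array[String]): Unit =
        println(startNewEpoch())
    }
    ```

    The trade-off: blocking keeps the query alive under temporary commit lag, but without a timeout it can hang indefinitely if epochs stop committing, so a real implementation would likely bound the wait.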


---
