Zoltán Zvara created SPARK-36240:
------------------------------------

             Summary: Graceful termination of Spark Structured Streaming queries
                 Key: SPARK-36240
                 URL: https://issues.apache.org/jira/browse/SPARK-36240
             Project: Spark
          Issue Type: New Feature
          Components: Structured Streaming
    Affects Versions: 3.1.2
         Environment: Spark Streaming provides a way to gracefully stop the 
streaming application using the configuration parameter 
{{spark.streaming.stopGracefullyOnShutdown}}. The configuration states:
{quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on JVM 
shutdown rather than immediately.
{quote}
This stops job generation (see {{JobGenerator}} in Spark Streaming) and lets 
the current {{Job}} (corresponding to a micro-batch) finish instead of 
canceling it.

Some applications require graceful stopping so that their output remains 
consistent: output that is only partially written causes problems for 
applications that require "exactly-once" semantics.

There is no support in Structured Streaming to gracefully stop 
queries/streaming applications.

Naive solutions found on the web propose checking whether the queries are 
active using {{query.isActive}}, or checking the query state directly, and then 
attempting to call {{stop()}} at the right time. In the current Structured 
Streaming implementation, {{stop()}} cancels the job group, which may lead to 
inconsistent output because the result still depends on the timing of the 
cancellation.

_Proposed solution:_

Strictly speaking, in the context of the micro-batch execution model, a 
{{StreamingQuery}} that we want to stop gracefully is implemented by 
{{MicroBatchExecution}}. The motivation is similar to that of the 
{{StreamingContext}}'s graceful shutdown: stop the "job generation" and then 
wait for any active job to finish, instead of canceling the jobs.

The micro-batch scheduling is managed by a {{ProcessingTimeExecutor}} of the 
{{MicroBatchExecution}} class.

 
{code:java}
private val triggerExecutor = trigger match {
  case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
  case OneTimeTrigger => OneTimeExecutor()
  case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
{code}
The following {{while (true)}} loop is run by the job generation mechanism. The 
{{triggerHandler}} is a function that generates the micro-batches.
{code:java}
override def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
    val terminated = !triggerHandler()
    if (intervalMs > 0) {
      val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
      if (batchElapsedTimeMs > intervalMs) {
        notifyBatchFallingBehind(batchElapsedTimeMs)
      }
      if (terminated) {
        return
      }
      clock.waitTillTime(nextTriggerTimeMs)
    } else {
      if (terminated) {
        return
      }
    }
  }
}
{code}
Here, a {{gracefulStop()}} call on the query could signal the 
{{ProcessingTimeExecutor}} to stop triggering new batches.
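
A minimal sketch of this idea follows. It is not Spark's code: the class name 
GracefulProcessingTimeExecutor, the stopRequested flag, and the simplified 
wall-clock handling are assumptions made for illustration. It mirrors the 
execute method above, but checks the flag before each new trigger, so a batch 
that is already running is never interrupted.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified stand-in for ProcessingTimeExecutor (not Spark's class).
class GracefulProcessingTimeExecutor(intervalMs: Long) {
  // Set by gracefulStop(); read by the trigger loop before each new batch.
  private val stopRequested = new AtomicBoolean(false)

  /** Asks the loop to stop triggering new batches; a running batch completes. */
  def gracefulStop(): Unit = stopRequested.set(true)

  /** Calls triggerHandler until it returns false or a graceful stop is requested. */
  def execute(triggerHandler: () => Boolean): Unit = {
    while (!stopRequested.get()) {
      val triggerTimeMs = System.currentTimeMillis()
      // Runs one whole micro-batch; the flag is only checked between batches.
      val terminated = !triggerHandler()
      if (terminated) {
        return
      }
      // Wait out the rest of the trigger interval unless a stop was requested.
      val remainingMs = triggerTimeMs + intervalMs - System.currentTimeMillis()
      if (intervalMs > 0 && remainingMs > 0 && !stopRequested.get()) {
        Thread.sleep(remainingMs)
      }
    }
  }
}
```

In this sketch gracefulStop() may be called from any thread, including from 
inside the handler itself. The loop exits only at a batch boundary.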

Then another mechanism is required that waits until any current job has 
finished. It would then call {{stop()}}, after which the {{SparkSession}} may 
be stopped as well.
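
One possible shape for such a waiting mechanism, sketched with plain JVM 
concurrency primitives rather than Spark APIs: the GracefulQuery class, its 
runBatch parameter, and the timeout argument are all hypothetical. A latch is 
released when the batch loop exits, and the caller blocks on that latch before 
doing any final cleanup.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of a query whose batch loop runs on its own thread.
class GracefulQuery(runBatch: () => Unit) {
  private val stopRequested = new AtomicBoolean(false)
  // Released exactly once, when the batch loop has exited.
  private val loopExited = new CountDownLatch(1)

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      try {
        // The flag is checked only between batches, never during one.
        while (!stopRequested.get()) runBatch()
      } finally {
        loopExited.countDown()
      }
    }
  })

  def start(): Unit = thread.start()

  /**
   * Stops scheduling new batches and waits for the running one to finish.
   * Returns true if the loop exited within timeoutMs, false otherwise.
   */
  def gracefulStop(timeoutMs: Long): Boolean = {
    stopRequested.set(true)
    loopExited.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}
```

When gracefulStop returns true, no batch is in flight, so this is the point at 
which the real {{stop()}} and the {{SparkSession}} shutdown could follow. A 
false result means the timeout expired while a batch was still running, and 
the caller has to decide whether to keep waiting or to cancel.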

 
            Reporter: Zoltán Zvara
             Fix For: 3.2.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
