Zoltán Zvara created SPARK-36240:
------------------------------------

             Summary: Graceful termination of Spark Structured Streaming queries
                 Key: SPARK-36240
                 URL: https://issues.apache.org/jira/browse/SPARK-36240
             Project: Spark
          Issue Type: New Feature
          Components: Structured Streaming
    Affects Versions: 3.1.2

Spark Streaming provides a way to gracefully stop a streaming application via the configuration parameter {{spark.streaming.stopGracefullyOnShutdown}}. The configuration states:
{quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on JVM shutdown rather than immediately.
{quote}
This effectively stops job generation (see {{JobGenerator}} in Spark Streaming) and lets the current {{Job}} (corresponding to a micro-batch) finish, instead of canceling the active job.
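For reference, the DStream-based graceful shutdown described above is enabled with a single configuration entry. A minimal sketch (the application class and jar names are placeholders, not from this issue):
{code:bash}
# Enable graceful shutdown of the legacy StreamingContext on JVM shutdown.
# com.example.MyStreamingApp and my-streaming-app.jar are hypothetical.
spark-submit \
  --conf spark.streaming.stopGracefullyOnShutdown=true \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar
{code}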
Some applications may require graceful stopping so that their output remains consistent: output that is written out halfway causes serious problems for applications that require "exactly-once" semantics. There is no support in Structured Streaming for gracefully stopping queries/streaming applications. Naive solutions found on the web propose checking whether a query is active using {{query.isActive}}, or checking the query state directly, and then attempting to call {{stop()}} at the right time. With the current implementation of Structured Streaming, {{stop()}} cancels the job group, which may still lead to inconsistent output, because the outcome depends on the timing of the cancellation.

_Proposed solution:_

Strictly speaking, in the context of the micro-batch execution model, a {{StreamingQuery}} that we want to gracefully stop would be of the implementation {{MicroBatchExecution}}. The motivation is similar to that of the {{StreamingContext}}'s graceful stop: stop the "job generation" and then wait for any active job to finish, instead of canceling the jobs.

The micro-batch scheduling is managed by the {{ProcessingTimeExecutor}} created in the {{MicroBatchExecution}} class:
{code:java}
private val triggerExecutor = trigger match {
  case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
  case OneTimeTrigger => OneTimeExecutor()
  case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
{code}
The following while-true loop is run by the job generation mechanism. The {{triggerHandler}} is a callback that generates the micro-batches.
{code:java}
override def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
    val terminated = !triggerHandler()
    if (intervalMs > 0) {
      val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
      if (batchElapsedTimeMs > intervalMs) {
        notifyBatchFallingBehind(batchElapsedTimeMs)
      }
      if (terminated) {
        return
      }
      clock.waitTillTime(nextTriggerTimeMs)
    } else {
      if (terminated) {
        return
      }
    }
  }
}
{code}
Here, a {{gracefulStop()}} signal from a query could essentially tell the {{ProcessingTimeExecutor}} to stop triggering new batches. A second mechanism is then required that waits until any currently running job finishes; it would then call {{stop()}}, after which the {{SparkSession}} may be stopped as well.

            Reporter: Zoltán Zvara
             Fix For: 3.2.0

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
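The proposed two-phase shutdown (first stop scheduling new batches, then wait for the in-flight batch to finish) can be sketched outside of Spark as follows. This is a hypothetical illustration, not the actual {{MicroBatchExecution}} API; {{GracefulTriggerExecutor}}, {{gracefulStop()}}, and {{awaitTermination()}} are invented names:
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: a trigger loop that stops generating new batches once
// gracefulStop() is requested, but always lets the in-flight batch run to
// completion before the loop exits.
class GracefulTriggerExecutor {
    private volatile boolean stopRequested = false;
    private final CountDownLatch terminated = new CountDownLatch(1);
    private final long intervalMs;

    GracefulTriggerExecutor(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Mirrors the shape of ProcessingTimeExecutor.execute: triggerHandler
    // returns false when the query itself decides to terminate.
    void execute(BooleanSupplier triggerHandler) throws InterruptedException {
        try {
            // The flag is checked only BETWEEN batches, so a batch that has
            // already started is never interrupted halfway.
            while (!stopRequested) {
                boolean continueRunning = triggerHandler.getAsBoolean();
                if (!continueRunning) {
                    return;
                }
                if (intervalMs > 0) {
                    Thread.sleep(intervalMs);
                }
            }
        } finally {
            terminated.countDown();
        }
    }

    // Phase 1: stop scheduling new batches; the current batch still finishes.
    void gracefulStop() {
        stopRequested = true;
    }

    // Phase 2: block until the loop has exited, i.e. the last batch completed.
    void awaitTermination() throws InterruptedException {
        terminated.await();
    }
}
{code}
After {{awaitTermination()}} returns, no batch can be in flight, so it would be safe to call {{stop()}} on the query and then stop the {{SparkSession}}.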