[ https://issues.apache.org/jira/browse/SPARK-36240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384755#comment-17384755 ]
Zoltán Zvara commented on SPARK-36240:
--------------------------------------

Based on suggestions and guidance from the Spark community, we would be happy to implement this feature in Spark Structured Streaming.

> Graceful termination of Spark Structured Streaming queries
> ----------------------------------------------------------
>
>                 Key: SPARK-36240
>                 URL: https://issues.apache.org/jira/browse/SPARK-36240
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>         Environment:
>            Reporter: Zoltán Zvara
>            Priority: Major
>             Fix For: 3.2.0
>
>
> Spark Streaming provides a way to gracefully stop the streaming application
> using the configuration parameter
> {{spark.streaming.stopGracefullyOnShutdown}}. The configuration states:
> {quote}If {{true}}, Spark shuts down the {{StreamingContext}} gracefully on
> JVM shutdown rather than immediately.
> {quote}
> This effectively stops the job generation (see {{JobGenerator}} of Spark
> Streaming) and lets the current {{Job}} (corresponding to a micro-batch)
> finish instead of canceling the active job itself.
> Some applications may require graceful stopping so that their output
> remains consistent: output that is written out only halfway poses a lot of
> problems for applications that require "exactly-once" semantics.
> Structured Streaming provides no support for gracefully stopping
> queries/streaming applications.
> Naive solutions found on the web propose checking whether the queries are
> active using {{query.isActive}}, or checking the query state directly, and
> then attempting to call {{stop()}} at the right time. In Structured
> Streaming, with the current implementation, {{stop()}} cancels the job
> group, which may lead to inconsistent output because the result still
> depends on the timing of the cancellation.
> _Proposed solution:_
> Strictly speaking, in the context of the micro-batch execution model, a
> {{StreamingQuery}} that we want to gracefully stop would be an instance of
> {{MicroBatchExecution}}. The motivation is similar to that of the
> {{StreamingContext}}'s graceful shutdown: stop the "job generation" and then
> wait for any active job to finish, instead of canceling the jobs.
> The micro-batch scheduling is managed by a {{ProcessingTimeExecutor}} of the
> {{MicroBatchExecution}} class.
>
> {code:scala}
> private val triggerExecutor = trigger match {
>   case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
>   case OneTimeTrigger => OneTimeExecutor()
>   case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
> }
> {code}
> The following {{while (true)}} loop is run by the job generation mechanism.
> The {{triggerHandler}} is a function that generates and runs one
> micro-batch; it returns {{false}} once the query should terminate.
> {code:scala}
> override def execute(triggerHandler: () => Boolean): Unit = {
>   while (true) {
>     val triggerTimeMs = clock.getTimeMillis
>     val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
>     // Run one micro-batch; a false result means the query should terminate.
>     val terminated = !triggerHandler()
>     if (intervalMs > 0) {
>       val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
>       // Report batches that took longer than the trigger interval.
>       if (batchElapsedTimeMs > intervalMs) {
>         notifyBatchFallingBehind(batchElapsedTimeMs)
>       }
>       if (terminated) {
>         return
>       }
>       // Wait until the next trigger time before starting another batch.
>       clock.waitTillTime(nextTriggerTimeMs)
>     } else {
>       if (terminated) {
>         return
>       }
>     }
>   }
> }
> {code}
> Here, a {{gracefulStop()}} call on the query could signal the
> {{ProcessingTimeExecutor}} to stop triggering new batches.
> Another mechanism is then required to wait until any current job has
> finished. After that, it would call {{stop()}}, and the {{SparkSession}}
> may then be stopped as well.
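The two pieces of the proposal (stop triggering new batches, then wait for the in-flight batch) can be illustrated outside of Spark with a minimal, self-contained Scala sketch. This is an assumption-laden model, not Spark code: `GracefulProcessingTimeExecutor`, `requestGracefulStop`, `awaitStopped` and `GracefulStopDemo` are hypothetical names, the loop uses the wall clock instead of Spark's `Clock`, and it schedules each batch `intervalMs` after the previous trigger time rather than using `nextBatchTime`.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical sketch modeled on the ProcessingTimeExecutor loop quoted above.
// NOT Spark's class: wall-clock timing, simplified scheduling.
final class GracefulProcessingTimeExecutor(intervalMs: Long) {
  private val stopRequested = new AtomicBoolean(false)
  private val loopExited = new CountDownLatch(1)

  // Stop generating new micro-batches; an in-flight batch is left to finish.
  def requestGracefulStop(): Unit = stopRequested.set(true)

  // Block until the trigger loop has returned, i.e. no batch is running anymore.
  def awaitStopped(): Unit = loopExited.await()

  // triggerHandler runs one micro-batch and returns false when the query should terminate.
  def execute(triggerHandler: () => Boolean): Unit =
    try {
      var terminated = false
      // The flag is only checked between batches, so a batch is never cut off halfway.
      while (!terminated && !stopRequested.get()) {
        val triggerTimeMs = System.currentTimeMillis()
        terminated = !triggerHandler()
        if (!terminated && intervalMs > 0) {
          val waitMs = triggerTimeMs + intervalMs - System.currentTimeMillis()
          if (waitMs > 0) Thread.sleep(waitMs)
        }
      }
    } finally loopExited.countDown()
}

object GracefulStopDemo {
  // Requests a graceful stop while batch 2 is running; returns (started, completed).
  def run(): (Int, Int) = {
    val executor = new GracefulProcessingTimeExecutor(intervalMs = 10)
    val started = new AtomicInteger(0)
    val completed = new AtomicInteger(0)
    val batchTwoRunning = new CountDownLatch(1)
    val releaseBatchTwo = new CountDownLatch(1)

    val runner = new Thread(() =>
      executor.execute { () =>
        val batchId = started.incrementAndGet()
        if (batchId == 2) {
          batchTwoRunning.countDown() // signal: batch 2 is in flight
          releaseBatchTwo.await()     // hold the batch open until the stop is requested
        }
        completed.incrementAndGet()
        true // the query itself never asks to terminate
      })
    runner.start()

    batchTwoRunning.await()
    executor.requestGracefulStop() // the stop request arrives mid-batch
    releaseBatchTwo.countDown()    // let batch 2 finish normally
    executor.awaitStopped()
    runner.join()
    (started.get(), completed.get())
  }
}
```

In the demo the stop request arrives while batch 2 is running, so `GracefulStopDemo.run()` returns `(2, 2)`: batch 2 completes and batch 3 never starts. In a real integration, the waiting step would presumably be followed by `stop()` on the query, as the proposal describes.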