This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 79aae64380f [SPARK-40849][SS] Async log purge 79aae64380f is described below commit 79aae64380ff83570549cb8c4ed85ffb022fc8eb Author: Jerry Peng <jerry.p...@databricks.com> AuthorDate: Mon Oct 24 11:09:40 2022 +0900 [SPARK-40849][SS] Async log purge ### What changes were proposed in this pull request? Purging old entries in both the offset log and commit log will be done asynchronously. For every micro-batch, older entries in both offset log and commit log are deleted. This is done so that the offset log and commit log do not continually grow. Please reference logic here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L539 The time spent performing these log purges is grouped with the “walCommit” execution time in the StreamingProgressListener metrics. Around two thirds of the “walCommit” execution time is performing these purge operations thus making these operations asynchronous will also reduce latency. Also, we do not necessarily need to perform the purges every micro-batch. When these purges are executed asynchronously, they do not need to block micro-batch execution and we don’t need to start a [...] ### Why are the changes needed? Decrease microbatch processing latency ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #38313 from jerrypeng/SPARK-40849. Authored-by: Jerry Peng <jerry.p...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../scala/org/apache/spark/util/ThreadUtils.scala | 4 +- .../org/apache/spark/sql/internal/SQLConf.scala | 9 +++ .../sql/execution/streaming/AsyncLogPurge.scala | 82 +++++++++++++++++++++ .../sql/execution/streaming/ErrorNotifier.scala | 46 ++++++++++++ .../execution/streaming/MicroBatchExecution.scala | 22 +++++- .../sql/execution/streaming/StreamExecution.scala | 7 ++ .../streaming/MicroBatchExecutionSuite.scala | 85 +++++++++++++++++++++- 7 files changed, 249 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index d45dc937910..99b4e894bf0 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -162,9 +162,9 @@ private[spark] object ThreadUtils { /** * Wrapper over newSingleThreadExecutor. */ - def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = { + def newDaemonSingleThreadExecutor(threadName: String): ThreadPoolExecutor = { val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() - Executors.newSingleThreadExecutor(threadFactory) + Executors.newFixedThreadPool(1, threadFactory).asInstanceOf[ThreadPoolExecutor] } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 72eb420de37..ebff9ce546d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1982,6 +1982,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ASYNC_LOG_PURGE = + buildConf("spark.sql.streaming.asyncLogPurge.enabled") + .internal() + .doc("When true, purging the offset log and " + + "commit log of old entries will be done asynchronously.") + .version("3.4.0") + .booleanConf + .createWithDefault(true) + val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala new file mode 100644 index 00000000000..b3729dbc7b4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncLogPurge.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ThreadUtils + +/** + * Used to enable the capability to allow log purges to be done asynchronously + */ +trait AsyncLogPurge extends Logging { + + protected var currentBatchId: Long + + protected val minLogEntriesToMaintain: Int + + + protected[sql] val errorNotifier: ErrorNotifier + + protected val sparkSession: SparkSession + + private val asyncPurgeExecutorService + = ThreadUtils.newDaemonSingleThreadExecutor("async-log-purge") + + private val purgeRunning = new AtomicBoolean(false) + + protected def purge(threshold: Long): Unit + + protected lazy val useAsyncPurge: Boolean = sparkSession.conf.get(SQLConf.ASYNC_LOG_PURGE) + + protected def purgeAsync(): Unit = { + if (purgeRunning.compareAndSet(false, true)) { + // save local copy because currentBatchId may get updated. There are not really + // any concurrency issues here in regards to calculating the purge threshold + // but for the sake of defensive coding lets make a copy + val currentBatchIdCopy: Long = currentBatchId + asyncPurgeExecutorService.execute(() => { + try { + purge(currentBatchIdCopy - minLogEntriesToMaintain) + } catch { + case throwable: Throwable => + logError("Encountered error while performing async log purge", throwable) + errorNotifier.markError(throwable) + } finally { + purgeRunning.set(false) + } + }) + } else { + log.debug("Skipped log purging since there is already one in progress.") + } + } + + protected def asyncLogPurgeShutdown(): Unit = { + ThreadUtils.shutdown(asyncPurgeExecutorService) + } + + // used for testing + private[sql] def arePendingAsyncPurge: Boolean = { + purgeRunning.get() || + asyncPurgeExecutorService.getQueue.size() > 0 || + asyncPurgeExecutorService.getActiveCount > 0 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ErrorNotifier.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ErrorNotifier.scala new file mode 100644 index 00000000000..0f25d0667a0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ErrorNotifier.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.atomic.AtomicReference + +import org.apache.spark.internal.Logging + +/** + * Class to notify of any errors that might have occurred out of band + */ +class ErrorNotifier extends Logging { + + private val error = new AtomicReference[Throwable] + + /** To indicate any errors that have occurred */ + def markError(th: Throwable): Unit = { + logError("A fatal error has occurred.", th) + error.set(th) + } + + /** Get any errors that have occurred */ + def getError(): Option[Throwable] = { + Option(error.get()) + } + + /** Throw errors that have occurred */ + def throwErrorIfExists(): Unit = { + getError().foreach({th => throw th}) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 153bc82f892..5f8fb93827b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -46,7 +46,9 @@ class MicroBatchExecution( plan: WriteToStream) extends StreamExecution( sparkSession, plan.name, plan.resolvedCheckpointLocation, plan.inputQuery, plan.sink, trigger, - triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) { + triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) with AsyncLogPurge { + + protected[sql] val errorNotifier = new ErrorNotifier() @volatile protected var sources: Seq[SparkDataStream] = Seq.empty @@ -210,6 +212,14 @@ class MicroBatchExecution( logInfo(s"Query $prettyIdString was stopped") } + override def cleanup(): Unit = { + super.cleanup() + + // shutdown and cleanup required for async log purge mechanism + asyncLogPurgeShutdown() + logInfo(s"Async log purge executor pool for query ${prettyIdString} has been shutdown") + } + /** Begins recording statistics about query progress for a given trigger. */ override protected def startTrigger(): Unit = { super.startTrigger() @@ -226,6 +236,10 @@ class MicroBatchExecution( triggerExecutor.execute(() => { if (isActive) { + + // check if there are any previous errors and bubble up any existing async operations + errorNotifier.throwErrorIfExists + var currentBatchHasNewData = false // Whether the current batch had new data startTrigger() @@ -536,7 +550,11 @@ class MicroBatchExecution( // It is now safe to discard the metadata beyond the minimum number to retain. // Note that purge is exclusive, i.e. it purges everything before the target ID. if (minLogEntriesToMaintain < currentBatchId) { - purge(currentBatchId - minLogEntriesToMaintain) + if (useAsyncPurge) { + purgeAsync() + } else { + purge(currentBatchId - minLogEntriesToMaintain) + } } } noNewData = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index eeaa37aa7ff..5afd744f5e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -347,6 +347,7 @@ abstract class StreamExecution( try { stopSources() + cleanup() state.set(TERMINATED) currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false) @@ -410,6 +411,12 @@ abstract class StreamExecution( } } + + /** + * Any clean up that needs to happen when the query is stopped or exits + */ + protected def cleanup(): Unit = {} + /** * Interrupts the query execution thread and awaits its termination until until it exceeds the * timeout. The timeout can be set on "spark.sql.streaming.stopTimeout". diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala index 749ca9d06ea..0ddd48420ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala @@ -21,17 +21,20 @@ import java.io.File import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.should._ +import org.scalatest.time.{Seconds, Span} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.connector.read.streaming import org.apache.spark.sql.connector.read.streaming.SparkDataStream import org.apache.spark.sql.functions.{count, timestamp_seconds, window} -import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest, Trigger} import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.util.Utils -class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { +class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter with Matchers { import testImplicits._ @@ -39,6 +42,84 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { sqlContext.streams.active.foreach(_.stop()) } + def getListOfFiles(dir: String): List[File] = { + val d = new File(dir) + if (d.exists && d.isDirectory) { + d.listFiles.filter(_.isFile).toList + } else { + List[File]() + } + } + + test("async log purging") { + withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", SQLConf.ASYNC_LOG_PURGE.key -> "true") { + withTempDir { checkpointLocation => + val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) + val ds = inputData.toDS() + testStream(ds)( + StartStream(checkpointLocation = checkpointLocation.getCanonicalPath), + AddData(inputData, 0), + CheckNewAnswer(0), + AddData(inputData, 1), + CheckNewAnswer(1), + Execute { q => + getListOfFiles(checkpointLocation + "/offsets") + .filter(file => !file.isHidden) + .map(file => file.getName.toInt) + .sorted should equal(Array(0, 1)) + getListOfFiles(checkpointLocation + "/commits") + .filter(file => !file.isHidden) + .map(file => file.getName.toInt) + .sorted should equal(Array(0, 1)) + }, + AddData(inputData, 2), + CheckNewAnswer(2), + AddData(inputData, 3), + CheckNewAnswer(3), + Execute { q => + eventually(timeout(Span(5, Seconds))) { + q.asInstanceOf[MicroBatchExecution].arePendingAsyncPurge should be(false) + } + + getListOfFiles(checkpointLocation + "/offsets") + .filter(file => !file.isHidden) + .map(file => file.getName.toInt) + .sorted should equal(Array(1, 2, 3)) + getListOfFiles(checkpointLocation + "/commits") + .filter(file => !file.isHidden) + .map(file => file.getName.toInt) + .sorted should equal(Array(1, 2, 3)) + }, + StopStream + ) + } + } + } + + test("error notifier test") { + withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", SQLConf.ASYNC_LOG_PURGE.key -> "true") { + withTempDir { checkpointLocation => + val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext) + val ds = inputData.toDS() + val e = intercept[StreamingQueryException] { + + testStream(ds)( + StartStream(checkpointLocation = checkpointLocation.getCanonicalPath), + AddData(inputData, 0), + CheckNewAnswer(0), + AddData(inputData, 1), + CheckNewAnswer(1), + Execute { q => + q.asInstanceOf[MicroBatchExecution].errorNotifier.markError(new Exception("test")) + }, + AddData(inputData, 2), + CheckNewAnswer(2)) + } + e.getCause.getMessage should include("test") + } + } + } + test("SPARK-24156: do not plan a no-data batch again after it has already been planned") { val inputData = MemoryStream[Int] val df = inputData.toDF() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org