HeartSaVioR commented on code in PR #38517: URL: https://github.com/apache/spark/pull/38517#discussion_r1043131174
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress 
tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. + // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) + errorNotifier.markError(th) + return + }) + + // check if there are offsets that are ready to be committed by the source + var offset = sourceCommitQueue.poll() + while (offset != null) { + commitSources(offset) + offset = sourceCommitQueue.poll() + } + } + + override def markMicroBatchEnd(): Unit = { + watermarkTracker.updateWatermark(lastExecution.executedPlan) + reportTimeTaken("commitOffsets") { + // check if current batch there is a async write for the offset log is issued for this batch + // if so, we should do the same for commit log + if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) { + commitLog + .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) + .exceptionally((th: Throwable) => { + logError("Got exception during async write", th) + errorNotifier.markError(th) + return + }) + } else { + if (!commitLog.addInMemory( + currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { + throw new IllegalStateException( + s"Concurrent update to the log. 
Multiple streaming jobs detected for $currentBatchId" + ) + } + } + offsetLog.removeAsyncOffsetWrite(currentBatchId) + } + committedOffsets ++= availableOffsets + } + + // need to look at the number of files on disk + override def purge(threshold: Long): Unit = { + while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) { + offsetLog.writtenToDurableStorage.poll() + } + offsetLog.purge(offsetLog.writtenToDurableStorage.peek()) + + while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) { + commitLog.writtenToDurableStorage.poll() + } + commitLog.purge(commitLog.writtenToDurableStorage.peek()) + } + + override def cleanup(): Unit = { + super.cleanup() + + ThreadUtils.shutdown(asyncWritesExecutorService) + logInfo(s"Async progress tracking executor pool for query ${prettyIdString} has been shutdown") + } + + // used for testing + def areWritesPendingOrInProgress(): Boolean = { + asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0 + } + + private def validateAndGetTrigger(): TriggerExecutor = { + // validate that the pipeline is using a supported sink + if (!extraOptions + .get( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK Review Comment: Also, just to be clear: this option is for advanced usage and we won't document it explicitly. Do I understand correctly? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) + errorNotifier.markError(th) + return + }) + + // check if there are offsets that are ready to be committed by the source + var offset = sourceCommitQueue.poll() + while (offset != null) { + commitSources(offset) + offset = sourceCommitQueue.poll() + } + } + + override def markMicroBatchEnd(): Unit = { + watermarkTracker.updateWatermark(lastExecution.executedPlan) + reportTimeTaken("commitOffsets") { + // check if current batch there is a async write for the offset log is issued for this batch + // if so, we should do the same for commit log + if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) { Review Comment: nit: simply use `nonEmpty` rather than ! + isEmpty? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) + errorNotifier.markError(th) + return + }) + + // check if there are offsets that are ready to be committed by the source + var offset = sourceCommitQueue.poll() + while (offset != null) { + commitSources(offset) + offset = sourceCommitQueue.poll() + } + } + + override def markMicroBatchEnd(): Unit = { + watermarkTracker.updateWatermark(lastExecution.executedPlan) + reportTimeTaken("commitOffsets") { + // check if current batch there is a async write for the offset log is issued for this batch + // if so, we should do the same for commit log + if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) { + commitLog + .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) + .exceptionally((th: Throwable) => { + logError("Got exception during async write", th) + errorNotifier.markError(th) + return + }) + } else { + if (!commitLog.addInMemory( + currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { + throw new IllegalStateException( + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId" Review Comment: Let's mention "commit" log specifically. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = 
AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. + // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) Review Comment: Let's add batch ID as well. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + Review Comment: nit: maybe unnecessary empty new line ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = 
AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. + // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful Review Comment: There seems to be a term issue. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html We have zero mention of pipeline in the guide doc. It's just a "query", specifically, "streaming query". ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true Review Comment: The point here is not whether the batch is the first one in the current run. The point here is whether we have already checked that the query is stateless. Shall we rename this accordingly? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) + errorNotifier.markError(th) + return + }) + + // check if there are offsets that are ready to be committed by the source + var offset = sourceCommitQueue.poll() + while (offset != null) { + commitSources(offset) + offset = sourceCommitQueue.poll() + } + } + + override def markMicroBatchEnd(): Unit = { + watermarkTracker.updateWatermark(lastExecution.executedPlan) + reportTimeTaken("commitOffsets") { + // check if current batch there is a async write for the offset log is issued for this batch + // if so, we should do the same for commit log + if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) { + commitLog + .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) + .exceptionally((th: Throwable) => { + logError("Got exception during async write", th) Review Comment: Could we be more specific for the error log message? I expect the error message to provide the information that the error happened during write for commit log, and which batch ID. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" Review Comment: It'd be nice to be more specific. If we just see the error message in the exception, there is no information about "where" the batch ID does not exist. It's most likely not a user-facing error, but the message is not friendly even to us. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { Review Comment: nit: If the singleton object is purely a helper, it may be better to swap it with the class definition, so that the more important thing comes first. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress 
tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + Review Comment: nit: maybe better to import `AsyncProgressTrackingMicroBatchExecution._` and leverage it. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { Review Comment: nit: 2 spaces here. Please refer to the "class" part of the indentation explanation. https://github.com/databricks/scala-style-guide#indent ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoolExecutor} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession + +/** + * Implementation of CommitLog to perform asynchronous writes to storage + */ +class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: ThreadPoolExecutor) + extends CommitLog(sparkSession, path) { + + // A queue of batches written to storage. Used to keep track when to purge old batches + val writtenToDurableStorage = + new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection) + + /** + * Writes a new batch to the commit log asynchronously + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return a CompeletableFuture that contains the batch id. The future is completed when + * the async write of the batch is completed. Future may also be completed exceptionally + * to indicate some write error. + */ + def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = { + require(metadata != null, "'null' metadata cannot be written to a metadata log") + val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => + serialize(metadata, output) + }.thenApply((ret: Boolean) => { + if (ret) { + batchId + } else { + throw new IllegalStateException( + s"Concurrent update to the log. 
Multiple streaming jobs detected for $batchId" Review Comment: commit log ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, Review Comment: nit: `AsyncProgressTrackingMicroBatchExecution.` looks to be unnecessary. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" Review Comment: Please make sure this default value is documented as well when we document this. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoolExecutor} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession + +/** + * Implementation of CommitLog to perform asynchronous writes to storage + */ +class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: ThreadPoolExecutor) + extends CommitLog(sparkSession, path) { + + // A queue of batches written to storage. Used to keep track when to purge old batches + val writtenToDurableStorage = + new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection) + + /** + * Writes a new batch to the commit log asynchronously + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return a CompeletableFuture that contains the batch id. The future is completed when + * the async write of the batch is completed. Future may also be completed exceptionally + * to indicate some write error. + */ + def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = { + require(metadata != null, "'null' metadata cannot be written to a metadata log") + val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => + serialize(metadata, output) + }.thenApply((ret: Boolean) => { + if (ret) { + batchId + } else { + throw new IllegalStateException( + s"Concurrent update to the log. Multiple streaming jobs detected for $batchId" + ) + } + }) + + future + } + + /** + * Adds batch to commit log only in memory and not persisted to durable storage. This method is + * used when we don't want to persist the commit log entry for every micro batch + * to durable storage + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return true if operation is successful otherwise false. 
+ */ + def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = { + if (batchCache.containsKey(batchId)) { + false + } else { + batchCache.put(batchId, metadata) + true + } + } + + /** + * Purge entries in the commit log up to thresholdBatchId. This method is synchronized so that Review Comment: same here ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{Clock, SystemClock} + +/** + * Used to write entries to the offset log asynchronously + */ +class AsyncOffsetSeqLog( + sparkSession: SparkSession, + path: String, + executorService: ThreadPoolExecutor, + offsetCommitIntervalMs: Long, + clock: Clock = new SystemClock()) + extends OffsetSeqLog(sparkSession, path) { + + // the cache needs to be enabled because we may not be persisting every entry to durable storage + // entries not persisted to durable storage will just be stored in memory for faster lookups + assert(metadataCacheEnabled == true) + + // A map of the current pending offset writes. Key -> batch Id, Value -> CompletableFuture + // Used to determine if a commit log entry for this batch also needs to be persisted to storage + private val pendingOffsetWrites = new ConcurrentHashMap[Long, CompletableFuture[Long]]() + + // Keeps track the last time a commit was issued. Used for issuing commits to storage at + // the configured intervals + private val lastCommitIssuedTimestampMs: AtomicLong = new AtomicLong(-1) + + // A queue of batches written to storage. Used to keep track when to purge old batches + val writtenToDurableStorage = + new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection) + + /** + * Get a async offset write by batch id. 
To check if a corresponding commit log entry + * needs to be written to durable storage as well + * @param batchId + * @return a option to indicate whether a async offset write was issued for the batch with id + */ + def getAsyncOffsetWrite(batchId: Long): Option[CompletableFuture[Long]] = { + Option(pendingOffsetWrites.get(batchId)) + } + + /** + * Remove the async offset write when we don't need to keep track of it anymore + * @param batchId + */ + def removeAsyncOffsetWrite(batchId: Long): Unit = { + pendingOffsetWrites.remove(batchId) + } + + /** + * Writes a new batch to the offset log asynchronously + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return a CompeletableFuture that contains the batch id. The future is completed when + * the async write of the batch is completed. Future may also be completed exceptionally + * to indicate some write error. + */ + def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, Boolean)] = { + require(metadata != null, "'null' metadata cannot written to a metadata log") + + def issueAsyncWrite(batchId: Long): CompletableFuture[Long] = { + lastCommitIssuedTimestampMs.set(clock.getTimeMillis()) + val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => + serialize(metadata, output) + }.thenApply((ret: Boolean) => { + if (ret) { + batchId + } else { + throw new IllegalStateException( + s"Concurrent update to the log. Multiple streaming jobs detected for $batchId" Review Comment: Shall we be more specific for "where"? offset? commit? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{Clock, SystemClock} + +/** + * Used to write entries to the offset log asynchronously + */ +class AsyncOffsetSeqLog( + sparkSession: SparkSession, + path: String, + executorService: ThreadPoolExecutor, + offsetCommitIntervalMs: Long, + clock: Clock = new SystemClock()) + extends OffsetSeqLog(sparkSession, path) { + + // the cache needs to be enabled because we may not be persisting every entry to durable storage + // entries not persisted to durable storage will just be stored in memory for faster lookups + assert(metadataCacheEnabled == true) + + // A map of the current pending offset writes. Key -> batch Id, Value -> CompletableFuture + // Used to determine if a commit log entry for this batch also needs to be persisted to storage + private val pendingOffsetWrites = new ConcurrentHashMap[Long, CompletableFuture[Long]]() + + // Keeps track the last time a commit was issued. Used for issuing commits to storage at + // the configured intervals + private val lastCommitIssuedTimestampMs: AtomicLong = new AtomicLong(-1) + + // A queue of batches written to storage. 
Used to keep track when to purge old batches + val writtenToDurableStorage = + new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection) + + /** + * Get a async offset write by batch id. To check if a corresponding commit log entry + * needs to be written to durable storage as well + * @param batchId + * @return a option to indicate whether a async offset write was issued for the batch with id + */ + def getAsyncOffsetWrite(batchId: Long): Option[CompletableFuture[Long]] = { + Option(pendingOffsetWrites.get(batchId)) + } + + /** + * Remove the async offset write when we don't need to keep track of it anymore + * @param batchId + */ + def removeAsyncOffsetWrite(batchId: Long): Unit = { + pendingOffsetWrites.remove(batchId) + } + + /** + * Writes a new batch to the offset log asynchronously + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return a CompeletableFuture that contains the batch id. The future is completed when + * the async write of the batch is completed. Future may also be completed exceptionally + * to indicate some write error. + */ + def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, Boolean)] = { + require(metadata != null, "'null' metadata cannot written to a metadata log") + + def issueAsyncWrite(batchId: Long): CompletableFuture[Long] = { + lastCommitIssuedTimestampMs.set(clock.getTimeMillis()) + val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => + serialize(metadata, output) + }.thenApply((ret: Boolean) => { + if (ret) { + batchId + } else { + throw new IllegalStateException( + s"Concurrent update to the log. 
Multiple streaming jobs detected for $batchId" + ) + } + }) + pendingOffsetWrites.put(batchId, future) + future + } + + val lastIssuedTs = lastCommitIssuedTimestampMs.get() + val future: CompletableFuture[(Long, Boolean)] = { + if (offsetCommitIntervalMs > 0) { + if ((lastIssuedTs == -1) // haven't started any commits yet + || (lastIssuedTs + offsetCommitIntervalMs) <= clock.getTimeMillis()) { + issueAsyncWrite(batchId).thenApply((batchId: Long) => { + (batchId, true) + }) + } else { + // just return completed future because we are not persisting this offset + CompletableFuture.completedFuture((batchId, false)) + } + } else { + // offset commit interval is not enabled + issueAsyncWrite(batchId).thenApply((batchId: Long) => { + (batchId, true) + }) + } + } + + batchCache.put(batchId, metadata) + future + } + + /** + * Adds new batch asynchronously + * @param batchId id of batch to write + * @param fn serialization function + * @return CompletableFuture that contains a boolean do + * indicate whether the write was successfuly or not. + * Future can also be completed exceptionally to indicate write errors. 
+ */ + private def addNewBatchByStreamAsync(batchId: Long)( + fn: OutputStream => Unit): CompletableFuture[Boolean] = { + val future = new CompletableFuture[Boolean]() + val batchMetadataFile = batchIdToPath(batchId) + + if (batchCache.containsKey(batchId)) { + future.complete(false) + future + } else { + executorService.submit(new Runnable { + override def run(): Unit = { + try { + if (fileManager.exists(batchMetadataFile)) { + future.complete(false) + } else { + val start = System.currentTimeMillis() + write( + batchMetadataFile, + fn + ) + logDebug( + s"Offset commit for batch ${batchId} took" + + s" ${System.currentTimeMillis() - start} ms to be persisted to durable storage" + ) + writtenToDurableStorage.add(batchId) + future.complete(true) + } + } catch { + case e: Throwable => + logError(s"Encountered error while writing batch ${batchId} to offset log", e) + future.completeExceptionally(e) + } + } + }) + future + } + } + + /** + * Purge entries in the offset log up to thresholdBatchId. This method is synchronized so that Review Comment: Is there synchronization between the two? I don't see any synchronization here. Am I missing something? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{Clock, SystemClock} + +/** + * Used to write entries to the offset log asynchronously + */ +class AsyncOffsetSeqLog( + sparkSession: SparkSession, + path: String, + executorService: ThreadPoolExecutor, + offsetCommitIntervalMs: Long, + clock: Clock = new SystemClock()) + extends OffsetSeqLog(sparkSession, path) { Review Comment: nit: 2 spaces ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoolExecutor} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession + +/** + * Implementation of CommitLog to perform asynchronous writes to storage + */ +class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: ThreadPoolExecutor) + extends CommitLog(sparkSession, path) { Review Comment: nit: 2 spaces ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, ThreadUtils} + +object AsyncProgressTrackingMicroBatchExecution { + val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled" + val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS = + "asyncProgressTrackingCheckpointIntervalMs" + + // for testing purposes + val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK = + "_asyncProgressTrackingOverrideSinkSupportCheck" + + private def getAsyncProgressTrackingCheckpointingIntervalMs( + extraOptions: Map[String, String]): Long = { + extraOptions + .getOrElse( + AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, + "1000" + ) + .toLong + } +} + +/** + * Class to execute micro-batches when async progress tracking is enabled + */ +class AsyncProgressTrackingMicroBatchExecution( + sparkSession: SparkSession, + trigger: Trigger, + triggerClock: Clock, + extraOptions: Map[String, String], + plan: WriteToStream) + extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) { + + protected val asyncProgressTrackingCheckpointingIntervalMs: Long + = AsyncProgressTrackingMicroBatchExecution + .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions) + + // Offsets that are ready to be committed by the source. 
+ // This is needed so that we can call source commit in the same thread as micro-batch execution + // to be thread safe + private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]() + + // to cache the batch id of the last batch written to storage + private val lastBatchPersistedToDurableStorage = new AtomicLong(-1) + + override val triggerExecutor: TriggerExecutor = validateAndGetTrigger() + + private var isFirstBatch: Boolean = true + + // thread pool is only one thread because we want offset + // writes to execute in order in a serialized fashion + protected val asyncWritesExecutorService + = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler( + "async-log-write", + 2, // one for offset commit and one for completion commit + new RejectedExecutionHandler() { + override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = { + try { + if (!executor.isShutdown) { + val start = System.currentTimeMillis() + executor.getQueue.put(r) + logDebug( + s"Async write paused execution for " + + s"${System.currentTimeMillis() - start} due to task queue being full." 
+ ) + } + } catch { + case e: InterruptedException => + Thread.currentThread.interrupt() + throw new RejectedExecutionException("Producer interrupted", e) + case e: Throwable => + logError("Encountered error in async write executor service", e) + errorNotifier.markError(e) + } + } + }) + + override val offsetLog = new AsyncOffsetSeqLog( + sparkSession, + checkpointFile("offsets"), + asyncWritesExecutorService, + asyncProgressTrackingCheckpointingIntervalMs, + clock = triggerClock + ) + + override val commitLog = + new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService) + + override def markMicroBatchExecutionStart(): Unit = { + // check if pipeline is stateful + checkNotStatefulPipeline + } + + override def cleanUpLastExecutedMicroBatch(): Unit = { + // this is a no op for async progress tracking since we only want to commit sources only + // after the offset WAL commit has be successfully written + } + + /** + * Should not call super method as we need to do something completely different + * in this method for async progress tracking + */ + override def markMicroBatchStart(): Unit = { + // Because we are using a thread pool with only one thread, async writes to the offset log + // are still written in a serial / in order fashion + offsetLog + .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)) + .thenAccept(tuple => { + val (batchId, persistedToDurableStorage) = tuple + if (persistedToDurableStorage) { + + // batch id cache not initialized + if (lastBatchPersistedToDurableStorage.get == -1) { + lastBatchPersistedToDurableStorage.set( + offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1)) + } + + if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) { + // sanity check to make sure batch ids are monotonically increasing + assert(lastBatchPersistedToDurableStorage.get < batchId) + val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get()) + if (prevBatchOff.isDefined) { 
+ // Offset is ready to be committed by the source. Add to queue + sourceCommitQueue.add(prevBatchOff.get) + } else { + throw new IllegalStateException( + s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't exist" + ) + } + } + lastBatchPersistedToDurableStorage.set(batchId) + } + }) + .exceptionally((th: Throwable) => { + logError("Encountered error while performing async offset write", th) + errorNotifier.markError(th) + return + }) + + // check if there are offsets that are ready to be committed by the source + var offset = sourceCommitQueue.poll() + while (offset != null) { + commitSources(offset) + offset = sourceCommitQueue.poll() + } + } + + override def markMicroBatchEnd(): Unit = { + watermarkTracker.updateWatermark(lastExecution.executedPlan) + reportTimeTaken("commitOffsets") { + // check if current batch there is a async write for the offset log is issued for this batch + // if so, we should do the same for commit log + if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) { + commitLog + .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) + .exceptionally((th: Throwable) => { + logError("Got exception during async write", th) + errorNotifier.markError(th) + return + }) + } else { + if (!commitLog.addInMemory( + currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { + throw new IllegalStateException( + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId" Review Comment: Btw is it even possible to happen? If there are multiple queries based on same checkpoint running concurrently even in the same driver, the instance of AsyncCommitLog would be different. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoolExecutor} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession + +/** + * Implementation of CommitLog to perform asynchronous writes to storage + */ +class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: ThreadPoolExecutor) + extends CommitLog(sparkSession, path) { + Review Comment: Does this class also require metadataCacheEnabled == true? If so, let's add a require here as well. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.OutputStream +import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoolExecutor} + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SparkSession + +/** + * Implementation of CommitLog to perform asynchronous writes to storage + */ +class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: ThreadPoolExecutor) + extends CommitLog(sparkSession, path) { + + // A queue of batches written to storage. Used to keep track when to purge old batches + val writtenToDurableStorage = + new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection) + + /** + * Writes a new batch to the commit log asynchronously + * @param batchId id of batch to write + * @param metadata metadata of batch to write + * @return a CompeletableFuture that contains the batch id. The future is completed when + * the async write of the batch is completed. Future may also be completed exceptionally + * to indicate some write error. + */ + def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = { + require(metadata != null, "'null' metadata cannot be written to a metadata log") + val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => + serialize(metadata, output) + }.thenApply((ret: Boolean) => { + if (ret) { + batchId + } else { + throw new IllegalStateException( + s"Concurrent update to the log. 
Multiple streaming jobs detected for $batchId" + ) + } + }) + Review Comment: Is updating batchCache missed or intended? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org