HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1043131174


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue 
being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't 
exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId"
+          )
+        }
+      }
+      offsetLog.removeAsyncOffsetWrite(currentBatchId)
+    }
+    committedOffsets ++= availableOffsets
+  }
+
+  // need to look at the number of files on disk
+  override def purge(threshold: Long): Unit = {
+    while (offsetLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      offsetLog.writtenToDurableStorage.poll()
+    }
+    offsetLog.purge(offsetLog.writtenToDurableStorage.peek())
+
+    while (commitLog.writtenToDurableStorage.size() > minLogEntriesToMaintain) {
+      commitLog.writtenToDurableStorage.poll()
+    }
+    commitLog.purge(commitLog.writtenToDurableStorage.peek())
+  }
+
+  override def cleanup(): Unit = {
+    super.cleanup()
+
+    ThreadUtils.shutdown(asyncWritesExecutorService)
+    logInfo(s"Async progress tracking executor pool for query 
${prettyIdString} has been shutdown")
+  }
+
+  // used for testing
+  def areWritesPendingOrInProgress(): Boolean = {
+    asyncWritesExecutorService.getQueue.size() > 0 || asyncWritesExecutorService.getActiveCount > 0
+  }
+
+  private def validateAndGetTrigger(): TriggerExecutor = {
+    // validate that the pipeline is using a supported sink
+    if (!extraOptions
+      .get(
+        AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK

Review Comment:
   Also, just to be clear: this option is for advanced usage and we won't document it explicitly. Do I understand correctly?
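
   For reference, this is how I understand a test would opt in, via the writer options defined above. This is only a sketch I'm adding for illustration, not code from the PR; `df` is assumed to be some stateless streaming DataFrame and the sink/query name are arbitrary.
   ```scala
   // Illustrative only: enable async progress tracking and bypass the
   // sink support check with the test-only option.
   val query = df.writeStream
     .format("memory")
     .queryName("asyncProgressTest")
     .option("asyncProgressTrackingEnabled", "true")
     .option("_asyncProgressTrackingOverrideSinkSupportCheck", "true")
     .start()
   ```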



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {

Review Comment:
   nit: simply use `nonEmpty` rather than ! + isEmpty?
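
   For illustration, the simplification would look roughly like this (a sketch of the suggestion, not the committed change; it assumes `getAsyncOffsetWrite` returns an Option or collection, both of which have `nonEmpty`):
   ```scala
   // before: negation of isEmpty
   if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
     // ...
   }

   // after: nonEmpty reads more directly
   if (offsetLog.getAsyncOffsetWrite(currentBatchId).nonEmpty) {
     // ...
   }
   ```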



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId"

Review Comment:
   Let's mention "commit" log specifically.
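
   For illustration, something along these lines; the exact wording here is only a sketch:
   ```scala
   // Hypothetical wording that names the commit log explicitly
   throw new IllegalStateException(
     s"Concurrent update to the commit log. Multiple streaming jobs detected " +
       s"for batch $currentBatchId")
   ```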



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)

Review Comment:
   Let's add batch ID as well.
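
   For illustration, the log line with the batch ID included might look like this (sketch only; it assumes `currentBatchId` still refers to the batch whose offsets were being written):
   ```scala
   // Hypothetical wording; includes the batch ID of the failed offset write
   logError(s"Encountered error while performing async offset write for batch " +
     s"$currentBatchId", th)
   ```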



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+

Review Comment:
   nit: maybe unnecessary empty new line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful

Review Comment:
   There seems to be a terminology issue.

   https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

   We have zero mention of "pipeline" in the guide doc. It's just a "query", specifically a "streaming query".
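
   For illustration, the terminology fix might go in this direction; the method name below is hypothetical, just to show the rename, not the committed change:
   ```scala
   override def markMicroBatchExecutionStart(): Unit = {
     // check that the streaming query is not stateful
     checkNotStatefulStreamingQuery
   }
   ```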



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true

Review Comment:
   The point here is not whether the batch is the first one of the current run; it's whether we have already checked that the query is stateless. Shall we rename this accordingly?
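
   A sketch of the rename being suggested (the new name is hypothetical, not the committed one):
   ```scala
   // Tracks whether we still have to verify that the query is stateless,
   // rather than whether this is the first batch of the current run.
   private var isStatefulnessCheckPending: Boolean = true
   ```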



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)

Review Comment:
   Could we be more specific in the error log message? I'd expect it to say that the error happened during the async write to the commit log, and for which batch ID.
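
   For illustration, a more specific message might look like this (wording is a sketch only, assuming `currentBatchId` is the batch being committed):
   ```scala
   // Hypothetical wording: name the commit log and include the batch ID
   logError(s"Encountered error while performing async write to the commit log " +
     s"for batch $currentBatchId", th)
   ```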



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = 
offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't 
exist"

Review Comment:
   It'd be nice to be more specific. If we just see the error message in the exception, there is no information about "where" the batch ID does not exist. It's most likely not a user-facing message, but it's not friendly even to us.
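   For example, something along these lines (just a sketch of the kind of wording I mean, reusing the surrounding names):
   ```scala
   throw new IllegalStateException(
     s"Batch ${lastBatchPersistedToDurableStorage.get()} does not exist in the offset log " +
       "(neither in the in-memory cache nor in durable storage); cannot commit sources for it")
   ```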



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {

Review Comment:
   nit: If the singleton object is purely a helper, it may be better to swap it with the class definition so that the more important thing comes first.
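   Roughly the file layout I have in mind (both bodies stay exactly as they are today):
   ```scala
   /**
    * Class to execute micro-batches when async progress tracking is enabled
    */
   class AsyncProgressTrackingMicroBatchExecution /* (...constructor...) extends MicroBatchExecution(...) */ {
     // existing class body
   }

   object AsyncProgressTrackingMicroBatchExecution {
     // existing constants and getAsyncProgressTrackingCheckpointingIntervalMs
   }
   ```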



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {
+

Review Comment:
   nit: it may be better to import `AsyncProgressTrackingMicroBatchExecution._` and leverage it.
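   A fragment of what that could look like inside the class body (sketch):
   ```scala
   import AsyncProgressTrackingMicroBatchExecution._

   protected val asyncProgressTrackingCheckpointingIntervalMs: Long =
     getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
   ```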



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {

Review Comment:
   nit: 2 spaces here. Please refer to the "class" part of the indentation explanation:
   
   https://github.com/databricks/scala-style-guide#indent
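   i.e. 4-space indent for the constructor parameters and 2-space indent for the `extends` clause:
   ```scala
   class AsyncProgressTrackingMicroBatchExecution(
       sparkSession: SparkSession,
       trigger: Trigger,
       triggerClock: Clock,
       extraOptions: Map[String, String],
       plan: WriteToStream)
     extends MicroBatchExecution(sparkSession, trigger, triggerClock, extraOptions, plan) {
     // ...
   }
   ```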



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, 
ThreadPoolExecutor}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Implementation of CommitLog to perform asynchronous writes to storage
+ */
+class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+    extends CommitLog(sparkSession, path) {
+
+  // A queue of batches written to storage.  Used to keep track when to purge 
old batches
+  val writtenToDurableStorage =
+    new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection)
+
+  /**
+   * Writes a new batch to the commit log asynchronously
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return a CompeletableFuture that contains the batch id.  The future is 
completed when
+   *         the async write of the batch is completed.  Future may also be 
completed exceptionally
+   *         to indicate some write error.
+   */
+  def addAsync(batchId: Long, metadata: CommitMetadata): 
CompletableFuture[Long] = {
+    require(metadata != null, "'null' metadata cannot be written to a metadata 
log")
+    val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { 
output =>
+      serialize(metadata, output)
+    }.thenApply((ret: Boolean) => {
+      if (ret) {
+        batchId
+      } else {
+        throw new IllegalStateException(
+          s"Concurrent update to the log. Multiple streaming jobs detected for 
$batchId"

Review Comment:
   nit: let's say "commit log" here, to be specific about which log this is.
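   e.g. (sketch, exact wording up to you):
   ```scala
   throw new IllegalStateException(
     s"Concurrent update to the commit log. Multiple streaming jobs detected for batch $batchId")
   ```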



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,

Review Comment:
   nit: the `AsyncProgressTrackingMicroBatchExecution.` prefix looks unnecessary.
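   i.e. within the companion object this can simply be:
   ```scala
   extraOptions
     .getOrElse(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, "1000")
     .toLong
   ```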



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"

Review Comment:
   Please make sure this default value is also covered when we document this option.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, 
ThreadPoolExecutor}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Implementation of CommitLog to perform asynchronous writes to storage
+ */
+class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+    extends CommitLog(sparkSession, path) {
+
+  // A queue of batches written to storage.  Used to keep track when to purge 
old batches
+  val writtenToDurableStorage =
+    new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection)
+
+  /**
+   * Writes a new batch to the commit log asynchronously
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return a CompeletableFuture that contains the batch id.  The future is 
completed when
+   *         the async write of the batch is completed.  Future may also be 
completed exceptionally
+   *         to indicate some write error.
+   */
+  def addAsync(batchId: Long, metadata: CommitMetadata): 
CompletableFuture[Long] = {
+    require(metadata != null, "'null' metadata cannot be written to a metadata 
log")
+    val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { 
output =>
+      serialize(metadata, output)
+    }.thenApply((ret: Boolean) => {
+      if (ret) {
+        batchId
+      } else {
+        throw new IllegalStateException(
+          s"Concurrent update to the log. Multiple streaming jobs detected for 
$batchId"
+        )
+      }
+    })
+
+    future
+  }
+
+  /**
+   * Adds batch to commit log only in memory and not persisted to durable 
storage. This method is
+   * used when we don't want to persist the commit log entry for every micro 
batch
+   * to durable storage
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return true if operation is successful otherwise false.
+   */
+  def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = {
+    if (batchCache.containsKey(batchId)) {
+      false
+    } else {
+      batchCache.put(batchId, metadata)
+      true
+    }
+  }
+
+  /**
+   * Purge entries in the commit log up to thresholdBatchId.  This method is 
synchronized so that

Review Comment:
   same here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Clock, SystemClock}
+
+/**
+ * Used to write entries to the offset log asynchronously
+ */
+class AsyncOffsetSeqLog(
+    sparkSession: SparkSession,
+    path: String,
+    executorService: ThreadPoolExecutor,
+    offsetCommitIntervalMs: Long,
+    clock: Clock = new SystemClock())
+    extends OffsetSeqLog(sparkSession, path) {
+
+  // the cache needs to be enabled because we may not be persisting every 
entry to durable storage
+  // entries not persisted to durable storage will just be stored in memory 
for faster lookups
+  assert(metadataCacheEnabled == true)
+
+  // A map of the current pending offset writes. Key -> batch Id, Value -> 
CompletableFuture
+  // Used to determine if a commit log entry for this batch also needs to be 
persisted to storage
+  private val pendingOffsetWrites = new ConcurrentHashMap[Long, 
CompletableFuture[Long]]()
+
+  // Keeps track the last time a commit was issued. Used for issuing commits 
to storage at
+  // the configured intervals
+  private val lastCommitIssuedTimestampMs: AtomicLong = new AtomicLong(-1)
+
+  // A queue of batches written to storage.  Used to keep track when to purge 
old batches
+  val writtenToDurableStorage =
+    new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection)
+
+  /**
+   * Get a async offset write by batch id.  To check if a corresponding commit 
log entry
+   * needs to be written to durable storage as well
+   * @param batchId
+   * @return a option to indicate whether a async offset write was issued for 
the batch with id
+   */
+  def getAsyncOffsetWrite(batchId: Long): Option[CompletableFuture[Long]] = {
+    Option(pendingOffsetWrites.get(batchId))
+  }
+
+  /**
+   * Remove the async offset write when we don't need to keep track of it 
anymore
+   * @param batchId
+   */
+  def removeAsyncOffsetWrite(batchId: Long): Unit = {
+    pendingOffsetWrites.remove(batchId)
+  }
+
+  /**
+   * Writes a new batch to the offset log asynchronously
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return a CompeletableFuture that contains the batch id.  The future is 
completed when
+   *         the async write of the batch is completed.  Future may also be 
completed exceptionally
+   *         to indicate some write error.
+   */
+  def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, 
Boolean)] = {
+    require(metadata != null, "'null' metadata cannot written to a metadata 
log")
+
+    def issueAsyncWrite(batchId: Long): CompletableFuture[Long] = {
+      lastCommitIssuedTimestampMs.set(clock.getTimeMillis())
+      val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) 
{ output =>
+        serialize(metadata, output)
+      }.thenApply((ret: Boolean) => {
+        if (ret) {
+          batchId
+        } else {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected 
for $batchId"

Review Comment:
   Shall we be more specific about "where"? The offset log? The commit log?
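   e.g. (sketch):
   ```scala
   throw new IllegalStateException(
     s"Concurrent update to the offset log. Multiple streaming jobs detected for batch $batchId")
   ```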



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Clock, SystemClock}
+
+/**
+ * Used to write entries to the offset log asynchronously
+ */
+class AsyncOffsetSeqLog(
+    sparkSession: SparkSession,
+    path: String,
+    executorService: ThreadPoolExecutor,
+    offsetCommitIntervalMs: Long,
+    clock: Clock = new SystemClock())
+    extends OffsetSeqLog(sparkSession, path) {
+
+  // the cache needs to be enabled because we may not be persisting every 
entry to durable storage
+  // entries not persisted to durable storage will just be stored in memory 
for faster lookups
+  assert(metadataCacheEnabled == true)
+
+  // A map of the current pending offset writes. Key -> batch Id, Value -> 
CompletableFuture
+  // Used to determine if a commit log entry for this batch also needs to be 
persisted to storage
+  private val pendingOffsetWrites = new ConcurrentHashMap[Long, 
CompletableFuture[Long]]()
+
+  // Keeps track the last time a commit was issued. Used for issuing commits 
to storage at
+  // the configured intervals
+  private val lastCommitIssuedTimestampMs: AtomicLong = new AtomicLong(-1)
+
+  // A queue of batches written to storage.  Used to keep track when to purge 
old batches
+  val writtenToDurableStorage =
+    new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection)
+
+  /**
+   * Get a async offset write by batch id.  To check if a corresponding commit 
log entry
+   * needs to be written to durable storage as well
+   * @param batchId
+   * @return a option to indicate whether a async offset write was issued for 
the batch with id
+   */
+  def getAsyncOffsetWrite(batchId: Long): Option[CompletableFuture[Long]] = {
+    Option(pendingOffsetWrites.get(batchId))
+  }
+
+  /**
+   * Remove the async offset write when we don't need to keep track of it 
anymore
+   * @param batchId
+   */
+  def removeAsyncOffsetWrite(batchId: Long): Unit = {
+    pendingOffsetWrites.remove(batchId)
+  }
+
+  /**
+   * Writes a new batch to the offset log asynchronously
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return a CompeletableFuture that contains the batch id.  The future is 
completed when
+   *         the async write of the batch is completed.  Future may also be 
completed exceptionally
+   *         to indicate some write error.
+   */
+  def addAsync(batchId: Long, metadata: OffsetSeq): CompletableFuture[(Long, 
Boolean)] = {
+    require(metadata != null, "'null' metadata cannot written to a metadata 
log")
+
+    def issueAsyncWrite(batchId: Long): CompletableFuture[Long] = {
+      lastCommitIssuedTimestampMs.set(clock.getTimeMillis())
+      val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) 
{ output =>
+        serialize(metadata, output)
+      }.thenApply((ret: Boolean) => {
+        if (ret) {
+          batchId
+        } else {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected 
for $batchId"
+          )
+        }
+      })
+      pendingOffsetWrites.put(batchId, future)
+      future
+    }
+
+    val lastIssuedTs = lastCommitIssuedTimestampMs.get()
+    val future: CompletableFuture[(Long, Boolean)] = {
+      if (offsetCommitIntervalMs > 0) {
+        if ((lastIssuedTs == -1) // haven't started any commits yet
+          || (lastIssuedTs + offsetCommitIntervalMs) <= clock.getTimeMillis()) 
{
+          issueAsyncWrite(batchId).thenApply((batchId: Long) => {
+            (batchId, true)
+          })
+        } else {
+          // just return completed future because we are not persisting this 
offset
+          CompletableFuture.completedFuture((batchId, false))
+        }
+      } else {
+        // offset commit interval is not enabled
+        issueAsyncWrite(batchId).thenApply((batchId: Long) => {
+          (batchId, true)
+        })
+      }
+    }
+
+    batchCache.put(batchId, metadata)
+    future
+  }
+
+  /**
+   * Adds new batch asynchronously
+   * @param batchId id of batch to write
+   * @param fn serialization function
+   * @return CompletableFuture that contains a boolean do
+   *         indicate whether the write was successfuly or not.
+   *         Future can also be completed exceptionally to indicate write 
errors.
+   */
+  private def addNewBatchByStreamAsync(batchId: Long)(
+      fn: OutputStream => Unit): CompletableFuture[Boolean] = {
+    val future = new CompletableFuture[Boolean]()
+    val batchMetadataFile = batchIdToPath(batchId)
+
+    if (batchCache.containsKey(batchId)) {
+      future.complete(false)
+      future
+    } else {
+      executorService.submit(new Runnable {
+        override def run(): Unit = {
+          try {
+            if (fileManager.exists(batchMetadataFile)) {
+              future.complete(false)
+            } else {
+              val start = System.currentTimeMillis()
+              write(
+                batchMetadataFile,
+                fn
+              )
+              logDebug(
+                s"Offset commit for batch ${batchId} took" +
+                s" ${System.currentTimeMillis() - start} ms to be persisted to 
durable storage"
+              )
+              writtenToDurableStorage.add(batchId)
+              future.complete(true)
+            }
+          } catch {
+            case e: Throwable =>
+              logError(s"Encountered error while writing batch ${batchId} to 
offset log", e)
+              future.completeExceptionally(e)
+          }
+        }
+      })
+      future
+    }
+  }
+
+  /**
+   * Purge entries in the offset log up to thresholdBatchId.  This method is 
synchronized so that

Review Comment:
   Is there synchronization between the two? I don't see any synchronization 
here. Am I missing something? 
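   If the intent is to actually synchronize (rather than fix the comment), a minimal sketch could be the following, assuming the method keeps the usual `purge(thresholdBatchId: Long)` signature from HDFSMetadataLog:
   ```scala
   override def purge(thresholdBatchId: Long): Unit = synchronized {
     super.purge(thresholdBatchId) // or the existing body, now guarded by this object's lock
   }
   ```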



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{Clock, SystemClock}
+
+/**
+ * Used to write entries to the offset log asynchronously
+ */
+class AsyncOffsetSeqLog(
+    sparkSession: SparkSession,
+    path: String,
+    executorService: ThreadPoolExecutor,
+    offsetCommitIntervalMs: Long,
+    clock: Clock = new SystemClock())
+    extends OffsetSeqLog(sparkSession, path) {

Review Comment:
   nit: 2 spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, 
ThreadPoolExecutor}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Implementation of CommitLog to perform asynchronous writes to storage
+ */
+class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+    extends CommitLog(sparkSession, path) {

Review Comment:
   nit: 2 spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+    "asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+    "_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+      extraOptions: Map[String, String]): Long = {
+    extraOptions
+      .getOrElse(
+        
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+        "1000"
+      )
+      .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+    sparkSession: SparkSession,
+    trigger: Trigger,
+    triggerClock: Clock,
+    extraOptions: Map[String, String],
+    plan: WriteToStream)
+    extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+    .getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as 
micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+    "async-log-write",
+    2, // one for offset commit and one for completion commit
+    new RejectedExecutionHandler() {
+      override def rejectedExecution(r: Runnable, executor: 
ThreadPoolExecutor): Unit = {
+        try {
+          if (!executor.isShutdown) {
+            val start = System.currentTimeMillis()
+            executor.getQueue.put(r)
+            logDebug(
+              s"Async write paused execution for " +
+                s"${System.currentTimeMillis() - start} due to task queue 
being full."
+            )
+          }
+        } catch {
+          case e: InterruptedException =>
+            Thread.currentThread.interrupt()
+            throw new RejectedExecutionException("Producer interrupted", e)
+          case e: Throwable =>
+            logError("Encountered error in async write executor service", e)
+            errorNotifier.markError(e)
+        }
+      }
+    })
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+    sparkSession,
+    checkpointFile("offsets"),
+    asyncWritesExecutorService,
+    asyncProgressTrackingCheckpointingIntervalMs,
+    clock = triggerClock
+  )
+
+  override val commitLog =
+    new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+    // check if pipeline is stateful
+    checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+    // this is a no op for async progress tracking since we only want to 
commit sources only
+    // after the offset WAL commit has be successfully written
+  }
+
+  /**
+   * Should not call super method as we need to do something completely 
different
+   * in this method for async progress tracking
+   */
+  override def markMicroBatchStart(): Unit = {
+    // Because we are using a thread pool with only one thread, async writes 
to the offset log
+    // are still written in a serial / in order fashion
+    offsetLog
+      .addAsync(currentBatchId, availableOffsets.toOffsetSeq(sources, 
offsetSeqMetadata))
+      .thenAccept(tuple => {
+        val (batchId, persistedToDurableStorage) = tuple
+        if (persistedToDurableStorage) {
+
+          // batch id cache not initialized
+          if (lastBatchPersistedToDurableStorage.get == -1) {
+            lastBatchPersistedToDurableStorage.set(
+              offsetLog.getPrevBatchFromStorage(batchId).getOrElse(-1))
+          }
+
+          if (batchId != 0 && lastBatchPersistedToDurableStorage.get != -1) {
+            // sanity check to make sure batch ids are monotonically increasing
+            assert(lastBatchPersistedToDurableStorage.get < batchId)
+            val prevBatchOff = 
offsetLog.get(lastBatchPersistedToDurableStorage.get())
+            if (prevBatchOff.isDefined) {
+              // Offset is ready to be committed by the source. Add to queue
+              sourceCommitQueue.add(prevBatchOff.get)
+            } else {
+              throw new IllegalStateException(
+                s"batch ${lastBatchPersistedToDurableStorage.get()} doesn't 
exist"
+              )
+            }
+          }
+          lastBatchPersistedToDurableStorage.set(batchId)
+        }
+      })
+      .exceptionally((th: Throwable) => {
+        logError("Encountered error while performing async offset write", th)
+        errorNotifier.markError(th)
+        return
+      })
+
+    // check if there are offsets that are ready to be committed by the source
+    var offset = sourceCommitQueue.poll()
+    while (offset != null) {
+      commitSources(offset)
+      offset = sourceCommitQueue.poll()
+    }
+  }
+
+  override def markMicroBatchEnd(): Unit = {
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    reportTimeTaken("commitOffsets") {
+      // check if current batch there is a async write for the offset log is 
issued for this batch
+      // if so, we should do the same for commit log
+      if (!offsetLog.getAsyncOffsetWrite(currentBatchId).isEmpty) {
+        commitLog
+          .addAsync(currentBatchId, 
CommitMetadata(watermarkTracker.currentWatermark))
+          .exceptionally((th: Throwable) => {
+            logError("Got exception during async write", th)
+            errorNotifier.markError(th)
+            return
+          })
+      } else {
+        if (!commitLog.addInMemory(
+          currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+          throw new IllegalStateException(
+            s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId"

Review Comment:
   Btw, is it even possible for this to happen? If there are multiple queries based on the same checkpoint running concurrently, even in the same driver, the instances of AsyncCommitLog would be different.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, 
ThreadPoolExecutor}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Implementation of CommitLog to perform asynchronous writes to storage
+ */
+class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+    extends CommitLog(sparkSession, path) {
+

Review Comment:
   Does this class also require metadataCacheEnabled == true? If so, let's add a require here as well.
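   e.g. something like this, if the same reasoning about in-memory-only entries applies here:
   ```scala
   // entries added via addInMemory live only in the cache, so the cache must be enabled
   require(metadataCacheEnabled, "AsyncCommitLog requires the metadata cache to be enabled")
   ```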



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.OutputStream
+import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, 
ThreadPoolExecutor}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Implementation of CommitLog to perform asynchronous writes to storage
+ */
+class AsyncCommitLog(sparkSession: SparkSession, path: String, 
executorService: ThreadPoolExecutor)
+    extends CommitLog(sparkSession, path) {
+
+  // A queue of batches written to storage.  Used to keep track when to purge 
old batches
+  val writtenToDurableStorage =
+    new ConcurrentLinkedDeque[Long](listBatchesOnDisk.toList.asJavaCollection)
+
+  /**
+   * Writes a new batch to the commit log asynchronously
+   * @param batchId id of batch to write
+   * @param metadata metadata of batch to write
+   * @return a CompeletableFuture that contains the batch id.  The future is 
completed when
+   *         the async write of the batch is completed.  Future may also be 
completed exceptionally
+   *         to indicate some write error.
+   */
+  def addAsync(batchId: Long, metadata: CommitMetadata): 
CompletableFuture[Long] = {
+    require(metadata != null, "'null' metadata cannot be written to a metadata 
log")
+    val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { 
output =>
+      serialize(metadata, output)
+    }.thenApply((ret: Boolean) => {
+      if (ret) {
+        batchId
+      } else {
+        throw new IllegalStateException(
+          s"Concurrent update to the log. Multiple streaming jobs detected for 
$batchId"
+        )
+      }
+    })
+

Review Comment:
   Is the update to batchCache missed here, or is that intentional?
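   If it is just missed, I'd expect something like the tail of AsyncOffsetSeqLog.addAsync (sketch):
   ```scala
   // keep the in-memory cache in sync with the (possibly still pending) async write
   batchCache.put(batchId, metadata)
   future
   ```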



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

