Repository: spark
Updated Branches:
  refs/heads/master 2fe16333d -> 59daf91b7


[SPARK-22733] Split StreamExecution into MicroBatchExecution and 
StreamExecution.

## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, which MicroBatchExecution (the 
current StreamExecution) inherits. When continuous processing is implemented, 
we'll have a new ContinuousExecution implementation of StreamExecution.

A few fields are also renamed to make them less microbatch-specific.

## How was this patch tested?

refactoring only

Author: Jose Torres <j...@databricks.com>

Closes #19926 from joseph-torres/continuous-refactor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59daf91b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59daf91b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59daf91b

Branch: refs/heads/master
Commit: 59daf91b7cfb50b1c20eb41959921fc03103b739
Parents: 2fe1633
Author: Jose Torres <j...@databricks.com>
Authored: Thu Dec 14 14:31:21 2017 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Dec 14 14:31:21 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/BatchCommitLog.scala    |  83 ----
 .../sql/execution/streaming/CommitLog.scala     |  83 ++++
 .../streaming/MicroBatchExecution.scala         | 407 +++++++++++++++++
 .../execution/streaming/StreamExecution.scala   | 457 +++----------------
 .../sql/streaming/StreamingQueryManager.scala   |   2 +-
 .../sql/streaming/EventTimeWatermarkSuite.scala |   4 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   5 +-
 .../spark/sql/streaming/StreamSuite.scala       |   2 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  20 +-
 .../streaming/StreamingAggregationSuite.scala   |   4 +-
 .../sql/streaming/StreamingQuerySuite.scala     |   4 +-
 11 files changed, 563 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
deleted file mode 100644
index 5e24e8f..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.io.{InputStream, OutputStream}
-import java.nio.charset.StandardCharsets._
-
-import scala.io.{Source => IOSource}
-
-import org.apache.spark.sql.SparkSession
-
-/**
- * Used to write log files that represent batch commit points in structured 
streaming.
- * A commit log file will be written immediately after the successful 
completion of a
- * batch, and before processing the next batch. Here is an execution summary:
- * - trigger batch 1
- * - obtain batch 1 offsets and write to offset log
- * - process batch 1
- * - write batch 1 to completion log
- * - trigger batch 2
- * - obtain batch 2 offsets and write to offset log
- * - process batch 2
- * - write batch 2 to completion log
- * ....
- *
- * The current format of the batch completion log is:
- * line 1: version
- * line 2: metadata (optional json string)
- */
-class BatchCommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[String](sparkSession, path) {
-
-  import BatchCommitLog._
-
-  def add(batchId: Long): Unit = {
-    super.add(batchId, EMPTY_JSON)
-  }
-
-  override def add(batchId: Long, metadata: String): Boolean = {
-    throw new UnsupportedOperationException(
-      "BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
-  }
-
-  override protected def deserialize(in: InputStream): String = {
-    // called inside a try-finally where the underlying stream is closed in 
the caller
-    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
-    if (!lines.hasNext) {
-      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
-    }
-    parseVersion(lines.next.trim, VERSION)
-    EMPTY_JSON
-  }
-
-  override protected def serialize(metadata: String, out: OutputStream): Unit 
= {
-    // called inside a try-finally where the underlying stream is closed in 
the caller
-    out.write(s"v${VERSION}".getBytes(UTF_8))
-    out.write('\n')
-
-    // write metadata
-    out.write(EMPTY_JSON.getBytes(UTF_8))
-  }
-}
-
-object BatchCommitLog {
-  private val VERSION = 1
-  private val EMPTY_JSON = "{}"
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
new file mode 100644
index 0000000..5b11424
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured 
streaming.
+ * A commit log file will be written immediately after the successful 
completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ * ....
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class CommitLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[String](sparkSession, path) {
+
+  import CommitLog._
+
+  def add(batchId: Long): Unit = {
+    super.add(batchId, EMPTY_JSON)
+  }
+
+  override def add(batchId: Long, metadata: String): Boolean = {
+    throw new UnsupportedOperationException(
+      "CommitLog does not take any metadata, use 'add(batchId)' instead")
+  }
+
+  override protected def deserialize(in: InputStream): String = {
+    // called inside a try-finally where the underlying stream is closed in 
the caller
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset 
commit log")
+    }
+    parseVersion(lines.next.trim, VERSION)
+    EMPTY_JSON
+  }
+
+  override protected def serialize(metadata: String, out: OutputStream): Unit 
= {
+    // called inside a try-finally where the underlying stream is closed in 
the caller
+    out.write(s"v${VERSION}".getBytes(UTF_8))
+    out.write('\n')
+
+    // write metadata
+    out.write(EMPTY_JSON.getBytes(UTF_8))
+  }
+}
+
+object CommitLog {
+  private val VERSION = 1
+  private val EMPTY_JSON = "{}"
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
new file mode 100644
index 0000000..a67dda9
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.sources.v2.reader.Offset
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.util.{Clock, Utils}
+
+class MicroBatchExecution(
+    sparkSession: SparkSession,
+    name: String,
+    checkpointRoot: String,
+    analyzedPlan: LogicalPlan,
+    sink: Sink,
+    trigger: Trigger,
+    triggerClock: Clock,
+    outputMode: OutputMode,
+    deleteCheckpointOnStop: Boolean)
+  extends StreamExecution(
+    sparkSession, name, checkpointRoot, analyzedPlan, sink,
+    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+
+  private val triggerExecutor = trigger match {
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case OneTimeTrigger => OneTimeExecutor()
+    case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
+  }
+
+  override lazy val logicalPlan: LogicalPlan = {
+    assert(queryExecutionThread eq Thread.currentThread,
+      "logicalPlan must be initialized in QueryExecutionThread " +
+        s"but the current thread was ${Thread.currentThread}")
+    var nextSourceId = 0L
+    val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
+    val _logicalPlan = analyzedPlan.transform {
+      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of 
`source.schema` as attributes in
+          // "df.logicalPlan" has already used attributes of the previous 
`output`.
+          StreamingExecutionRelation(source, output)(sparkSession)
+        })
+    }
+    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
+    uniqueSources = sources.distinct
+    _logicalPlan
+  }
+
+  /**
+   * Repeatedly attempts to run batches as data arrives.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit 
= {
+    triggerExecutor.execute(() => {
+      startTrigger()
+
+      if (isActive) {
+        reportTimeTaken("triggerExecution") {
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets(sparkSessionForStream)
+            
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+            logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            currentStatus = currentStatus.copy(isDataAvailable = true)
+            updateStatusMessage("Processing new data")
+            runBatch(sparkSessionForStream)
+          }
+        }
+        // Report trigger as finished and construct progress object.
+        finishTrigger(dataAvailable)
+        if (dataAvailable) {
+          // Update committed offsets.
+          commitLog.add(currentBatchId)
+          committedOffsets ++= availableOffsets
+          logDebug(s"batch ${currentBatchId} committed")
+          // We'll increase currentBatchId after we complete processing 
current batch's data
+          currentBatchId += 1
+          
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+        } else {
+          currentStatus = currentStatus.copy(isDataAvailable = false)
+          updateStatusMessage("Waiting for data to arrive")
+          Thread.sleep(pollingDelayMs)
+        }
+      }
+      updateStatusMessage("Waiting for next trigger")
+      isActive
+    })
+  }
+
+  /**
+   * Populate the start offsets to start the execution at the current offsets 
stored in the sink
+   * (i.e. avoid reprocessing data that we have already processed). This 
function must be called
+   * before any processing occurs and will populate the following fields:
+   *  - currentBatchId
+   *  - committedOffsets
+   *  - availableOffsets
+   *  The basic structure of this method is as follows:
+   *
+   *  Identify (from the offset log) the offsets used to run the last batch
+   *  IF last batch exists THEN
+   *    Set the next batch to be executed as the last recovered batch
+   *    Check the commit log to see which batch was committed last
+   *    IF the last batch was committed THEN
+   *      Call getBatch using the last batch start and end offsets
+   *      // ^^^^ above line is needed since some sources assume last batch 
always re-executes
+   *      Setup for a new batch i.e., start = last batch end, and identify new 
end
+   *    DONE
+   *  ELSE
+   *    Identify a brand new batch
+   *  DONE
+   */
+  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): 
Unit = {
+    offsetLog.getLatest() match {
+      case Some((latestBatchId, nextOffsets)) =>
+        /* First assume that we are re-executing the latest known batch
+         * in the offset log */
+        currentBatchId = latestBatchId
+        availableOffsets = nextOffsets.toStreamProgress(sources)
+        /* Initialize committed offsets to a committed batch, which at this
+         * point is the second latest batch id in the offset log. */
+        if (latestBatchId != 0) {
+          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse 
{
+            throw new IllegalStateException(s"batch ${latestBatchId - 1} 
doesn't exist")
+          }
+          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
+        }
+
+        // update offset metadata
+        nextOffsets.metadata.foreach { metadata =>
+          OffsetSeqMetadata.setSessionConf(metadata, 
sparkSessionToRunBatches.conf)
+          offsetSeqMetadata = OffsetSeqMetadata(
+            metadata.batchWatermarkMs, metadata.batchTimestampMs, 
sparkSessionToRunBatches.conf)
+        }
+
+        /* identify the current batch id: if commit log indicates we 
successfully processed the
+         * latest batch id in the offset log, then we can safely move to the 
next batch
+         * i.e., committedBatchId + 1 */
+        commitLog.getLatest() match {
+          case Some((latestCommittedBatchId, _)) =>
+            if (latestBatchId == latestCommittedBatchId) {
+              /* The last batch was successfully committed, so we can safely 
process a
+               * new next batch but first:
+               * Make a call to getBatch using the offsets from the previous batch,
+               * because certain sources (e.g., KafkaSource) assume on restart 
the last
+               * batch will be executed before getOffset is called again. */
+              availableOffsets.foreach { ao: (Source, Offset) =>
+                val (source, end) = ao
+                if (committedOffsets.get(source).map(_ != 
end).getOrElse(true)) {
+                  val start = committedOffsets.get(source)
+                  source.getBatch(start, end)
+                }
+              }
+              currentBatchId = latestCommittedBatchId + 1
+              committedOffsets ++= availableOffsets
+              // Construct a new batch by recomputing availableOffsets
+              constructNextBatch()
+            } else if (latestCommittedBatchId < latestBatchId - 1) {
+              logWarning(s"Batch completion log latest batch id is " +
+                s"${latestCommittedBatchId}, which is not trailing " +
+                s"batchid $latestBatchId by one")
+            }
+          case None => logInfo("no commit log present")
+        }
+        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+          s"$committedOffsets and available offsets $availableOffsets")
+      case None => // We are starting this stream for the first time.
+        logInfo(s"Starting new streaming query.")
+        currentBatchId = 0
+        constructNextBatch()
+    }
+  }
+
+  /**
+   * Returns true if there is any new data available to be processed.
+   */
+  private def dataAvailable: Boolean = {
+    availableOffsets.exists {
+      case (source, available) =>
+        committedOffsets
+          .get(source)
+          .map(committed => committed != available)
+          .getOrElse(true)
+    }
+  }
+
+  /**
+   * Queries all of the sources to see if any new data is available. When 
there is new data the
+   * batchId counter is incremented and a new log entry is written with the 
newest offsets.
+   */
+  private def constructNextBatch(): Unit = {
+    // Check to see what new data is available.
+    val hasNewData = {
+      awaitProgressLock.lock()
+      try {
+        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s 
=>
+          updateStatusMessage(s"Getting offsets from $s")
+          reportTimeTaken("getOffset") {
+            (s, s.getOffset)
+          }
+        }.toMap
+        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty 
}.mapValues(_.get)
+
+        if (dataAvailable) {
+          true
+        } else {
+          noNewData = true
+          false
+        }
+      } finally {
+        awaitProgressLock.unlock()
+      }
+    }
+    if (hasNewData) {
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
+      // Update the eventTime watermarks if we find any in the plan.
+      if (lastExecution != null) {
+        lastExecution.executedPlan.collect {
+          case e: EventTimeWatermarkExec => e
+        }.zipWithIndex.foreach {
+          case (e, index) if e.eventTimeStats.value.count > 0 =>
+            logDebug(s"Observed event time stats $index: 
${e.eventTimeStats.value}")
+            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+            val prevWatermarkMs = watermarkMsMap.get(index)
+            if (prevWatermarkMs.isEmpty || newWatermarkMs > 
prevWatermarkMs.get) {
+              watermarkMsMap.put(index, newWatermarkMs)
+            }
+
+          // Populate 0 if we haven't seen any data yet for this watermark 
node.
+          case (_, index) =>
+            if (!watermarkMsMap.isDefinedAt(index)) {
+              watermarkMsMap.put(index, 0)
+            }
+        }
+
+        // Update the global watermark to the minimum of all watermark nodes.
+        // This is the safest option, because only the global watermark is 
fault-tolerant. Making
+        // it the minimum of all individual watermarks guarantees it will 
never advance past where
+        // any individual watermark operator would be if it were in a plan by 
itself.
+        if(!watermarkMsMap.isEmpty) {
+          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
+          if (newWatermarkMs > batchWatermarkMs) {
+            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+            batchWatermarkMs = newWatermarkMs
+          } else {
+            logDebug(
+              s"Event time didn't move: $newWatermarkMs < " +
+                s"$batchWatermarkMs")
+          }
+        }
+      }
+      offsetSeqMetadata = offsetSeqMetadata.copy(
+        batchWatermarkMs = batchWatermarkMs,
+        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch 
timestamp in milliseconds
+
+      updateStatusMessage("Writing offsets to log")
+      reportTimeTaken("walCommit") {
+        assert(offsetLog.add(
+          currentBatchId,
+          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
+          s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
+        logInfo(s"Committed offsets for batch $currentBatchId. " +
+          s"Metadata ${offsetSeqMetadata.toString}")
+
+        // NOTE: The following code is correct because runStream() processes 
exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches 
in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is 
safe for the
+        // sources to discard data from the previous batch.
+        if (currentBatchId != 0) {
+          val prevBatchOff = offsetLog.get(currentBatchId - 1)
+          if (prevBatchOff.isDefined) {
+            prevBatchOff.get.toStreamProgress(sources).foreach {
+              case (src, off) => src.commit(off)
+            }
+          } else {
+            throw new IllegalStateException(s"batch $currentBatchId doesn't 
exist")
+          }
+        }
+
+        // It is now safe to discard the metadata beyond the minimum number to 
retain.
+        // Note that purge is exclusive, i.e. it purges everything before the 
target ID.
+        if (minLogEntriesToMaintain < currentBatchId) {
+          offsetLog.purge(currentBatchId - minLogEntriesToMaintain)
+          commitLog.purge(currentBatchId - minLogEntriesToMaintain)
+        }
+      }
+    } else {
+      awaitProgressLock.lock()
+      try {
+        // Wake up any threads that are waiting for the stream to progress.
+        awaitProgressLockCondition.signalAll()
+      } finally {
+        awaitProgressLock.unlock()
+      }
+    }
+  }
+
+  /**
+   * Processes any data available between `availableOffsets` and 
`committedOffsets`.
+   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch 
with.
+   */
+  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+    // Request unprocessed data from all sources.
+    newData = reportTimeTaken("getBatch") {
+      availableOffsets.flatMap {
+        case (source, available)
+          if committedOffsets.get(source).map(_ != available).getOrElse(true) 
=>
+          val current = committedOffsets.get(source)
+          val batch = source.getBatch(current, available)
+          assert(batch.isStreaming,
+            s"DataFrame returned by getBatch from $source did not have 
isStreaming=true\n" +
+              s"${batch.queryExecution.logical}")
+          logDebug(s"Retrieving data from $source: $current -> $available")
+          Some(source -> batch)
+        case _ => None
+      }
+    }
+
+    // A list of attributes that will need to be updated.
+    val replacements = new ArrayBuffer[(Attribute, Attribute)]
+    // Replace sources in the logical plan with data that has arrived since 
the last batch.
+    val withNewSources = logicalPlan transform {
+      case StreamingExecutionRelation(source, output) =>
+        newData.get(source).map { data =>
+          val newPlan = data.logicalPlan
+          assert(output.size == newPlan.output.size,
+            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
+              s"${Utils.truncatedString(newPlan.output, ",")}")
+          replacements ++= output.zip(newPlan.output)
+          newPlan
+        }.getOrElse {
+          LocalRelation(output, isStreaming = true)
+        }
+    }
+
+    // Rewire the plan to use the new attributes that were returned by the 
source.
+    val replacementMap = AttributeMap(replacements)
+    val triggerLogicalPlan = withNewSources transformAllExpressions {
+      case a: Attribute if replacementMap.contains(a) =>
+        replacementMap(a).withMetadata(a.metadata)
+      case ct: CurrentTimestamp =>
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
+          ct.dataType)
+      case cd: CurrentDate =>
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
+          cd.dataType, cd.timeZoneId)
+    }
+
+    reportTimeTaken("queryPlanning") {
+      lastExecution = new IncrementalExecution(
+        sparkSessionToRunBatch,
+        triggerLogicalPlan,
+        outputMode,
+        checkpointFile("state"),
+        runId,
+        currentBatchId,
+        offsetSeqMetadata)
+      lastExecution.executedPlan // Force the lazy generation of execution plan
+    }
+
+    val nextBatch =
+      new Dataset(sparkSessionToRunBatch, lastExecution, 
RowEncoder(lastExecution.analyzed.schema))
+
+    reportTimeTaken("addBatch") {
+      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
+        sink.addBatch(currentBatchId, nextBatch)
+      }
+    }
+
+    awaitProgressLock.lock()
+    try {
+      // Wake up any threads that are waiting for the stream to progress.
+      awaitProgressLockCondition.signalAll()
+    } finally {
+      awaitProgressLock.unlock()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16063c0..7946889 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -22,10 +22,9 @@ import java.nio.channels.ClosedByInterruptException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.locks.{Condition, ReentrantLock}
 
 import scala.collection.mutable.{Map => MutableMap}
-import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.UncheckedExecutionException
@@ -33,10 +32,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.reader.Offset
@@ -58,7 +55,7 @@ case object TERMINATED extends State
  * @param deleteCheckpointOnStop whether to delete the checkpoint if the query 
is stopped without
  *                               errors
  */
-class StreamExecution(
+abstract class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     private val checkpointRoot: String,
@@ -72,16 +69,16 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = 
sparkSession.sessionState.conf.streamingPollingDelay
+  protected val pollingDelayMs: Long = 
sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val minBatchesToRetain = 
sparkSession.sessionState.conf.minBatchesToRetain
-  require(minBatchesToRetain > 0, "minBatchesToRetain has to be positive")
+  protected val minLogEntriesToMaintain: Int = 
sparkSession.sessionState.conf.minBatchesToRetain
+  require(minLogEntriesToMaintain > 0, "minBatchesToRetain has to be positive")
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to 
avoid thread starvation.
    */
-  private val awaitBatchLock = new ReentrantLock(true)
-  private val awaitBatchLockCondition = awaitBatchLock.newCondition()
+  protected val awaitProgressLock = new ReentrantLock(true)
+  protected val awaitProgressLockCondition = awaitProgressLock.newCondition()
 
   private val initializationLatch = new CountDownLatch(1)
   private val startLatch = new CountDownLatch(1)
@@ -90,9 +87,11 @@ class StreamExecution(
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
     val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    checkpointPath.makeQualified(fs.getUri(), 
fs.getWorkingDirectory()).toUri.toString
+    checkpointPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory).toUri.toString
   }
 
+  def logicalPlan: LogicalPlan
+
   /**
    * Tracks how much data we have processed and committed to the sink or state 
store from each
    * input source.
@@ -160,36 +159,7 @@ class StreamExecution(
   /**
    * A list of unique sources in the query plan. This will be set when 
generating logical plan.
    */
-  @volatile private var uniqueSources: Seq[Source] = Seq.empty
-
-  override lazy val logicalPlan: LogicalPlan = {
-    assert(microBatchThread eq Thread.currentThread,
-      "logicalPlan must be initialized in StreamExecutionThread " +
-        s"but the current thread was ${Thread.currentThread}")
-    var nextSourceId = 0L
-    val toExecutionRelationMap = MutableMap[StreamingRelation, 
StreamingExecutionRelation]()
-    val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSource, _, output) =>
-        toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          // We still need to use the previous `output` instead of 
`source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous 
`output`.
-          StreamingExecutionRelation(source, output)(sparkSession)
-        })
-    }
-    sources = _logicalPlan.collect { case s: StreamingExecutionRelation => 
s.source }
-    uniqueSources = sources.distinct
-    _logicalPlan
-  }
-
-  private val triggerExecutor = trigger match {
-    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => OneTimeExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: 
$trigger")
-  }
+  @volatile protected var uniqueSources: Seq[Source] = Seq.empty
 
   /** Defines the internal state of execution */
   private val state = new AtomicReference[State](INITIALIZING)
@@ -215,13 +185,13 @@ class StreamExecution(
    * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: 
interrupting a
    * running `KafkaConsumer` may cause endless loop.
    */
-  val microBatchThread =
-    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
+  val queryExecutionThread: QueryExecutionThread =
+    new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site 
from the caller
         // thread to this micro batch thread
         sparkSession.sparkContext.setCallSite(callSite)
-        runBatches()
+        runStream()
       }
     }
 
@@ -238,7 +208,7 @@ class StreamExecution(
    * fully processed, and its output was committed to the sink, hence no need 
to process it again.
    * This is used (for instance) during restart, to help identify which batch 
to run next.
    */
-  val batchCommitLog = new BatchCommitLog(sparkSession, 
checkpointFile("commits"))
+  val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
 
   /** Whether all fields of the query have been initialized */
   private def isInitialized: Boolean = state.get != INITIALIZING
@@ -250,7 +220,7 @@ class StreamExecution(
   override def exception: Option[StreamingQueryException] = 
Option(streamDeathCause)
 
   /** Returns the path of a file with `name` in the checkpoint directory. */
-  private def checkpointFile(name: String): String =
+  protected def checkpointFile(name: String): String =
     new Path(new Path(resolvedCheckpointRoot), name).toUri.toString
 
   /**
@@ -259,20 +229,25 @@ class StreamExecution(
    */
   def start(): Unit = {
     logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store 
the query checkpoint.")
-    microBatchThread.setDaemon(true)
-    microBatchThread.start()
+    queryExecutionThread.setDaemon(true)
+    queryExecutionThread.start()
     startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
   }
 
   /**
-   * Repeatedly attempts to run batches as data arrives.
+   * Run the activated stream until stopped.
+   */
+  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit
+
+  /**
+   * Activate the stream and then wrap a callout to runActivatedStream, 
handling start and stop.
    *
    * Note that this method ensures that [[QueryStartedEvent]] and 
[[QueryTerminatedEvent]] are
    * posted such that listeners are guaranteed to get a start event before a 
termination.
    * Furthermore, this method also ensures that [[QueryStartedEvent]] event is 
posted before the
    * `start()` method returns.
    */
-  private def runBatches(): Unit = {
+  private def runStream(): Unit = {
     try {
       sparkSession.sparkContext.setJobGroup(runId.toString, 
getBatchDescriptionString,
         interruptOnCancel = true)
@@ -295,56 +270,18 @@ class StreamExecution(
       logicalPlan
 
       // Isolated spark session to run the batches with.
-      val sparkSessionToRunBatches = sparkSession.cloneSession()
+      val sparkSessionForStream = sparkSession.cloneSession()
       // Adaptive execution can change num shuffle partitions, disallow
-      
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
"false")
+      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
"false")
       // Disable cost-based join optimization as we do not want stateful 
operations to be rearranged
-      sparkSessionToRunBatches.conf.set(SQLConf.CBO_ENABLED.key, "false")
+      sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
       offsetSeqMetadata = OffsetSeqMetadata(
-        batchWatermarkMs = 0, batchTimestampMs = 0, 
sparkSessionToRunBatches.conf)
+        batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionForStream.conf)
 
       if (state.compareAndSet(INITIALIZING, ACTIVE)) {
         // Unblock `awaitInitialization`
         initializationLatch.countDown()
-
-        triggerExecutor.execute(() => {
-          startTrigger()
-
-          if (isActive) {
-            reportTimeTaken("triggerExecution") {
-              if (currentBatchId < 0) {
-                // We'll do this initialization only once
-                populateStartOffsets(sparkSessionToRunBatches)
-                
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-                logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-              } else {
-                constructNextBatch()
-              }
-              if (dataAvailable) {
-                currentStatus = currentStatus.copy(isDataAvailable = true)
-                updateStatusMessage("Processing new data")
-                runBatch(sparkSessionToRunBatches)
-              }
-            }
-            // Report trigger as finished and construct progress object.
-            finishTrigger(dataAvailable)
-            if (dataAvailable) {
-              // Update committed offsets.
-              batchCommitLog.add(currentBatchId)
-              committedOffsets ++= availableOffsets
-              logDebug(s"batch ${currentBatchId} committed")
-              // We'll increase currentBatchId after we complete processing 
current batch's data
-              currentBatchId += 1
-              
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-            } else {
-              currentStatus = currentStatus.copy(isDataAvailable = false)
-              updateStatusMessage("Waiting for data to arrive")
-              Thread.sleep(pollingDelayMs)
-            }
-          }
-          updateStatusMessage("Waiting for next trigger")
-          isActive
-        })
+        runActivatedStream(sparkSessionForStream)
         updateStatusMessage("Stopped")
       } else {
         // `stop()` is already called. Let `finally` finish the cleanup.
@@ -373,7 +310,7 @@ class StreamExecution(
         if (!NonFatal(e)) {
           throw e
         }
-    } finally microBatchThread.runUninterruptibly {
+    } finally queryExecutionThread.runUninterruptibly {
       // The whole `finally` block must run inside `runUninterruptibly` to 
avoid being interrupted
       // when a query is stopped by the user. We need to make sure the 
following codes finish
       // otherwise it may throw `InterruptedException` to 
`UncaughtExceptionHandler` (SPARK-21248).
@@ -410,12 +347,12 @@ class StreamExecution(
           }
         }
       } finally {
-        awaitBatchLock.lock()
+        awaitProgressLock.lock()
         try {
           // Wake up any threads that are waiting for the stream to progress.
-          awaitBatchLockCondition.signalAll()
+          awaitProgressLockCondition.signalAll()
         } finally {
-          awaitBatchLock.unlock()
+          awaitProgressLock.unlock()
         }
         terminationLatch.countDown()
       }
@@ -448,296 +385,6 @@ class StreamExecution(
     }
   }
 
-  /**
-   * Populate the start offsets to start the execution at the current offsets 
stored in the sink
-   * (i.e. avoid reprocessing data that we have already processed). This 
function must be called
-   * before any processing occurs and will populate the following fields:
-   *  - currentBatchId
-   *  - committedOffsets
-   *  - availableOffsets
-   *  The basic structure of this method is as follows:
-   *
-   *  Identify (from the offset log) the offsets used to run the last batch
-   *  IF last batch exists THEN
-   *    Set the next batch to be executed as the last recovered batch
-   *    Check the commit log to see which batch was committed last
-   *    IF the last batch was committed THEN
-   *      Call getBatch using the last batch start and end offsets
-   *      // ^^^^ above line is needed since some sources assume last batch 
always re-executes
-   *      Setup for a new batch i.e., start = last batch end, and identify new 
end
-   *    DONE
-   *  ELSE
-   *    Identify a brand new batch
-   *  DONE
-   */
-  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): 
Unit = {
-    offsetLog.getLatest() match {
-      case Some((latestBatchId, nextOffsets)) =>
-        /* First assume that we are re-executing the latest known batch
-         * in the offset log */
-        currentBatchId = latestBatchId
-        availableOffsets = nextOffsets.toStreamProgress(sources)
-        /* Initialize committed offsets to a committed batch, which at this
-         * is the second latest batch id in the offset log. */
-        if (latestBatchId != 0) {
-          val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse 
{
-            throw new IllegalStateException(s"batch ${latestBatchId - 1} 
doesn't exist")
-          }
-          committedOffsets = secondLatestBatchId.toStreamProgress(sources)
-        }
-
-        // update offset metadata
-        nextOffsets.metadata.foreach { metadata =>
-          OffsetSeqMetadata.setSessionConf(metadata, 
sparkSessionToRunBatches.conf)
-          offsetSeqMetadata = OffsetSeqMetadata(
-            metadata.batchWatermarkMs, metadata.batchTimestampMs, 
sparkSessionToRunBatches.conf)
-        }
-
-        /* identify the current batch id: if commit log indicates we 
successfully processed the
-         * latest batch id in the offset log, then we can safely move to the 
next batch
-         * i.e., committedBatchId + 1 */
-        batchCommitLog.getLatest() match {
-          case Some((latestCommittedBatchId, _)) =>
-            if (latestBatchId == latestCommittedBatchId) {
-              /* The last batch was successfully committed, so we can safely 
process a
-               * new next batch but first:
-               * Make a call to getBatch using the offsets from previous batch.
-               * because certain sources (e.g., KafkaSource) assume on restart 
the last
-               * batch will be executed before getOffset is called again. */
-              availableOffsets.foreach { ao: (Source, Offset) =>
-                val (source, end) = ao
-                if (committedOffsets.get(source).map(_ != 
end).getOrElse(true)) {
-                  val start = committedOffsets.get(source)
-                  source.getBatch(start, end)
-                }
-              }
-              currentBatchId = latestCommittedBatchId + 1
-              committedOffsets ++= availableOffsets
-              // Construct a new batch be recomputing availableOffsets
-              constructNextBatch()
-            } else if (latestCommittedBatchId < latestBatchId - 1) {
-              logWarning(s"Batch completion log latest batch id is " +
-                s"${latestCommittedBatchId}, which is not trailing " +
-                s"batchid $latestBatchId by one")
-            }
-          case None => logInfo("no commit log present")
-        }
-        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
-          s"$committedOffsets and available offsets $availableOffsets")
-      case None => // We are starting this stream for the first time.
-        logInfo(s"Starting new streaming query.")
-        currentBatchId = 0
-        constructNextBatch()
-    }
-  }
-
-  /**
-   * Returns true if there is any new data available to be processed.
-   */
-  private def dataAvailable: Boolean = {
-    availableOffsets.exists {
-      case (source, available) =>
-        committedOffsets
-            .get(source)
-            .map(committed => committed != available)
-            .getOrElse(true)
-    }
-  }
-
-  /**
-   * Queries all of the sources to see if any new data is available. When 
there is new data the
-   * batchId counter is incremented and a new log entry is written with the 
newest offsets.
-   */
-  private def constructNextBatch(): Unit = {
-    // Check to see what new data is available.
-    val hasNewData = {
-      awaitBatchLock.lock()
-      try {
-        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s 
=>
-          updateStatusMessage(s"Getting offsets from $s")
-          reportTimeTaken("getOffset") {
-            (s, s.getOffset)
-          }
-        }.toMap
-        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty 
}.mapValues(_.get)
-
-        if (dataAvailable) {
-          true
-        } else {
-          noNewData = true
-          false
-        }
-      } finally {
-        awaitBatchLock.unlock()
-      }
-    }
-    if (hasNewData) {
-      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermarks if we find any in the plan.
-      if (lastExecution != null) {
-        lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec => e
-        }.zipWithIndex.foreach {
-          case (e, index) if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats $index: 
${e.eventTimeStats.value}")
-            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
-            val prevWatermarkMs = watermarkMsMap.get(index)
-            if (prevWatermarkMs.isEmpty || newWatermarkMs > 
prevWatermarkMs.get) {
-              watermarkMsMap.put(index, newWatermarkMs)
-            }
-
-          // Populate 0 if we haven't seen any data yet for this watermark 
node.
-          case (_, index) =>
-            if (!watermarkMsMap.isDefinedAt(index)) {
-              watermarkMsMap.put(index, 0)
-            }
-        }
-
-        // Update the global watermark to the minimum of all watermark nodes.
-        // This is the safest option, because only the global watermark is 
fault-tolerant. Making
-        // it the minimum of all individual watermarks guarantees it will 
never advance past where
-        // any individual watermark operator would be if it were in a plan by 
itself.
-        if(!watermarkMsMap.isEmpty) {
-          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
-          if (newWatermarkMs > batchWatermarkMs) {
-            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            batchWatermarkMs = newWatermarkMs
-          } else {
-            logDebug(
-              s"Event time didn't move: $newWatermarkMs < " +
-                s"$batchWatermarkMs")
-          }
-        }
-      }
-      offsetSeqMetadata = offsetSeqMetadata.copy(
-        batchWatermarkMs = batchWatermarkMs,
-        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch 
timestamp in milliseconds
-
-      updateStatusMessage("Writing offsets to log")
-      reportTimeTaken("walCommit") {
-        assert(offsetLog.add(
-          currentBatchId,
-          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
-          s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
-        logInfo(s"Committed offsets for batch $currentBatchId. " +
-          s"Metadata ${offsetSeqMetadata.toString}")
-
-        // NOTE: The following code is correct because runBatches() processes 
exactly one
-        // batch at a time. If we add pipeline parallelism (multiple batches 
in flight at
-        // the same time), this cleanup logic will need to change.
-
-        // Now that we've updated the scheduler's persistent checkpoint, it is 
safe for the
-        // sources to discard data from the previous batch.
-        if (currentBatchId != 0) {
-          val prevBatchOff = offsetLog.get(currentBatchId - 1)
-          if (prevBatchOff.isDefined) {
-            prevBatchOff.get.toStreamProgress(sources).foreach {
-              case (src, off) => src.commit(off)
-            }
-          } else {
-            throw new IllegalStateException(s"batch $currentBatchId doesn't 
exist")
-          }
-        }
-
-        // It is now safe to discard the metadata beyond the minimum number to 
retain.
-        // Note that purge is exclusive, i.e. it purges everything before the 
target ID.
-        if (minBatchesToRetain < currentBatchId) {
-          offsetLog.purge(currentBatchId - minBatchesToRetain)
-          batchCommitLog.purge(currentBatchId - minBatchesToRetain)
-        }
-      }
-    } else {
-      awaitBatchLock.lock()
-      try {
-        // Wake up any threads that are waiting for the stream to progress.
-        awaitBatchLockCondition.signalAll()
-      } finally {
-        awaitBatchLock.unlock()
-      }
-    }
-  }
-
-  /**
-   * Processes any data available between `availableOffsets` and 
`committedOffsets`.
-   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch 
with.
-   */
-  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
-    // Request unprocessed data from all sources.
-    newData = reportTimeTaken("getBatch") {
-      availableOffsets.flatMap {
-        case (source, available)
-          if committedOffsets.get(source).map(_ != available).getOrElse(true) 
=>
-          val current = committedOffsets.get(source)
-          val batch = source.getBatch(current, available)
-          assert(batch.isStreaming,
-            s"DataFrame returned by getBatch from $source did not have 
isStreaming=true\n" +
-              s"${batch.queryExecution.logical}")
-          logDebug(s"Retrieving data from $source: $current -> $available")
-          Some(source -> batch)
-        case _ => None
-      }
-    }
-
-    // A list of attributes that will need to be updated.
-    val replacements = new ArrayBuffer[(Attribute, Attribute)]
-    // Replace sources in the logical plan with data that has arrived since 
the last batch.
-    val withNewSources = logicalPlan transform {
-      case StreamingExecutionRelation(source, output) =>
-        newData.get(source).map { data =>
-          val newPlan = data.logicalPlan
-          assert(output.size == newPlan.output.size,
-            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
-            s"${Utils.truncatedString(newPlan.output, ",")}")
-          replacements ++= output.zip(newPlan.output)
-          newPlan
-        }.getOrElse {
-          LocalRelation(output, isStreaming = true)
-        }
-    }
-
-    // Rewire the plan to use the new attributes that were returned by the 
source.
-    val replacementMap = AttributeMap(replacements)
-    val triggerLogicalPlan = withNewSources transformAllExpressions {
-      case a: Attribute if replacementMap.contains(a) =>
-        replacementMap(a).withMetadata(a.metadata)
-      case ct: CurrentTimestamp =>
-        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
-      case cd: CurrentDate =>
-        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          cd.dataType, cd.timeZoneId)
-    }
-
-    reportTimeTaken("queryPlanning") {
-      lastExecution = new IncrementalExecution(
-        sparkSessionToRunBatch,
-        triggerLogicalPlan,
-        outputMode,
-        checkpointFile("state"),
-        runId,
-        currentBatchId,
-        offsetSeqMetadata)
-      lastExecution.executedPlan // Force the lazy generation of execution plan
-    }
-
-    val nextBatch =
-      new Dataset(sparkSessionToRunBatch, lastExecution, 
RowEncoder(lastExecution.analyzed.schema))
-
-    reportTimeTaken("addBatch") {
-      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
-        sink.addBatch(currentBatchId, nextBatch)
-      }
-    }
-
-    awaitBatchLock.lock()
-    try {
-      // Wake up any threads that are waiting for the stream to progress.
-      awaitBatchLockCondition.signalAll()
-    } finally {
-      awaitBatchLock.unlock()
-    }
-  }
-
   override protected def postEvent(event: StreamingQueryListener.Event): Unit 
= {
     sparkSession.streams.postListenerEvent(event)
   }
@@ -762,10 +409,10 @@ class StreamExecution(
     // Set the state to TERMINATED so that the batching thread knows that it 
was interrupted
     // intentionally
     state.set(TERMINATED)
-    if (microBatchThread.isAlive) {
+    if (queryExecutionThread.isAlive) {
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
-      microBatchThread.interrupt()
-      microBatchThread.join()
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
       // microBatchThread may spawn new jobs, so we need to cancel again to 
prevent a leak
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
@@ -784,21 +431,21 @@ class StreamExecution(
     }
 
     while (notDone) {
-      awaitBatchLock.lock()
+      awaitProgressLock.lock()
       try {
-        awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(100, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
       } finally {
-        awaitBatchLock.unlock()
+        awaitProgressLock.unlock()
       }
     }
     logDebug(s"Unblocked at $newOffset for $source")
   }
 
   /** A flag to indicate that a batch has completed with no new data 
available. */
-  @volatile private var noNewData = false
+  @volatile protected var noNewData = false
 
   /**
    * Assert that the await APIs should not be called in the stream thread. 
Otherwise, it may cause
@@ -806,7 +453,7 @@ class StreamExecution(
    * the stream thread forever.
    */
   private def assertAwaitThread(): Unit = {
-    if (microBatchThread eq Thread.currentThread) {
+    if (queryExecutionThread eq Thread.currentThread) {
       throw new IllegalStateException(
         "Cannot wait for a query state from the same thread that is running 
the query")
     }
@@ -833,11 +480,11 @@ class StreamExecution(
       throw streamDeathCause
     }
     if (!isActive) return
-    awaitBatchLock.lock()
+    awaitProgressLock.lock()
     try {
       noNewData = false
       while (true) {
-        awaitBatchLockCondition.await(10000, TimeUnit.MILLISECONDS)
+        awaitProgressLockCondition.await(10000, TimeUnit.MILLISECONDS)
         if (streamDeathCause != null) {
           throw streamDeathCause
         }
@@ -846,7 +493,7 @@ class StreamExecution(
         }
       }
     } finally {
-      awaitBatchLock.unlock()
+      awaitProgressLock.unlock()
     }
   }
 
@@ -900,7 +547,7 @@ class StreamExecution(
           |Current Available Offsets: $availableOffsets
           |
           |Current State: $state
-          |Thread State: ${microBatchThread.getState}""".stripMargin
+          |Thread State: ${queryExecutionThread.getState}""".stripMargin
     if (includeLogicalPlan) {
       debugString + s"\n\nLogical Plan:\n$logicalPlan"
     } else {
@@ -908,7 +555,7 @@ class StreamExecution(
     }
   }
 
-  private def getBatchDescriptionString: String = {
+  protected def getBatchDescriptionString: String = {
     val batchDescription = if (currentBatchId < 0) "init" else 
currentBatchId.toString
     Option(name).map(_ + "<br/>").getOrElse("") +
       s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
@@ -920,7 +567,7 @@ object StreamExecution {
 }
 
 /**
- * A special thread to run the stream query. Some codes require to run in the 
StreamExecutionThread
- * and will use `classOf[StreamExecutionThread]` to check.
+ * A special thread to run the stream query. Some code must run in the 
QueryExecutionThread
+ * and will use `classOf[QueryExecutionThread]` to check.
  */
-abstract class StreamExecutionThread(name: String) extends 
UninterruptibleThread(name)
+abstract class QueryExecutionThread(name: String) extends 
UninterruptibleThread(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 48b0ea2..555d6e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
           "is not supported in streaming DataFrames/Datasets and will be 
disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
       sparkSession,
       userSpecifiedName.orNull,
       checkpointLocation,

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 47bc452..d6bef9c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -260,8 +260,8 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
       CheckLastBatch((10, 5)),
       StopStream,
       AssertOnQuery { q => // purge commit and clear the sink
-        val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
-        q.batchCommitLog.purge(commit)
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+        q.commitLog.purge(commit)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 7a2d9e3..c5b57bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1024,7 +1024,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         expectedCompactInterval: Int): Boolean = {
       import CompactibleFileStreamLog._
 
-      val fileSource = (execution invokePrivate 
_sources()).head.asInstanceOf[FileStreamSource]
+      val fileSource = getSourcesFromStreamingQuery(execution).head
       val metadataLog = fileSource invokePrivate _metadataLog()
 
       if (isCompactionBatch(batchId, expectedCompactInterval)) {
@@ -1100,8 +1100,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           CheckAnswer("keep1", "keep2", "keep3"),
           AssertOnQuery("check getBatch") { execution: StreamExecution =>
             val _sources = PrivateMethod[Seq[Source]]('sources)
-            val fileSource =
-              (execution invokePrivate 
_sources()).head.asInstanceOf[FileStreamSource]
+            val fileSource = getSourcesFromStreamingQuery(execution).head
 
             def verify(startId: Option[Int], endId: Int, expected: String*): 
Unit = {
               val start = startId.map(new FileStreamSourceOffset(_))

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 8163a1f..9e696b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -276,7 +276,7 @@ class StreamSuite extends StreamTest {
 
     // Check the latest batchid in the commit log
     def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
-      AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
+      AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId,
         s"commitLog's latest should be $expectedId")
 
     // Ensure that there has not been an incremental execution after restart

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7a1ff89..fb88c5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -300,12 +300,14 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with TimeLimits with Be
       if (currentStream != null) currentStream.committedOffsets.toString else 
"not started"
 
     def threadState =
-      if (currentStream != null && currentStream.microBatchThread.isAlive) 
"alive" else "dead"
-    def threadStackTrace = if (currentStream != null && 
currentStream.microBatchThread.isAlive) {
-      s"Thread stack trace: 
${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
-    } else {
-      ""
-    }
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) 
"alive" else "dead"
+
+    def threadStackTrace =
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) 
{
+        s"Thread stack trace: 
${currentStream.queryExecutionThread.getStackTrace.mkString("\n")}"
+      } else {
+        ""
+      }
 
     def testState =
       s"""
@@ -460,7 +462,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             verify(currentStream != null, "can not stop a stream that is not running")
             try failAfter(streamingTimeout) {
               currentStream.stop()
-              verify(!currentStream.microBatchThread.isAlive,
+              verify(!currentStream.queryExecutionThread.isAlive,
                 s"microbatch thread not stopped")
               verify(!currentStream.isActive,
                 "query.isActive() is false even after stopping")
@@ -486,7 +488,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
                 currentStream.awaitTermination()
               }
               eventually("microbatch thread not stopped after termination with failure") {
-                assert(!currentStream.microBatchThread.isAlive)
+                assert(!currentStream.queryExecutionThread.isAlive)
               }
               verify(currentStream.exception === Some(thrownException),
                 s"incorrect exception returned by query.exception()")
@@ -614,7 +616,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
         failTest("Timed out waiting for stream", e)
     } finally {
-      if (currentStream != null && currentStream.microBatchThread.isAlive) {
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) {
         currentStream.stop()
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index fa03135..38aa517 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -300,7 +300,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
-        q.batchCommitLog.purge(3)
+        q.commitLog.purge(3)
         // advance by a minute i.e., 90 seconds total
         clock.advance(60 * 1000L)
         true
@@ -352,7 +352,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
-        q.batchCommitLog.purge(3)
+        q.commitLog.purge(3)
         // advance by 60 days i.e., 90 days total
         clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
         true

http://git-wip-us.apache.org/repos/asf/spark/blob/59daf91b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f813b77..ad4d3ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -173,12 +173,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       StopStream, // clears out StreamTest state
       AssertOnQuery { q =>
         // both commit log and offset log contain the same (latest) batch id
-        q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
+        q.commitLog.getLatest().map(_._1).getOrElse(-1L) ==
           q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
       },
       AssertOnQuery { q =>
         // blow away commit log and sink result
-        q.batchCommitLog.purge(1)
+        q.commitLog.purge(1)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to