Repository: spark
Updated Branches:
  refs/heads/master af4dc5028 -> 47b5b6852


[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming 
aggregation and deduplication.

## What changes were proposed in this pull request?

This PR enables the MicroBatchExecution to run no-data batches if some 
SparkPlan requires running another batch to output results based on updated 
watermark / processing time. In this PR, I have enabled streaming aggregations 
and streaming deduplicates to automatically run an additional batch even if no 
new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 
for more context.
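
As a rough illustration of why a no-data batch can be needed, here is a minimal sketch in plain Scala with no Spark dependency. It assumes the global watermark is the minimum of the per-operator watermarks and never moves backwards, and that a stateful operator wants another batch whenever the new watermark exceeds the one it last ran with. This loosely mirrors the `WatermarkTracker` and `shouldRunAnotherBatch` changes in the diff below, but leaves out the per-operator history the real tracker keeps. The names `WatermarkSketch`, `globalWatermark` and `needsAnotherBatch` are hypothetical and do not exist in Spark.

```scala
// Toy model only; not Spark code. All names are illustrative assumptions.
object WatermarkSketch {

  // Global watermark = min over the per-operator watermarks, never moving backwards.
  def globalWatermark(currentMs: Long, perOperatorMs: Seq[Long]): Long =
    if (perOperatorMs.isEmpty) currentMs
    else math.max(currentMs, perOperatorMs.min)

  // An operator that last ran with watermark `usedMs` (None = no watermark defined)
  // needs another batch once the engine's watermark has moved past that value.
  def needsAnotherBatch(usedMs: Option[Long], newMs: Long): Boolean =
    usedMs.exists(newMs > _)

  def main(args: Array[String]): Unit = {
    // Batch 0 ran with watermark 0. Two watermark operators, each with a 10 s delay,
    // observed max event times of 25 s and 40 s.
    val delayMs = 10000L
    val afterBatch0 = globalWatermark(0L, Seq(25000L - delayMs, 40000L - delayMs))
    println(afterBatch0)                                        // 15000
    println(needsAnotherBatch(Some(0L), afterBatch0))           // true

    // The no-data batch ran with watermark 15000 and saw no new events, so the
    // watermark does not move and no further batch is requested.
    val afterBatch1 = globalWatermark(afterBatch0, Seq(15000L, 30000L))
    println(needsAnotherBatch(Some(afterBatch0), afterBatch1))  // false
  }
}
```

The second check is what keeps the engine from looping: once a no-data batch has run with the updated watermark, nothing asks for another one until new data moves the watermark again.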

Major changes/refactoring done in this PR.
- Refactoring MicroBatchExecution - A major point of confusion in the 
MicroBatchExecution control flow (at least to me) was that 
`populateStartOffsets` internally called `constructNextBatch`. This was not 
obvious from the name "populateStartOffsets", and it made the control flow 
from the main trigger execution loop very confusing (the main loop in 
`runActivatedStream` called `constructNextBatch`, but only if 
`populateStartOffsets` hadn't already called it). The refactoring makes this 
cleaner.
    - `populateStartOffsets` only updates `availableOffsets` and 
`committedOffsets`. It does not call `constructNextBatch`.
    - The main loop in `runActivatedStream` calls `constructNextBatch`, which 
returns true or false reflecting whether the next batch is ready for execution. 
This method is now idempotent; if a batch has already been constructed, then it 
will always return true until the batch has been executed.
    - If the next batch is ready, we call `runBatch`; otherwise we sleep.
    - That's it.

- Refactoring watermark management logic - This has been moved out of 
`MicroBatchExecution` into a separate class (`WatermarkTracker`) to simplify 
`MicroBatchExecution`.

- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns 
true if there is any stateful operation in the last execution plan that 
requires another batch for state cleanup, etc. This is used to decide whether 
to construct a batch or not in `constructNextBatch`.

- Changes to stream testing framework - Many tests used `CheckLastBatch` to 
validate answers. This assumed that there would be no more batches after the 
last set of input had been processed, so the last batch was the one with the 
output corresponding to the last input. This is no longer true. To account 
for that, I made two changes.
    - `CheckNewAnswer` is a new test action that verifies the new rows 
generated since the last time the answer was checked by `CheckAnswer`, 
`CheckNewAnswer` or `CheckLastBatch`. This is agnostic to how many batches 
occurred between the last check and now. To make this easier, I added a 
common trait between MemorySink and MemorySinkV2 to abstract out some common 
methods.
    - `assertNumStateRows` has been updated in the same way to be agnostic to 
batches while checking the total number of state rows and how many state rows 
were updated (it sums up updates since the last check).
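
The refactored control flow described above can be sketched as a toy model in plain Scala. This is not the Spark implementation: `Engine`, `sourceOffset`, `trigger` and the `needsAnotherBatch` callback are hypothetical stand-ins, offsets are reduced to a single `Long`, and locking and the offset/commit logs are left out. It only shows the shape of the loop: `constructNextBatch` returns a Boolean, repeated calls are harmless while a data batch is pending, and the loop either runs the batch or would sleep.

```scala
// Toy model of the refactored trigger loop; not Spark code.
// `Engine`, `sourceOffset` and `needsAnotherBatch` are illustrative assumptions.
object TriggerLoopSketch {

  final class Engine(needsAnotherBatch: () => Boolean) {
    var sourceOffset = 0L      // what the fake source currently reports
    var availableOffset = 0L   // end offset of the batch being constructed
    var committedOffset = 0L   // end offset of the last completed batch
    var batchId = 0L

    def isNewDataAvailable: Boolean = availableOffset > committedOffset

    // Returns true if a batch is ready to run. If a batch with new data has
    // already been constructed, calling this again changes nothing and returns true.
    def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean =
      if (isNewDataAvailable) true
      else {
        availableOffset = sourceOffset
        isNewDataAvailable || (noDataBatchesEnabled && needsAnotherBatch())
      }

    def runBatch(): Unit = { committedOffset = availableOffset }

    // One iteration of the main loop: run the batch if one is ready;
    // the real loop would sleep otherwise.
    def trigger(noDataBatchesEnabled: Boolean): Boolean = {
      val runnable = constructNextBatch(noDataBatchesEnabled)
      if (runnable) { runBatch(); batchId += 1 }
      runnable
    }
  }

  def main(args: Array[String]): Unit = {
    var cleanupPending = false
    val engine = new Engine(() => cleanupPending)

    engine.sourceOffset = 5L
    println(engine.trigger(noDataBatchesEnabled = true)) // true: batch with data

    cleanupPending = true  // pretend the watermark advanced during that batch
    println(engine.trigger(noDataBatchesEnabled = true)) // true: no-data batch

    cleanupPending = false
    println(engine.trigger(noDataBatchesEnabled = true)) // false: nothing to do
    println(engine.batchId)                              // 2
  }
}
```

With `noDataBatchesEnabled = false` the second trigger would return false, so pending state cleanup would wait for the next batch that carries data.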

## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the 
following patterns.
    - Tests where the last input was given again to force another batch to be 
executed (so that state was cleaned up / output generated) were simplified by 
removing the extra input.
    - Tests using aggregation+watermark where `CheckLastBatch` was replaced with 
`CheckNewAnswer` to make them batch agnostic.
- New tests added to check whether the flag 
(`spark.sql.streaming.noDataMicroBatchesEnabled`) works for streaming 
aggregation and deduplication.
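
To illustrate why a batch-agnostic check is needed once no-data batches exist, here is a minimal sketch in plain Scala. `dataSinceBatch` and `latestBatchId` follow the signatures added to `MemorySinkBase` in the diff below, with rows simplified to `Int`; `ToySink` and `NewAnswerChecker` are hypothetical, and how `CheckNewAnswer` is actually implemented in `StreamTest` is not shown in this message, so treat the checker as an assumption.

```scala
// Toy model only; not Spark code. `ToySink` and `NewAnswerChecker` are
// illustrative assumptions, with rows simplified to Int.
object CheckNewAnswerSketch {

  final case class AddedData(batchId: Long, rows: Seq[Int])

  final class ToySink {
    private var batches = Vector.empty[AddedData]

    def addBatch(batchId: Long, rows: Seq[Int]): Unit =
      batches = batches :+ AddedData(batchId, rows)

    def latestBatchId: Option[Long] = batches.lastOption.map(_.batchId)

    def latestBatchData: Seq[Int] = batches.lastOption.toSeq.flatMap(_.rows)

    // All rows written by batches with an id greater than `sinceBatchId`.
    def dataSinceBatch(sinceBatchId: Long): Seq[Int] =
      batches.filter(_.batchId > sinceBatchId).flatMap(_.rows)
  }

  // Remembers the last batch id it looked at, so each call returns only the
  // rows produced since the previous check, however many batches that spans.
  final class NewAnswerChecker(sink: ToySink) {
    private var lastCheckedBatchId = -1L

    def newRows(): Seq[Int] = {
      val rows = sink.dataSinceBatch(lastCheckedBatchId)
      lastCheckedBatchId = sink.latestBatchId.getOrElse(lastCheckedBatchId)
      rows
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new ToySink
    val checker = new NewAnswerChecker(sink)

    sink.addBatch(0L, Seq(1, 2))  // batch with data
    sink.addBatch(1L, Seq.empty)  // no-data batch that emitted nothing

    println(sink.latestBatchData.isEmpty)  // true: a last-batch check sees no rows
    println(checker.newRows() == Seq(1, 2)) // true: rows since the last check

    sink.addBatch(2L, Seq(3))
    println(checker.newRows() == Seq(3))    // true: only the rows from batch 2
  }
}
```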

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21220 from tdas/SPARK-24157.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47b5b685
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47b5b685
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47b5b685

Branch: refs/heads/master
Commit: 47b5b68528c154d32b3f40f388918836d29462b8
Parents: af4dc50
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri May 4 16:35:24 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri May 4 16:35:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 +
 .../streaming/IncrementalExecution.scala        |  10 +
 .../streaming/MicroBatchExecution.scala         | 231 +++++++--------
 .../execution/streaming/WatermarkTracker.scala  |  73 +++++
 .../spark/sql/execution/streaming/memory.scala  |  17 +-
 .../execution/streaming/sources/memoryV2.scala  |   8 +-
 .../execution/streaming/statefulOperators.scala |  16 +
 .../streaming/sources/ForeachWriterSuite.scala  |   8 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  | 285 ------------------
 .../sql/streaming/EventTimeWatermarkSuite.scala | 112 +++----
 .../sql/streaming/FileStreamSinkSuite.scala     |   7 +-
 .../sql/streaming/StateStoreMetricsTest.scala   |  52 +++-
 .../apache/spark/sql/streaming/StreamTest.scala |  56 +++-
 .../streaming/StreamingDeduplicationSuite.scala | 295 +++++++++++++++++++
 .../sql/streaming/StreamingJoinSuite.scala      |  18 +-
 .../sql/streaming/StreamingQuerySuite.scala     |   2 +-
 16 files changed, 693 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3942240..895e150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -919,6 +919,14 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10000L)
 
+  val STREAMING_NO_DATA_MICRO_BATCHES_ENABLED =
+    buildConf("spark.sql.streaming.noDataMicroBatchesEnabled")
+      .doc(
+        "Whether streaming micro-batch engine will execute batches without data " +
+          "for eager state management for stateful streaming queries.")
+      .booleanConf
+      .createWithDefault(true)
+
   val STREAMING_METRICS_ENABLED =
     buildConf("spark.sql.streaming.metricsEnabled")
       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -1313,6 +1321,9 @@ class SQLConf extends Serializable with Logging {
   def streamingNoDataProgressEventInterval: Long =
     getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
 
+  def streamingNoDataMicroBatchesEnabled: Boolean =
+    getConf(STREAMING_NO_DATA_MICRO_BATCHES_ENABLED)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 1a83c88..c480b96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -143,4 +143,14 @@ class IncrementalExecution(
 
   /** No need assert supported, as this check has already been done */
   override def assertSupported(): Unit = { }
+
+  /**
+   * Should the MicroBatchExecution run another batch based on this execution and the current
+   * updated metadata.
+   */
+  def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    executedPlan.collect {
+      case p: StateStoreWriter => p.shouldRunAnotherBatch(newMetadata)
+    }.exists(_ == true)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6e23197..6709e70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -61,6 +61,8 @@ class MicroBatchExecution(
     case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
   }
 
+  private val watermarkTracker = new WatermarkTracker()
+
   override lazy val logicalPlan: LogicalPlan = {
     assert(queryExecutionThread eq Thread.currentThread,
       "logicalPlan must be initialized in QueryExecutionThread " +
@@ -128,40 +130,55 @@ class MicroBatchExecution(
    * Repeatedly attempts to run batches as data arrives.
    */
   protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
-    triggerExecutor.execute(() => {
-      startTrigger()
 
+    val noDataBatchesEnabled =
+      sparkSessionForStream.sessionState.conf.streamingNoDataMicroBatchesEnabled
+
+    triggerExecutor.execute(() => {
       if (isActive) {
+        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
+        var currentBatchHasNewData = false // Whether the current batch had new data
+
+        startTrigger()
+
         reportTimeTaken("triggerExecution") {
+          // We'll do this initialization only once every start / restart
           if (currentBatchId < 0) {
-            // We'll do this initialization only once
             populateStartOffsets(sparkSessionForStream)
-            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-          } else {
-            constructNextBatch()
+            logInfo(s"Stream started from $committedOffsets")
           }
-          if (dataAvailable) {
-            currentStatus = currentStatus.copy(isDataAvailable = true)
-            updateStatusMessage("Processing new data")
+
+          // Set this before calling constructNextBatch() so any Spark jobs executed by sources
+          // while getting new data have the correct description
+          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
+
+          // Try to construct the next batch. This will return true only if the next batch is
+          // ready and runnable. Note that the current batch may be runnable even without
+          // new data to process as `constructNextBatch` may decide to run a batch for
+          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
+          // is available or not.
+          currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+
+          // Remember whether the current batch has data or not. This will be required later
+          // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
+          // to false as the batch would have already processed the available data.
+          currentBatchHasNewData = isNewDataAvailable
+
+          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
+          if (currentBatchIsRunnable) {
+            if (currentBatchHasNewData) updateStatusMessage("Processing new data")
+            else updateStatusMessage("No new data but cleaning up state")
             runBatch(sparkSessionForStream)
+          } else {
+            updateStatusMessage("Waiting for data to arrive")
           }
         }
-        // Report trigger as finished and construct progress object.
-        finishTrigger(dataAvailable)
-        if (dataAvailable) {
-          // Update committed offsets.
-          commitLog.add(currentBatchId)
-          committedOffsets ++= availableOffsets
-          logDebug(s"batch ${currentBatchId} committed")
-          // We'll increase currentBatchId after we complete processing current batch's data
-          currentBatchId += 1
-          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
-        } else {
-          currentStatus = currentStatus.copy(isDataAvailable = false)
-          updateStatusMessage("Waiting for data to arrive")
-          Thread.sleep(pollingDelayMs)
-        }
+
+        finishTrigger(currentBatchHasNewData)  // Must be outside reportTimeTaken so it is recorded
+
+        // If the current batch has been executed, then increment the batch id, else there was
+        // no data to execute the batch
+        if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
       }
       updateStatusMessage("Waiting for next trigger")
       isActive
@@ -211,6 +228,7 @@ class MicroBatchExecution(
           OffsetSeqMetadata.setSessionConf(metadata, sparkSessionToRunBatches.conf)
           offsetSeqMetadata = OffsetSeqMetadata(
             metadata.batchWatermarkMs, metadata.batchTimestampMs, sparkSessionToRunBatches.conf)
+          watermarkTracker.setWatermark(metadata.batchWatermarkMs)
         }
 
         /* identify the current batch id: if commit log indicates we successfully processed the
@@ -235,7 +253,6 @@ class MicroBatchExecution(
               currentBatchId = latestCommittedBatchId + 1
               committedOffsets ++= availableOffsets
               // Construct a new batch be recomputing availableOffsets
-              constructNextBatch()
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
@@ -243,19 +260,18 @@ class MicroBatchExecution(
             }
           case None => logInfo("no commit log present")
         }
-        logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
+        logInfo(s"Resuming at batch $currentBatchId with committed offsets " +
           s"$committedOffsets and available offsets $availableOffsets")
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new streaming query.")
         currentBatchId = 0
-        constructNextBatch()
     }
   }
 
   /**
    * Returns true if there is any new data available to be processed.
    */
-  private def dataAvailable: Boolean = {
+  private def isNewDataAvailable: Boolean = {
     availableOffsets.exists {
       case (source, available) =>
         committedOffsets
@@ -266,93 +282,63 @@ class MicroBatchExecution(
   }
 
   /**
-   * Queries all of the sources to see if any new data is available. When there is new data the
-   * batchId counter is incremented and a new log entry is written with the newest offsets.
+   * Attempts to construct a batch according to:
+   *  - Availability of new data
+   *  - Need for timeouts and state cleanups in stateful operators
+   *
+   * Returns true only if the next batch should be executed.
+   *
+   * Here is the high-level logic on how this constructs the next batch.
+   * - Check each source whether new data is available
+   * - Updated the query's metadata and check using the last execution whether there is any need
+   *   to run another batch (for state clean up, etc.)
+   * - If either of the above is true, then construct the next batch by committing to the offset
+   *   log that range of offsets that the next batch will process.
    */
-  private def constructNextBatch(): Unit = {
-    // Check to see what new data is available.
-    val hasNewData = {
-      awaitProgressLock.lock()
-      try {
-        // Generate a map from each unique source to the next available offset.
-        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
-          case s: Source =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("getOffset") {
-              (s, s.getOffset)
-            }
-          case s: MicroBatchReader =>
-            updateStatusMessage(s"Getting offsets from $s")
-            reportTimeTaken("setOffsetRange") {
-              // Once v1 streaming source execution is gone, we can refactor this away.
-              // For now, we set the range here to get the source to infer the available end offset,
-              // get that offset, and then set the range again when we later execute.
-              s.setOffsetRange(
-                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
-                Optional.empty())
-            }
-
-            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
-            (s, Option(currentOffset))
-        }.toMap
-        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
-
-        if (dataAvailable) {
-          true
-        } else {
-          noNewData = true
-          false
+  private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
+    // If new data is already available that means this method has already been called before
+    // and it must have already committed the offset range of next batch to the offset log.
+    // Hence do nothing, just return true.
+    if (isNewDataAvailable) return true
+
+    // Generate a map from each unique source to the next available offset.
+    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
+      case s: Source =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("getOffset") {
+          (s, s.getOffset)
         }
-      } finally {
-        awaitProgressLock.unlock()
-      }
-    }
-    if (hasNewData) {
-      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
-      // Update the eventTime watermarks if we find any in the plan.
-      if (lastExecution != null) {
-        lastExecution.executedPlan.collect {
-          case e: EventTimeWatermarkExec => e
-        }.zipWithIndex.foreach {
-          case (e, index) if e.eventTimeStats.value.count > 0 =>
-            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
-            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
-            val prevWatermarkMs = watermarkMsMap.get(index)
-            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
-              watermarkMsMap.put(index, newWatermarkMs)
-            }
-
-          // Populate 0 if we haven't seen any data yet for this watermark node.
-          case (_, index) =>
-            if (!watermarkMsMap.isDefinedAt(index)) {
-              watermarkMsMap.put(index, 0)
-            }
+      case s: MicroBatchReader =>
+        updateStatusMessage(s"Getting offsets from $s")
+        reportTimeTaken("setOffsetRange") {
+          // Once v1 streaming source execution is gone, we can refactor this away.
+          // For now, we set the range here to get the source to infer the available end offset,
+          // get that offset, and then set the range again when we later execute.
+          s.setOffsetRange(
+            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+            Optional.empty())
         }
 
-        // Update the global watermark to the minimum of all watermark nodes.
-        // This is the safest option, because only the global watermark is fault-tolerant. Making
-        // it the minimum of all individual watermarks guarantees it will never advance past where
-        // any individual watermark operator would be if it were in a plan by itself.
-        if(!watermarkMsMap.isEmpty) {
-          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
-          if (newWatermarkMs > batchWatermarkMs) {
-            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            batchWatermarkMs = newWatermarkMs
-          } else {
-            logDebug(
-              s"Event time didn't move: $newWatermarkMs < " +
-                s"$batchWatermarkMs")
-          }
-        }
-      }
-      offsetSeqMetadata = offsetSeqMetadata.copy(
-        batchWatermarkMs = batchWatermarkMs,
-        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
+        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
+        (s, Option(currentOffset))
+    }.toMap
+    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
+
+    // Update the query metadata
+    offsetSeqMetadata = offsetSeqMetadata.copy(
+      batchWatermarkMs = watermarkTracker.currentWatermark,
+      batchTimestampMs = triggerClock.getTimeMillis())
+
+    // Check whether next batch should be constructed
+    val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+      Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
+    val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
 
+    if (shouldConstructNextBatch) {
+      // Commit the next batch offset range to the offset log
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
-        assert(offsetLog.add(
-          currentBatchId,
+        assert(offsetLog.add(currentBatchId,
           availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId. " +
@@ -373,7 +359,7 @@ class MicroBatchExecution(
                 reader.commit(reader.deserializeOffset(off.json))
             }
           } else {
-            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
+            throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
           }
         }
 
@@ -384,15 +370,12 @@ class MicroBatchExecution(
           commitLog.purge(currentBatchId - minLogEntriesToMaintain)
         }
       }
+      noNewData = false
     } else {
-      awaitProgressLock.lock()
-      try {
-        // Wake up any threads that are waiting for the stream to progress.
-        awaitProgressLockCondition.signalAll()
-      } finally {
-        awaitProgressLock.unlock()
-      }
+      noNewData = true
+      awaitProgressLockCondition.signalAll()
     }
+    shouldConstructNextBatch
   }
 
   /**
@@ -400,6 +383,8 @@ class MicroBatchExecution(
    * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
   private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
+    logDebug(s"Running batch $currentBatchId")
+
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
@@ -513,17 +498,17 @@ class MicroBatchExecution(
       }
     }
 
-    awaitProgressLock.lock()
-    try {
-      // Wake up any threads that are waiting for the stream to progress.
+    withProgressLocked {
+      commitLog.add(currentBatchId)
+      committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
-    } finally {
-      awaitProgressLock.unlock()
     }
+    watermarkTracker.updateWatermark(lastExecution.executedPlan)
+    logDebug(s"Completed batch ${currentBatchId}")
   }
 
   /** Execute a function while locking the stream from making an progress */
-  private[sql] def withProgressLocked(f: => Unit): Unit = {
+  private[sql] def withProgressLocked[T](f: => T): T = {
     awaitProgressLock.lock()
     try {
       f

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
new file mode 100644
index 0000000..8086566
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.SparkPlan
+
+class WatermarkTracker extends Logging {
+  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
+  private var watermarkMs: Long = 0
+  private var updated = false
+
+  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
+    watermarkMs = newWatermarkMs
+  }
+
+  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
+    val watermarkOperators = executedPlan.collect {
+      case e: EventTimeWatermarkExec => e
+    }
+    if (watermarkOperators.isEmpty) return
+
+
+    watermarkOperators.zipWithIndex.foreach {
+      case (e, index) if e.eventTimeStats.value.count > 0 =>
+        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
+        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+        val prevWatermarkMs = operatorToWatermarkMap.get(index)
+        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
+          operatorToWatermarkMap.put(index, newWatermarkMs)
+        }
+
+      // Populate 0 if we haven't seen any data yet for this watermark node.
+      case (_, index) =>
+        if (!operatorToWatermarkMap.isDefinedAt(index)) {
+          operatorToWatermarkMap.put(index, 0)
+        }
+    }
+
+    // Update the global watermark to the minimum of all watermark nodes.
+    // This is the safest option, because only the global watermark is fault-tolerant. Making
+    // it the minimum of all individual watermarks guarantees it will never advance past where
+    // any individual watermark operator would be if it were in a plan by itself.
+    val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
+    if (newWatermarkMs > watermarkMs) {
+      logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
+      watermarkMs = newWatermarkMs
+      updated = true
+    } else {
+      logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
+      updated = false
+    }
+  }
+
+  def currentWatermark: Long = synchronized { watermarkMs }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 628923d..2225827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -222,11 +222,20 @@ class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
   }
 }
 
+/** A common trait for MemorySinks with methods used for testing */
+trait MemorySinkBase extends BaseStreamingSink {
+  def allData: Seq[Row]
+  def latestBatchData: Seq[Row]
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row]
+  def latestBatchId: Option[Long]
+}
+
 /**
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink with Logging {
+class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
+  with MemorySinkBase with Logging {
 
   private case class AddedData(batchId: Long, data: Array[Row])
 
@@ -236,7 +245,7 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
 
   /** Returns all rows that are stored in this [[Sink]]. */
   def allData: Seq[Row] = synchronized {
-    batches.map(_.data).flatten
+    batches.flatMap(_.data)
   }
 
   def latestBatchId: Option[Long] = synchronized {
@@ -245,6 +254,10 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink wi
 
   def latestBatchData: Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) }
 
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row] = synchronized {
+    batches.filter(_.batchId > sinceBatchId).flatMap(_.data)
+  }
+
   def toDebugString: String = synchronized {
     batches.map { case AddedData(batchId, data) =>
       val dataStr = try data.mkString(" ") catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index d871d37..0d6c239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
+class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging {
   override def createStreamWriter(
       queryId: String,
       schema: StructType,
@@ -67,6 +67,10 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
     batches.lastOption.toSeq.flatten(_.data)
   }
 
+  def dataSinceBatch(sinceBatchId: Long): Seq[Row] = synchronized {
+    batches.filter(_.batchId > sinceBatchId).flatMap(_.data)
+  }
+
   def toDebugString: String = synchronized {
     batches.map { case AddedData(batchId, data) =>
       val dataStr = try data.mkString(" ") catch {
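
Note on the `dataSinceBatch` hunk above: the filter is strict (`batchId > sinceBatchId`), so the caller passes the id of the last batch it has already seen. The following is a minimal sketch of that behaviour outside Spark; `AddedBatch` and the free-standing function are hypothetical stand-ins for the sink's internal state, not the actual `MemorySinkV2` types.

```scala
// Hypothetical stand-in for the sink's internal per-batch record.
final case class AddedBatch(batchId: Long, data: Seq[String])

// Return the rows of every batch whose id is strictly greater than sinceBatchId.
def dataSinceBatch(batches: Seq[AddedBatch], sinceBatchId: Long): Seq[String] =
  batches.filter(_.batchId > sinceBatchId).flatMap(_.data)

val batches = Seq(
  AddedBatch(0L, Seq("a")),
  AddedBatch(1L, Seq.empty),       // a batch that emitted no rows
  AddedBatch(2L, Seq("b", "c")))

// Rows added after batch 0 was last observed.
val newRows = dataSinceBatch(batches, sinceBatchId = 0L)
```

Passing `-1L` returns everything, which is presumably why the test harness in this commit starts its bookkeeping at -1.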

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index c9354ac..1691a63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -126,6 +126,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
         name -> SQLMetrics.createTimingMetric(sparkContext, desc)
     }.toMap
   }
+
+  /**
+   * Should the MicroBatchExecution run another batch based on this stateful operator and the
+   * current updated metadata.
+   */
+  def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = false
 }
 
 /** An operator that supports watermark. */
@@ -388,6 +394,12 @@ case class StateStoreSaveExec(
       ClusteredDistribution(keyExpressions, stateInfo.map(_.numPartitions)) :: Nil
     }
   }
+
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    (outputMode.contains(Append) || outputMode.contains(Update)) &&
+      eventTimeWatermark.isDefined &&
+      newMetadata.batchWatermarkMs > eventTimeWatermark.get
+  }
 }
 
 /** Physical operator for executing streaming Deduplicate. */
@@ -454,6 +466,10 @@ case class StreamingDeduplicateExec(
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
+  }
 }
 
 object StreamingDeduplicateExec {
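
Note on the two `shouldRunAnotherBatch` overrides above: both reduce to "the watermark recorded for the next batch is strictly ahead of the watermark this operator last ran with", and the aggregation variant additionally requires Append or Update mode. The sketch below restates that predicate with simplified, hypothetical types (`BatchMetadata`, `Mode`, and the function names are illustrative only and are not Spark classes).

```scala
// Hypothetical, simplified stand-in for OffsetSeqMetadata.
final case class BatchMetadata(batchWatermarkMs: Long)

// Hypothetical stand-in for the streaming output modes.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

// Aggregation: only Append/Update with a defined watermark can benefit from a no-data batch.
def aggregationNeedsAnotherBatch(
    mode: Option[Mode],
    operatorWatermarkMs: Option[Long],
    newMetadata: BatchMetadata): Boolean =
  (mode.contains(Append) || mode.contains(Update)) &&
    operatorWatermarkMs.exists(newMetadata.batchWatermarkMs > _)

// Deduplication: the output mode is irrelevant, only the watermark advance matters.
def dedupNeedsAnotherBatch(
    operatorWatermarkMs: Option[Long],
    newMetadata: BatchMetadata): Boolean =
  operatorWatermarkMs.exists(newMetadata.batchWatermarkMs > _)
```

With an operator watermark of 5000 ms and a new batch watermark of 15000 ms, both functions return true in Append mode; with equal watermarks, or with no watermark defined, they return false.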

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
index 03bf71b..e60c339 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala
@@ -211,14 +211,12 @@ class ForeachWriterSuite extends StreamTest with SharedSQLContext with BeforeAnd
     try {
       inputData.addData(10, 11, 12)
       query.processAllAvailable()
-      inputData.addData(25) // Advance watermark to 15 seconds
-      query.processAllAvailable()
       inputData.addData(25) // Evict items less than previous watermark
       query.processAllAvailable()
 
       // There should be 3 batches and only does the last batch contain a value.
       val allEvents = ForeachWriterSuite.allEvents()
-      assert(allEvents.size === 3)
+      assert(allEvents.size === 4)
       val expectedEvents = Seq(
         Seq(
           ForeachWriterSuite.Open(partition = 0, version = 0),
@@ -230,6 +228,10 @@ class ForeachWriterSuite extends StreamTest with SharedSQLContext with BeforeAnd
         ),
         Seq(
           ForeachWriterSuite.Open(partition = 0, version = 2),
+          ForeachWriterSuite.Close(None)
+        ),
+        Seq(
+          ForeachWriterSuite.Open(partition = 0, version = 3),
           ForeachWriterSuite.Process(value = 3),
           ForeachWriterSuite.Close(None)
         )

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
deleted file mode 100644
index 0088b64..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingDeduplicateExec}
-import org.apache.spark.sql.execution.streaming.state.StateStore
-import org.apache.spark.sql.functions._
-
-class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
-
-  import testImplicits._
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    StateStore.stop()
-  }
-
-  test("deduplicate with all columns") {
-    val inputData = MemoryStream[String]
-    val result = inputData.toDS().dropDuplicates()
-
-    testStream(result, Append)(
-      AddData(inputData, "a"),
-      CheckLastBatch("a"),
-      assertNumStateRows(total = 1, updated = 1),
-      AddData(inputData, "a"),
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-      AddData(inputData, "b"),
-      CheckLastBatch("b"),
-      assertNumStateRows(total = 2, updated = 1)
-    )
-  }
-
-  test("deduplicate with some columns") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS().dropDuplicates("_1")
-
-    testStream(result, Append)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = 1, updated = 1),
-      AddData(inputData, "a" -> 2), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = 2, updated = 1)
-    )
-  }
-
-  test("multiple deduplicates") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
-
-    testStream(result, Append)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
-
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with watermark") {
-    val inputData = MemoryStream[Int]
-    val result = inputData.toDS()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .dropDuplicates()
-      .select($"eventTime".cast("long").as[Long])
-
-    testStream(result, Append)(
-      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
-      CheckLastBatch(10 to 15: _*),
-      assertNumStateRows(total = 6, updated = 6),
-
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckLastBatch(25),
-      assertNumStateRows(total = 7, updated = 1),
-
-      AddData(inputData, 25), // Drop states less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-
-      AddData(inputData, 10), // Should not emit anything as data less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
-
-      AddData(inputData, 45), // Advance watermark to 35 seconds
-      CheckLastBatch(45),
-      assertNumStateRows(total = 2, updated = 1),
-
-      AddData(inputData, 45), // Drop states less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0)
-    )
-  }
-
-  test("deduplicate with aggregate - append mode") {
-    val inputData = MemoryStream[Int]
-    val windowedaggregate = inputData.toDS()
-      .withColumn("eventTime", $"value".cast("timestamp"))
-      .withWatermark("eventTime", "10 seconds")
-      .dropDuplicates()
-      .withWatermark("eventTime", "10 seconds")
-      .groupBy(window($"eventTime", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedaggregate)(
-      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
-      CheckLastBatch(),
-      // states in aggregate in [10, 14), [15, 20) (2 windows)
-      // states in deduplicate is 10 to 15
-      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
-
-      AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckLastBatch(),
-      // states in aggregate in [10, 14), [15, 20) and [25, 30) (3 windows)
-      // states in deduplicate is 10 to 15 and 25
-      assertNumStateRows(total = Seq(3L, 7L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, 25), // Emit items less than watermark and drop their state
-      CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate
-      // states in aggregate in [15, 20) and [25, 30) (2 windows, note aggregate uses the end of
-      // window to evict items, so [15, 20) is still in the state store)
-      // states in deduplicate is 25
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
-
-      AddData(inputData, 10), // Should not emit anything as data less than watermark
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
-
-      AddData(inputData, 40), // Advance watermark to 30 seconds
-      CheckLastBatch(),
-      // states in aggregate in [15, 20), [25, 30) and [40, 45)
-      // states in deduplicate is 25 and 40,
-      assertNumStateRows(total = Seq(3L, 2L), updated = Seq(1L, 1L)),
-
-      AddData(inputData, 40), // Emit items less than watermark and drop their state
-      CheckLastBatch((15 -> 1), (25 -> 1)),
-      // states in aggregate in [40, 45)
-      // states in deduplicate is 40,
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L))
-    )
-  }
-
-  test("deduplicate with aggregate - update mode") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS()
-      .select($"_1" as "str", $"_2" as "num")
-      .dropDuplicates()
-      .groupBy("str")
-      .agg(sum("num"))
-      .as[(String, Long)]
-
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with aggregate - complete mode") {
-    val inputData = MemoryStream[(String, Int)]
-    val result = inputData.toDS()
-      .select($"_1" as "str", $"_2" as "num")
-      .dropDuplicates()
-      .groupBy("str")
-      .agg(sum("num"))
-      .as[(String, Long)]
-
-    testStream(result, Complete)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
-  }
-
-  test("deduplicate with file sink") {
-    withTempDir { output =>
-      withTempDir { checkpointDir =>
-        val outputPath = output.getAbsolutePath
-        val inputData = MemoryStream[String]
-        val result = inputData.toDS().dropDuplicates()
-        val q = result.writeStream
-          .format("parquet")
-          .outputMode(Append)
-          .option("checkpointLocation", checkpointDir.getPath)
-          .start(outputPath)
-        try {
-          inputData.addData("a")
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-          inputData.addData("a") // Dropped
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-          inputData.addData("b")
-          q.processAllAvailable()
-          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
-        } finally {
-          q.stop()
-        }
-      }
-    }
-  }
-
-  test("SPARK-19841: watermarkPredicate should filter based on keys") {
-    val input = MemoryStream[(Int, Int)]
-    val df = input.toDS.toDF("time", "id")
-      .withColumn("time", $"time".cast("timestamp"))
-      .withWatermark("time", "1 second")
-      .dropDuplicates("id", "time") // Change the column positions
-      .select($"id")
-    testStream(df)(
-      AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
-      CheckLastBatch(1, 2),
-      AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
-      CheckLastBatch(3, 4),
-      AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
-      CheckLastBatch(5, 6),
-      AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
-      CheckLastBatch(7)
-    )
-  }
-
-  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
-    val input = MemoryStream[(Int, Int)]
-    val df = input.toDS.toDF("id", "time")
-      .withColumn("time", $"time".cast("timestamp"))
-      .withWatermark("time", "1 second")
-      .dropDuplicates("id")
-      .select($"id", $"time".cast("long"))
-    testStream(df)(
-      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
-      CheckLastBatch(1 -> 1, 2 -> 2)
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d6bef9c..7e8fde1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
 
 class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers with Logging {
@@ -137,20 +138,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
         assert(e.get("watermark") === formatTimestamp(5))
       },
       AddData(inputData2, 25),
-      CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
-      AddData(inputData2, 25),
       CheckAnswer((10, 3)),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(25))
         assert(e.get("min") === formatTimestamp(25))
         assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(15))
+        assert(e.get("watermark") === formatTimestamp(5))
       }
     )
   }
@@ -167,15 +160,12 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(windowedAggregation)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(inputData, 25),   // Advance watermark to 15 seconds
-      CheckLastBatch(),
-      assertNumStateRows(3),
-      AddData(inputData, 25),   // Emit items less than watermark and drop their state
-      CheckLastBatch((10, 5)),
+      CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
       AddData(inputData, 10),   // Should not emit anything as data less than watermark
-      CheckLastBatch(),
+      CheckNewAnswer(),
       assertNumStateRows(2)
     )
   }
@@ -193,15 +183,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(windowedAggregation, OutputMode.Update)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch((10, 5), (15, 1)),
+      CheckNewAnswer((10, 5), (15, 1)),
       AddData(inputData, 25),     // Advance watermark to 15 seconds
-      CheckLastBatch((25, 1)),
-      assertNumStateRows(3),
+      CheckNewAnswer((25, 1)),
+      assertNumStateRows(2),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
-      CheckLastBatch((25, 2)),
+      CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
       AddData(inputData, 10),     // Should not emit anything as data less than watermark
-      CheckLastBatch(),
+      CheckNewAnswer(),
       assertNumStateRows(2)
     )
   }
@@ -251,56 +241,25 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
 
     testStream(df)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
-      CheckLastBatch(),
+      CheckAnswer(),
       AddData(inputData, 25), // Advance watermark to 15 seconds
-      StopStream,
-      StartStream(),
-      CheckLastBatch(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
-      CheckLastBatch((10, 5)),
+      CheckAnswer((10, 5)),
       StopStream,
       AssertOnQuery { q => // purge commit and clear the sink
-        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
         q.commitLog.purge(commit)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },
       StartStream(),
-      CheckLastBatch((10, 5)), // Recompute last batch and re-evict timestamp 10
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
+      AddData(inputData, 10, 27, 30), // Advance watermark to 20 seconds, 10 should be ignored
+      CheckAnswer((15, 1)),
       StopStream,
-      StartStream(), // Watermark should still be 15 seconds
-      AddData(inputData, 17),
-      CheckLastBatch(), // We still do not see next batch
-      AddData(inputData, 30), // Advance watermark to 20 seconds
-      CheckLastBatch(),
-      AddData(inputData, 30), // Evict items less than previous watermark.
-      CheckLastBatch((15, 2)) // Ensure we see next window
-    )
-  }
-
-  test("dropping old data") {
-    val inputData = MemoryStream[Int]
-
-    val windowedAggregation = inputData.toDF()
-        .withColumn("eventTime", $"value".cast("timestamp"))
-        .withWatermark("eventTime", "10 seconds")
-        .groupBy(window($"eventTime", "5 seconds") as 'window)
-        .agg(count("*") as 'count)
-        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 10, 11, 12),
-      CheckAnswer(),
-      AddData(inputData, 25),     // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25),     // Evict items less than previous watermark.
-      CheckAnswer((10, 3)),
-      AddData(inputData, 10),     // 10 is later than 15 second watermark
-      CheckAnswer((10, 3)),
-      AddData(inputData, 25),
-      CheckAnswer((10, 3))        // Should not emit an incorrect partial result.
+      StartStream(),
+      AddData(inputData, 17), // Watermark should still be 20 seconds, 17 should be ignored
+      CheckAnswer((15, 1)),
+      AddData(inputData, 40), // Advance watermark to 30 seconds, emit first data 25
+      CheckNewAnswer((25, 2))
     )
   }
 
@@ -421,8 +380,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 10),
       CheckAnswer(),
       AddData(inputData, 25), // Advance watermark to 15 seconds
-      CheckAnswer(),
-      AddData(inputData, 25), // Evict items less than previous watermark.
       CheckAnswer((10, 1))
     )
   }
@@ -501,8 +458,35 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("test no-data flag") {
+    val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
+
+    def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
+      val inputData = MemoryStream[Int]
+      val windowedAggregation = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+      testStream(windowedAggregation)(
+        StartStream(additionalConfs = Map(flagKey -> flag.toString)),
+        AddData(inputData, 10, 11, 12, 13, 14, 15),
+        CheckNewAnswer(),
+        AddData(inputData, 25), // Advance watermark to 15 seconds
+        // Check if there is new answer if flag is set, no new answer otherwise
+        if (flag) CheckNewAnswer((10, 5)) else CheckNewAnswer()
+      )
+    }
+
+    testWithFlag(true)
+    testWithFlag(false)
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
-    val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
     true
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index cf41d7e..ed53def 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
       check() // nothing emitted yet
 
       addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
-      check() // nothing emitted yet
+      check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
 
       addTimestamp(140) // wm = 113 before this, emit results on 100-105, wm = 130 after this
-      check((100L, 105L) -> 2L)
-
-      addTimestamp(150) // wm = 130s before this, emit results on 120-125, wm = 150 after this
-      check((100L, 105L) -> 2L, (120L, 125L) -> 1L)
+      check((100L, 105L) -> 2L, (120L, 125L) -> 1L)  // no-data-batch emits results on 120-125
 
     } finally {
       if (query != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index 368c460..e45f9d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -17,20 +17,58 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.spark.sql.execution.streaming.StreamExecution
+
 trait StateStoreMetricsTest extends StreamTest {
 
+  private var lastCheckedRecentProgressIndex = -1
+  private var lastQuery: StreamExecution = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    lastCheckedRecentProgressIndex = -1
+  }
+
   def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
     AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
-      val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
-      assert(
-        progressWithData.stateOperators.map(_.numRowsTotal) === total,
-        "incorrect total rows")
-      assert(
-        progressWithData.stateOperators.map(_.numRowsUpdated) === updated,
-        "incorrect updates rows")
+      val recentProgress = q.recentProgress
+      require(recentProgress.nonEmpty, "No progress made, cannot check num state rows")
+      require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
+        "This test assumes that all progresses are present in q.recentProgress but " +
+          "some may have been dropped due to retention limits")
+
+      if (q.ne(lastQuery)) lastCheckedRecentProgressIndex = -1
+      lastQuery = q
+
+      val numStateOperators = recentProgress.last.stateOperators.length
+      val progressesSinceLastCheck = recentProgress
+        .slice(lastCheckedRecentProgressIndex + 1, recentProgress.length)
+        .filter(_.stateOperators.length == numStateOperators)
+
+      val allNumUpdatedRowsSinceLastCheck =
+        progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
+
+      lazy val debugString = "recent progresses:\n" +
+        progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
+
+      val numTotalRows = recentProgress.last.stateOperators.map(_.numRowsTotal)
+      assert(numTotalRows === total, s"incorrect total rows, $debugString")
+
+      val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
+      assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
+
+      lastCheckedRecentProgressIndex = recentProgress.length - 1
       true
     }
 
   def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
     assertNumStateRows(Seq(total), Seq(updated))
+
+  def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
+    if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
+
+    assert(arraySeq.forall(_.length == arrayLength),
+      "Arrays are of different lengths:\n" + arraySeq.map(_.toSeq).mkString("\n"))
+    (0 until arrayLength).map { index => arraySeq.map(_.apply(index)).sum }
+  }
 }
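
Note on `arraySum` above: because a single `AddData` can now trigger a data batch followed by one or more no-data batches, the updated-row counts are summed position by position across all progress reports since the last check, one position per stateful operator. A minimal standalone sketch of that element-wise sum follows; `sumPerOperator` is a hypothetical name and the input values are made up for illustration.

```scala
// Element-wise sum of per-operator counters across several progress reports.
// Each inner array holds one counter per stateful operator.
def sumPerOperator(perBatch: Seq[Array[Long]], numOperators: Int): Seq[Long] =
  if (perBatch.isEmpty) {
    // No progress since the last check: every operator updated zero rows.
    Seq.fill(numOperators)(0L)
  } else {
    require(perBatch.forall(_.length == numOperators), "arrays differ in length")
    (0 until numOperators).map(i => perBatch.map(_(i)).sum)
  }

// Illustrative input: a data batch followed by a no-data batch, two operators.
val updatedSinceLastCheck = sumPerOperator(
  Seq(Array(1L, 6L), Array(0L, 0L)),
  numOperators = 2)
```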

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index af0268f..9d139a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -192,7 +193,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   case class CheckAnswerRowsContains(expectedAnswer: Seq[Row], lastOnly: Boolean = false)
     extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
-    private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
+    private def operatorName = if (lastOnly) "CheckLastBatchContains" else "CheckAnswerContains"
   }
 
   case class CheckAnswerRowsByFunc(
@@ -202,6 +203,23 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
     private def operatorName = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
   }
 
+  case class CheckNewAnswerRows(expectedAnswer: Seq[Row])
+    extends StreamAction with StreamMustBeRunning {
+    override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
+
+    private def operatorName = "CheckNewAnswer"
+  }
+
+  object CheckNewAnswer {
+    def apply(): CheckNewAnswerRows = CheckNewAnswerRows(Seq.empty)
+
+    def apply[A: Encoder](data: A, moreData: A*): CheckNewAnswerRows = {
+      val encoder = encoderFor[A]
+      val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
+      CheckNewAnswerRows((data +: moreData).map(d => toExternalRow.fromRow(encoder.toRow(d))))
+    }
+  }
+
   /** Stops the stream. It must currently be running. */
   case object StopStream extends StreamAction with StreamMustBeRunning
 
@@ -435,13 +453,24 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
          """.stripMargin)
     }
 
-    def fetchStreamAnswer(currentStream: StreamExecution, lastOnly: Boolean) = {
+    var lastFetchedMemorySinkLastBatchId: Long = -1
+
+    def fetchStreamAnswer(
+        currentStream: StreamExecution,
+        lastOnly: Boolean = false,
+        sinceLastFetchOnly: Boolean = false) = {
+      verify(
+        !(lastOnly && sinceLastFetchOnly), "both lastOnly and sinceLastFetchOnly cannot be true")
       verify(currentStream != null, "stream not running")
 
       // Block until all data added has been processed for all the source
       awaiting.foreach { case (sourceIndex, offset) =>
         failAfter(streamingTimeout) {
           currentStream.awaitOffset(sourceIndex, offset)
+          // Make sure all processing including no-data-batches have been executed
+          if (!currentStream.triggerClock.isInstanceOf[StreamManualClock]) {
+            currentStream.processAllAvailable()
+          }
         }
       }
 
@@ -463,14 +492,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         }
       }
 
-      val (latestBatchData, allData) = sink match {
-        case s: MemorySink => (s.latestBatchData, s.allData)
-        case s: MemorySinkV2 => (s.latestBatchData, s.allData)
-      }
-      try if (lastOnly) latestBatchData else allData catch {
+      val rows = try {
+        if (sinceLastFetchOnly) {
+          if (sink.latestBatchId.getOrElse(-1L) < lastFetchedMemorySinkLastBatchId) {
+            failTest("MemorySink was probably cleared since last fetch. Use CheckAnswer instead.")
+          }
+          sink.dataSinceBatch(lastFetchedMemorySinkLastBatchId)
+        } else {
+          if (lastOnly) sink.latestBatchData else sink.allData
+        }
+      } catch {
         case e: Exception =>
           failTest("Exception while getting data from sink", e)
       }
+      lastFetchedMemorySinkLastBatchId = sink.latestBatchId.getOrElse(-1L)
+      rows
     }
 
     def executeAction(action: StreamAction): Unit = {
@@ -704,6 +740,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           } catch {
             case e: Throwable => failTest(e.toString)
           }
+
+        case CheckNewAnswerRows(expectedAnswer) =>
+          val sparkAnswer = fetchStreamAnswer(currentStream, sinceLastFetchOnly = true)
+          QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+            error => failTest(error)
+          }
       }
       pos += 1
     }
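
Note on the "since last fetch" bookkeeping above: the following is a minimal,
self-contained sketch of the idea behind `CheckNewAnswer`, not the actual
Spark code. `ToySink` and `Fetcher` are hypothetical names, and the assumption
that `dataSinceBatch(id)` returns rows from batches with an id strictly greater
than `id` is inferred from how the diff uses it.

```scala
// Hypothetical model of a batch-indexed sink; this is NOT Spark's MemorySink.
object SinceLastFetchSketch {
  final case class Batch(id: Long, rows: Seq[Int])

  final class ToySink {
    private var batches = Vector.empty[Batch]

    def write(id: Long, rows: Seq[Int]): Unit = batches :+= Batch(id, rows)

    def latestBatchId: Option[Long] = batches.lastOption.map(_.id)

    // Assumed semantics: rows of all batches with id > sinceBatchId.
    def dataSinceBatch(sinceBatchId: Long): Seq[Int] =
      batches.filter(_.id > sinceBatchId).flatMap(_.rows)
  }

  final class Fetcher(sink: ToySink) {
    // -1 means "nothing fetched yet", mirroring the initial value in the diff.
    private var lastFetchedBatchId: Long = -1L

    def fetchNew(): Seq[Int] = {
      val latest = sink.latestBatchId.getOrElse(-1L)
      // If the sink went backwards, it was probably cleared since the last fetch.
      require(latest >= lastFetchedBatchId, "sink was probably cleared since last fetch")
      val rows = sink.dataSinceBatch(lastFetchedBatchId)
      lastFetchedBatchId = latest
      rows
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new ToySink
    val fetcher = new Fetcher(sink)
    sink.write(0L, Seq(10, 11))
    println(fetcher.fetchNew()) // rows of batch 0
    sink.write(1L, Seq(25))     // data batch
    sink.write(2L, Seq.empty)   // no-data batch that emits nothing
    println(fetcher.fetchNew()) // rows of batches 1 and 2 together
    println(fetcher.fetchNew()) // nothing new
  }
}
```

Because the fetch covers every batch since the previous check, a test using
this style does not need to know whether the output arrived in the data batch
or in a following no-data batch.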

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
new file mode 100644
index 0000000..42ffd47
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingDeduplicateExec}
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
+
+  import testImplicits._
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    StateStore.stop()
+  }
+
+  test("deduplicate with all columns") {
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS().dropDuplicates()
+
+    testStream(result, Append)(
+      AddData(inputData, "a"),
+      CheckLastBatch("a"),
+      assertNumStateRows(total = 1, updated = 1),
+      AddData(inputData, "a"),
+      CheckLastBatch(),
+      assertNumStateRows(total = 1, updated = 0),
+      AddData(inputData, "b"),
+      CheckLastBatch("b"),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("deduplicate with some columns") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS().dropDuplicates("_1")
+
+    testStream(result, Append)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertNumStateRows(total = 1, updated = 1),
+      AddData(inputData, "a" -> 2), // Dropped
+      CheckLastBatch(),
+      assertNumStateRows(total = 1, updated = 0),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("multiple deduplicates") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS().dropDuplicates().dropDuplicates("_1")
+
+    testStream(result, Append)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+
+      AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with watermark") {
+    val inputData = MemoryStream[Int]
+    val result = inputData.toDS()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .select($"eventTime".cast("long").as[Long])
+
+    testStream(result, Append)(
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertNumStateRows(total = 6, updated = 6),
+
+      AddData(inputData, 25), // Advance watermark to 15 secs, no-data-batch drops rows <= 15
+      CheckNewAnswer(25),
+      assertNumStateRows(total = 1, updated = 1),
+
+      AddData(inputData, 10), // Should not emit anything as data less than watermark
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0),
+
+      AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
+      CheckNewAnswer(45),
+      assertNumStateRows(total = 1, updated = 1)
+    )
+  }
+
+  test("deduplicate with aggregate - append mode") {
+    val inputData = MemoryStream[Int]
+    val windowedaggregate = inputData.toDS()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicates()
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedaggregate)(
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckLastBatch(),
+      // states in aggregate is [10, 15), [15, 20) (2 windows)
+      // states in deduplicate is 10 to 15
+      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
+
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
+      // states in aggregate is [15, 20) and [25, 30); no-data-batch removed [10, 15)
+      // states in deduplicate is 25; no-data-batch removed 10 to 15
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
+
+      AddData(inputData, 10), // Should not emit anything as data less than watermark
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+
+      AddData(inputData, 40), // Advance watermark to 30 seconds
+      CheckLastBatch((15 -> 1), (25 -> 1)),
+      // states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
+      // states in deduplicate is 40; no-data-batch removed 25
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with aggregate - update mode") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS()
+      .select($"_1" as "str", $"_2" as "num")
+      .dropDuplicates()
+      .groupBy("str")
+      .agg(sum("num"))
+      .as[(String, Long)]
+
+    testStream(result, Update)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      AddData(inputData, "a" -> 1), // Dropped
+      CheckLastBatch(),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      AddData(inputData, "a" -> 2),
+      CheckLastBatch("a" -> 3L),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("b" -> 1L),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with aggregate - complete mode") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS()
+      .select($"_1" as "str", $"_2" as "num")
+      .dropDuplicates()
+      .groupBy("str")
+      .agg(sum("num"))
+      .as[(String, Long)]
+
+    testStream(result, Complete)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      AddData(inputData, "a" -> 1), // Dropped
+      CheckLastBatch("a" -> 1L),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      AddData(inputData, "a" -> 2),
+      CheckLastBatch("a" -> 3L),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      AddData(inputData, "b" -> 1),
+      CheckLastBatch("a" -> 3L, "b" -> 1L),
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+    )
+  }
+
+  test("deduplicate with file sink") {
+    withTempDir { output =>
+      withTempDir { checkpointDir =>
+        val outputPath = output.getAbsolutePath
+        val inputData = MemoryStream[String]
+        val result = inputData.toDS().dropDuplicates()
+        val q = result.writeStream
+          .format("parquet")
+          .outputMode(Append)
+          .option("checkpointLocation", checkpointDir.getPath)
+          .start(outputPath)
+        try {
+          inputData.addData("a")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("a") // Dropped
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a")
+
+          inputData.addData("b")
+          q.processAllAvailable()
+          checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("SPARK-19841: watermarkPredicate should filter based on keys") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("time", "id")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id", "time") // Change the column positions
+      .select($"id")
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 1, 1 -> 2),
+      CheckAnswer(1, 2),
+      AddData(input, 1 -> 1, 2 -> 3, 2 -> 4),
+      CheckNewAnswer(3, 4),
+      AddData(input, 1 -> 0, 1 -> 1, 3 -> 5, 3 -> 6), // Drop (1 -> 0, 1 -> 1) due to watermark
+      CheckNewAnswer(5, 6),
+      AddData(input, 1 -> 0, 4 -> 7), // Drop (1 -> 0) due to watermark
+      CheckNewAnswer(7)
+    )
+  }
+
+  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("id", "time")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id")
+      .select($"id", $"time".cast("long"))
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
+      CheckAnswer(1 -> 1, 2 -> 2)
+    )
+  }
+
+  test("test no-data flag") {
+    val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
+
+    def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
+      val inputData = MemoryStream[Int]
+      val result = inputData.toDS()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .dropDuplicates()
+        .select($"eventTime".cast("long").as[Long])
+
+      testStream(result, Append)(
+        StartStream(additionalConfs = Map(flagKey -> flag.toString)),
+        AddData(inputData, 10, 11, 12, 13, 14, 15),
+        CheckAnswer(10, 11, 12, 13, 14, 15),
+        assertNumStateRows(total = 6, updated = 6),
+
+        AddData(inputData, 25), // Advance watermark to 15 seconds
+        CheckNewAnswer(25),
+        { // State should have been cleaned if flag is set, otherwise should not have been cleaned
+          if (flag) assertNumStateRows(total = 1, updated = 1)
+          else assertNumStateRows(total = 7, updated = 1)
+        }
+      )
+    }
+
+    testWithFlag(true)
+    testWithFlag(false)
+  }
+}
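
Note on the watermark arithmetic in the test comments above: the sketch below
replays the "deduplicate with watermark" input under simplifying assumptions.
It is not Spark's implementation; `advance` and `evict` are hypothetical
helpers, the watermark is assumed to start at 0, and eviction is folded into
the same loop step to stand in for the no-data batch.

```scala
// Hypothetical replay of the watermark math; NOT Spark's implementation.
object WatermarkSketch {
  // The watermark trails the largest event time seen by `delaySec`
  // and never moves backwards.
  def advance(current: Long, batchEventTimes: Seq[Long], delaySec: Long): Long =
    if (batchEventTimes.isEmpty) current
    else math.max(current, batchEventTimes.max - delaySec)

  // Deduplication state keeps only keys newer than the watermark
  // (the comments above say rows <= watermark are dropped).
  def evict(state: Set[Long], watermark: Long): Set[Long] =
    state.filter(_ > watermark)

  def main(args: Array[String]): Unit = {
    val delay = 10L
    var watermark = 0L
    var state = Set.empty[Long]
    val batches = Seq((10L to 15L).toSeq, Seq(25L), Seq(10L), Seq(45L))
    for (batch <- batches) {
      // Rows at or below the current watermark are treated as too late.
      state ++= batch.filter(_ > watermark)
      watermark = advance(watermark, batch, delay)
      // Stands in for the no-data batch that cleans up state.
      state = evict(state, watermark)
      println(s"watermark=$watermark stateRows=${state.size}")
    }
  }
}
```

With the flag disabled, the eviction step would effectively be deferred until
the next batch that carries data, which is why the "test no-data flag" case
expects 7 state rows instead of 1.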

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 11bdd13..da8f960 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -192,7 +192,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch((1, 5, 11)),
       AddData(rightInput, (1, 10)),
       CheckLastBatch(), // no match as neither 5, nor 10 from leftTime is less than rightTime 10 - 5
-      assertNumStateRows(total = 3, updated = 1),
+      assertNumStateRows(total = 3, updated = 3),
 
       // Increase event time watermark to 20s by adding data with time = 30s on both inputs
       AddData(leftInput, (1, 3), (1, 30)),
@@ -276,14 +276,14 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckAnswer(),
       AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)),
       CheckLastBatch((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
-      assertNumStateRows(total = 7, updated = 6),
+      assertNumStateRows(total = 7, updated = 7),
 
       // If rightTime = 60, then it matches only leftTime = [50, 65]
       AddData(rightInput, (1, 60)),
       CheckLastBatch(),                // matches with nothing on the left
       AddData(leftInput, (1, 49), (1, 50), (1, 65), (1, 66)),
       CheckLastBatch((1, 50, 60), (1, 65, 60)),
-      assertNumStateRows(total = 12, updated = 4),
+      assertNumStateRows(total = 12, updated = 5),
 
       // Event time watermark = min(left: 66 - delay 20 = 46, right: 60 - delay 30 = 30) = 30
       // Left state value watermark = 30 - 10 = slightly less than 20 (since condition has <=)
@@ -573,7 +573,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // nulls won't show up until the next batch after the watermark advances.
       MultiAddData(leftInput, 21)(rightInput, 22),
       CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 2),
+      assertNumStateRows(total = 12, updated = 12),
       AddData(leftInput, 22),
       CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 4, null)),
       assertNumStateRows(total = 3, updated = 1)
@@ -591,7 +591,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // nulls won't show up until the next batch after the watermark advances.
       MultiAddData(leftInput, 21)(rightInput, 22),
       CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 2),
+      assertNumStateRows(total = 12, updated = 12),
       AddData(leftInput, 22),
       CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, null, 21)),
       assertNumStateRows(total = 3, updated = 1)
@@ -630,7 +630,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
         CheckLastBatch((1, 1, 5, 10)),
         AddData(rightInput, (1, 11)),
         CheckLastBatch(), // no match as left time is too low
-        assertNumStateRows(total = 5, updated = 1),
+        assertNumStateRows(total = 5, updated = 5),
 
         // Increase event time watermark to 20s by adding data with time = 30s on both inputs
         AddData(leftInput, (1, 7), (1, 30)),
@@ -668,7 +668,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
       MultiAddData(leftInput, 20)(rightInput, 21),
       CheckLastBatch(),
-      assertNumStateRows(total = 5, updated = 2),
+      assertNumStateRows(total = 5, updated = 5),  // 1...3 added, but 20 and 21 not added
       AddData(rightInput, 20),
       CheckLastBatch(
         Row(20, 30, 40, 60)),
@@ -678,7 +678,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch((40, 50, 80, 120), (41, 50, 82, 123)),
       MultiAddData(leftInput, 70)(rightInput, 71),
       CheckLastBatch(),
-      assertNumStateRows(total = 6, updated = 2),
+      assertNumStateRows(total = 6, updated = 6),  // all inputs added since last check
       AddData(rightInput, 70),
       CheckLastBatch((70, 80, 140, 210)),
       assertNumStateRows(total = 3, updated = 1),
@@ -687,7 +687,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckLastBatch(),
       MultiAddData(leftInput, 1000)(rightInput, 1001),
       CheckLastBatch(),
-      assertNumStateRows(total = 8, updated = 2),
+      assertNumStateRows(total = 8, updated = 5),  // 101...103 added, but 1000 and 1001 not added
       AddData(rightInput, 1000),
       CheckLastBatch(
         Row(1000, 1010, 2000, 3000),

http://git-wip-us.apache.org/repos/asf/spark/blob/47b5b685/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 390d67d..0cb2375 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -334,7 +334,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
         assert(progress.sources.length === 1)
         assert(progress.sources(0).description contains "MemoryStream")
-        assert(progress.sources(0).startOffset === null)
+        assert(progress.sources(0).startOffset === "0")
         assert(progress.sources(0).endOffset !== null)
         assert(progress.sources(0).processedRowsPerSecond === 4.0)  // 2 rows processed in 500 ms
 

