This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e4237bb  [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources
e4237bb is described below

commit e4237bbda68c23a3367fab56fd8cdb521f8a1ae2
Author: Tathagata Das <tathagata.das1...@gmail.com>
AuthorDate: Wed Sep 9 13:35:51 2020 -0400

    [SPARK-32794][SS] Fixed rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 sources
    
    ### What changes were proposed in this pull request?
    Make MicroBatchExecution explicitly call `getBatch` when the start and end offsets are the same.
    
    ### Why are the changes needed?
    
    The Structured Streaming micro-batch engine has a contract with V1 data sources that, after a restart, it will call `source.getBatch()` on the last batch attempted before the restart. However, a very rare combination of conditions violates this contract. It occurs only when:
    - The streaming query has specific types of stateful operations with watermarks (e.g., aggregation in append mode, mapGroupsWithState with timeouts).
        - These queries can execute a batch even without new data when the previous batch updates the watermark and the stateful ops are such that the new watermark can cause new output/cleanup. Such batches are called no-data-batches.
    - The last batch before termination was an incomplete no-data-batch. Upon restart, the micro-batch engine fails to call `source.getBatch` when attempting to re-execute the incomplete no-data-batch.
    
    This occurs because no-data-batches have the same start and end offsets, and when a batch is executed, if the start and end offsets are the same, the call to `source.getBatch` is skipped on the assumption that the generated plan will be empty. This only affects V1 data sources like Delta and Autoloader, which rely on this invariant to detect in the source whether the query is being started from scratch or restarted.
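    
    To make the skipped call concrete, here is a minimal sketch of the pre-fix batch construction. It is an illustration only, not the actual MicroBatchExecution code; `committedOffsets` and `availableOffsets` stand in for the engine's real offset bookkeeping:
    
        import org.apache.spark.sql.execution.streaming.{Offset, Source}
    
        // Illustration only: how the pre-fix engine decided whether to call getBatch.
        def constructBatchPreFix(
            source: Source,
            committedOffsets: Map[Source, Offset],
            availableOffsets: Map[Source, Offset]): Unit = {
          val start: Option[Offset] = committedOffsets.get(source) // last committed offset, if any
          val end: Offset = availableOffsets(source)               // offset of the batch being (re)executed
          if (start.contains(end)) {
            // Re-executed no-data-batch: start == end, so the plan was assumed to be
            // empty and source.getBatch(start, end) was never called, which is the
            // contract violation this commit fixes.
          } else {
            source.getBatch(start, end) // normal batch with new data: contract honored
          }
        }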
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    New unit test with a mock V1 source that fails without the fix.
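    
    For context on the invariant the mock exercises, a hypothetical V1 source along the following lines (illustrative only, not Delta or Autoloader code) uses the first `getBatch()` call after a (re)start to tell a fresh start from a restart:
    
        import org.apache.spark.sql.{DataFrame, SparkSession}
        import org.apache.spark.sql.execution.streaming.{Offset, Source}
        import org.apache.spark.sql.types.{LongType, StructType}
    
        // Illustration only: a V1 source whose recovery keys off the first getBatch() call.
        class RestartAwareSource(spark: SparkSession) extends Source {
          @volatile private var initialized = false
    
          override def schema: StructType = new StructType().add("value", LongType)
    
          // This sketch never reports new data; a real source would return its latest offset.
          override def getOffset: Option[Offset] = None
    
          override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
            if (!initialized) {
              initialized = true
              if (start.isDefined) {
                // Restart: the engine re-calls getBatch on the last attempted batch,
                // including a re-executed no-data-batch where start == end (the case
                // fixed by this commit), so the source can resume from `start`.
              } else {
                // Fresh start: nothing was ever committed, so start is None.
              }
            }
            // A real source would build a streaming plan covering (start, end].
            spark.emptyDataFrame
          }
    
          override def stop(): Unit = {}
        }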
    
    Closes #29651 from tdas/SPARK-32794.
    
    Authored-by: Tathagata Das <tathagata.das1...@gmail.com>
    Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>
---
 .../execution/streaming/MicroBatchExecution.scala  |  11 ++
 .../streaming/MicroBatchExecutionSuite.scala       | 123 +++++++++++++++++++++
 .../apache/spark/sql/streaming/StreamTest.scala    |   8 ++
 3 files changed, 142 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index abbfea8..468a8c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -321,6 +321,17 @@ class MicroBatchExecution(
               committedOffsets ++= availableOffsets
               watermarkTracker.setWatermark(
                 math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
+            } else if (latestCommittedBatchId == latestBatchId - 1) {
+              availableOffsets.foreach {
+                case (source: Source, end: Offset) =>
+                  val start = committedOffsets.get(source).map(_.asInstanceOf[Offset])
+                  if (start.map(_ == end).getOrElse(true)) {
+                    source.getBatch(start, end)
+                  }
+                case nonV1Tuple =>
+                  // The V2 API does not have the same edge case requiring getBatch to be called
+                  // here, so we do nothing here.
+              }
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index c0f25e3..a508f92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -19,8 +19,13 @@ package org.apache.spark.sql.execution.streaming
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.connector.read.streaming
+import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
 import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{LongType, StructType}
 
 class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
 
@@ -68,4 +73,122 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
       CheckNewAnswer((25, 1), (30, 1))   // This should not throw the error reported in SPARK-24156
     )
   }
+
+  test("no-data-batch re-executed after restart should call V1 
source.getBatch()") {
+    val testSource = ReExecutedBatchTestSource(spark)
+    val df = testSource.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long])
+
+    /** Reset this test source so that it appears to be a new source requiring initialization */
+    def resetSource(): StreamAction = Execute("reset source") { _ =>
+      testSource.reset()  // Make it look like a new source that needs to be re-initialized
+      require(testSource.currentOffset === 0)
+      require(testSource.getBatchCallCount === 0)
+    }
+
+    /** Add data to this test source by incrementing its available offset */
+    def addData(numNewRows: Int): StreamAction = new AddData {
+      override def addData(query: Option[StreamExecution]): (SparkDataStream, streaming.Offset) = {
+        testSource.incrementAvailableOffset(numNewRows)
+        (testSource, testSource.getOffset.get)
+      }
+    }
+
+    testStream(df)(
+      addData(numNewRows = 10),   // generate values 1...10, sets watermark to 0
+      CheckAnswer(),
      addData(numNewRows = 10),   // generate values 11...20, sets watermark to 10
+      ProcessAllAvailable(),      // let no-data-batch be executed
+      CheckAnswer(0, 5),          // start time of windows closed and outputted
+      Execute("verify source internal state before stop") { q =>
+        // Last batch should be a no-data batch
+        require(q.lastProgress.numInputRows === 0)
+        // Source should have expected internal state
+        require(testSource.currentOffset === 20)
+        // getBatch should be called only for 2 batches with data, not for no-data-batches
+        assert(testSource.getBatchCallCount === 2)
+      },
+      StopStream,
+
+      /* Verify that if the last no-data-batch was incomplete, getBatch() is called only once */
+      Execute("mark last batch as incomplete") { q =>
+        // Delete the last committed batch from the commit log to signify that the last batch
+        // (a no-data batch) did not complete and has to be re-executed on restart.
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+        q.commitLog.purgeAfter(commit - 1)
+      },
+      resetSource(),
+      StartStream(),
+      ProcessAllAvailable(),  // allow initialization and re-execution
+      Execute("verify source.getBatch() called after re-executed 
no-data-batch") { q =>
+        // After restart, getBatch() should be called once even for no-data 
batch
+        assert(testSource.getBatchCallCount === 1)
+        assert(testSource.currentOffset === 20)
+      },
+      addData(numNewRows = 10),   // generate values 21...30, sets watermark to 20
+      ProcessAllAvailable(),      // let no-data-batch be executed
+      CheckAnswer(0, 5, 10, 15),
+      StopStream,
+
+      /* Verify that if the last no-data-batch was complete, getBatch() is still called only once */
+      Execute("verify last batch was complete") { q =>
+        // Verify that the commit log records the last batch as completed
+        require(q.commitLog.getLatest().map(_._1).get === q.offsetLog.getLatest().map(_._1).get)
+      },
+      resetSource(),
+      StartStream(),
+      ProcessAllAvailable(),      // allow initialization to complete
+      Execute("verify source.getBatch() called even if no-data-batch was not re-executed") { q =>
+        // After restart, getBatch() should be called even for no-data batch, but only once
+        assert(testSource.getBatchCallCount === 1)
+        assert(testSource.currentOffset === 30)
+      },
+      addData(numNewRows = 10),   // generate values 31...40, sets watermark to 30
+      ProcessAllAvailable(),      // let no-data-batch be executed
+      CheckAnswer(0, 5, 10, 15, 20, 25)
+    )
+  }
+
+
+  case class ReExecutedBatchTestSource(spark: SparkSession) extends Source {
+    @volatile var currentOffset = 0L
+    @volatile var getBatchCallCount = 0
+
+    override def getOffset: Option[Offset] = {
+      if (currentOffset <= 0) None else Some(LongOffset(currentOffset))
+    }
+
+    override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+      getBatchCallCount = getBatchCallCount + 1
+      if (currentOffset == 0) currentOffset = getOffsetValue(end)
+      val plan = Range(
+        start.map(getOffsetValue).getOrElse(0L) + 1L, getOffsetValue(end) + 1L, 1, None,
+        isStreaming = true)
+      Dataset.ofRows(spark, plan)
+    }
+
+    def incrementAvailableOffset(numNewRows: Int): Unit = {
+      currentOffset = currentOffset + numNewRows
+    }
+
+    def reset(): Unit = {
+      currentOffset = 0L
+      getBatchCallCount = 0
+    }
+    def toDF(): DataFrame = Dataset.ofRows(spark, StreamingExecutionRelation(this, spark))
+    override def schema: StructType = new StructType().add("value", LongType)
+    override def stop(): Unit = {}
+    private def getOffsetValue(offset: Offset): Long = {
+      offset match {
+        case s: SerializedOffset => LongOffset(s).offset
+        case l: LongOffset => l.offset
+        case _ => throw new IllegalArgumentException("incorrect offset type: " + offset)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index ceea1e4..7a2e29f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -306,6 +306,14 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
     def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
   }
 
+  /** Call `StreamingQuery.processAllAvailable()` to wait. */
+  object ProcessAllAvailable {
+    def apply(): AssertOnQuery = AssertOnQuery { query =>
+      query.processAllAvailable()
+      true
+    }
+  }
+
   object AwaitEpoch {
     def apply(epoch: Long): AssertOnQuery =
       Execute {

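For reference, a minimal usage sketch of the new ProcessAllAvailable action (the query `df` and the expected answer are placeholders echoing the suite above):

    testStream(df)(
      StartStream(),
      ProcessAllAvailable(), // block until all available data, including no-data-batches, is processed
      CheckAnswer(0, 5)      // placeholder expected answer
    )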
