This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aa83e7  [SPARK-38033][SS] The SS processing cannot be started because the com…
6aa83e7 is described below

commit 6aa83e7a42555982ea80e16812bed65bc16617a3
Author: qitao liu <liuqi...@haizhi.com>
AuthorDate: Mon Feb 28 23:48:12 2022 +0900

    [SPARK-38033][SS] The SS processing cannot be started because the com…
    
    ### What changes were proposed in this pull request?
    
    This PR modifies the populateStartOffsets method of org.apache.spark.sql.execution.streaming.MicroBatchExecution.
    
    ### Why are the changes needed?
    
    In some unexpected cases, the commit log and the offset log become inconsistent, and offsets are not written to HDFS contiguously, as follows:
    
            commits
            /tmp/streaming_xxxxxxxx/commits/113256
            /tmp/streaming_xxxxxxxx/commits/113257
    
            offsets
            /tmp/streaming_xxxxxxxx/offsets/113257
            /tmp/streaming_xxxxxxxx/offsets/113259
    When the streaming query is restarted, batch ${latestBatchId - 1} is 113258, but offset 113258 doesn't exist, so an exception is thrown and the query cannot be started. As an improvement, Spark doesn't need to repair the checkpoint itself, but we can do some simple analysis and give a better error message.
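
    For illustration, the following is a minimal sketch, not part of this patch, of how this inconsistency could be detected offline by listing the batch ids present in a checkpoint's offsets/ and commits/ directories. It assumes a local filesystem checkpoint path passed as the first argument; the object name CheckpointCheck is hypothetical:

        import java.io.File

        object CheckpointCheck {
          // Sketch only: list the numeric batch ids under a checkpoint subdirectory.
          private def batchIds(dir: File): Seq[Long] =
            Option(dir.listFiles()).getOrElse(Array.empty[File])
              .map(_.getName).filter(_.forall(_.isDigit)).map(_.toLong).sorted.toSeq

          def main(args: Array[String]): Unit = {
            val checkpoint = new File(args(0))
            val offsets = batchIds(new File(checkpoint, "offsets"))
            val commits = batchIds(new File(checkpoint, "commits"))
            offsets.lastOption.foreach { latest =>
              // Restart needs the offsets of the latest batch and of its predecessor.
              if (latest > 0 && !offsets.contains(latest - 1)) {
                println(s"offset log for batch ${latest - 1} is missing; restart will fail")
              }
              // The latest commit should equal the latest offset or be one batch behind it.
              commits.lastOption.filter(c => c != latest && c != latest - 1).foreach { c =>
                println(s"latest commit $c is inconsistent with latest offset $latest")
              }
            }
          }
        }

    Both conditions checked here correspond to the failure mode described above: a missing predecessor offset, or a commit log more than one batch behind the offset log.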
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    An error message is now logged before the exception is thrown.
    
    ### How was this patch tested?
    
    Added a test case that verifies the log output. Run test("SPARK-38033: SS cannot be....") in the MicroBatchExecutionSuite class; it simulates the scenario described above and exercises the original exception. The exception still fires as before, and the new error message is logged first, as follows:
    
        11:00:26.271 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
        11:00:26.675 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: The offset log for batch 3 doesn't exist, which is required to restart the query from the latest batch 4 from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
        11:00:26.690 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = d4358946-170c-49a7-823b-d8e4e9126616, runId = 9e7f12b8-6c10-4f36-b5c5-136e1bace8de] terminated with error
        java.lang.IllegalStateException: batch 3 doesn't exist
            at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:338) ~[classes/:?]
            at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.15.jar:?]
            at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:331) ~[classes/:?]
            at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:222) ~[classes/:?]
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
            at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) ~[classes/:?]
            at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) ~[classes/:?]
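
    The new message also points at the manual fix: delete dangling offset file(s) so that the two most recent offset batch ids are consecutive. As a minimal sketch, not part of this patch, one way to do that on a local filesystem checkpoint could look like the following (the object name OffsetLogRepair is hypothetical; back up the checkpoint before trying anything like this):

        import java.io.File

        object OffsetLogRepair {
          // Sketch only: list the numeric batch ids under the offsets/ directory.
          private def batchIds(dir: File): Seq[Long] =
            Option(dir.listFiles()).getOrElse(Array.empty[File])
              .map(_.getName).filter(_.forall(_.isDigit)).map(_.toLong).sorted.toSeq

          def main(args: Array[String]): Unit = {
            val offsetsDir = new File(args(0), "offsets")
            var batches = batchIds(offsetsDir)
            // Drop the newest offset file until the two most recent ids are consecutive.
            while (batches.size >= 2 && batches.last != batches(batches.size - 2) + 1) {
              val dangling = new File(offsetsDir, batches.last.toString)
              println(s"deleting dangling offset file $dangling")
              dangling.delete()
              batches = batchIds(offsetsDir)
            }
          }
        }

    Against the test checkpoint added below (offsets 0, 1, 2 and 4), this would delete offsets/4, leaving batch 2 as the latest offset with its predecessor, batch 1, available.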
    
    Authored-by: LeeeeLiu liuqt1024@gmail.com
    
    Closes #35513 from LeeeeLiu/SPARK-38033-m.
    
    Authored-by: qitao liu <liuqi...@haizhi.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/MicroBatchExecution.scala  |  6 +++++
 .../commits/0                                      |  2 ++
 .../commits/1                                      |  2 ++
 .../commits/2                                      |  2 ++
 .../metadata                                       |  1 +
 .../offsets/0                                      |  3 +++
 .../offsets/1                                      |  3 +++
 .../offsets/2                                      |  3 +++
 .../offsets/4                                      |  3 +++
 .../streaming/MicroBatchExecutionSuite.scala       | 29 +++++++++++++++++++---
 10 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index b5667ee..fb434f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -315,6 +315,12 @@ class MicroBatchExecution(
          * is the second latest batch id in the offset log. */
         if (latestBatchId != 0) {
          val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
+            logError(s"The offset log for batch ${latestBatchId - 1} doesn't exist, " +
+              s"which is required to restart the query from the latest batch $latestBatchId " +
+              "from the offset log. Please ensure there are two subsequent offset logs " +
+              "available for the latest batch via manually deleting the offset file(s). " +
+              "Please also ensure the latest batch for commit log is equal or one batch " +
+              "earlier than the latest batch for offset log.")
             throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
           }
           committedOffsets = secondLatestOffsets.toStreamProgress(sources)
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0
new file mode 100644
index 0000000..9c1e302
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1
new file mode 100644
index 0000000..9c1e302
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2
new file mode 100644
index 0000000..9c1e302
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata
new file mode 100644
index 0000000..4691bcc
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata
@@ -0,0 +1 @@
+{"id":"d4358946-170c-49a7-823b-d8e4e9126616"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0
new file mode 100644
index 0000000..807d7b0
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1531292029003,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1
new file mode 100644
index 0000000..cce5410
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+1
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2
new file mode 100644
index 0000000..dd9a193
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+2
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4
new file mode 100644
index 0000000..54a6fec
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+4
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index a508f92..53ef9df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
@@ -24,8 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.util.Utils
 
 class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
 
@@ -74,6 +78,27 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     )
   }
 
+  test("SPARK-38033: SS cannot be started because the commitId and offsetId 
are inconsistent") {
+    val inputData = MemoryStream[Int]
+    val streamEvent = inputData.toDF().select("value")
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    testStream(streamEvent) (
+      AddData(inputData, 1, 2, 3, 4, 5, 6),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      ExpectFailure[IllegalStateException] { e =>
+        assert(e.getMessage.contains("batch 3 doesn't exist"))
+      }
+    )
+  }
+
   test("no-data-batch re-executed after restart should call V1 
source.getBatch()") {
     val testSource = ReExecutedBatchTestSource(spark)
     val df = testSource.toDF()
@@ -153,7 +178,6 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     )
   }
 
-
   case class ReExecutedBatchTestSource(spark: SparkSession) extends Source {
     @volatile var currentOffset = 0L
     @volatile var getBatchCallCount = 0
@@ -191,4 +215,3 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     }
   }
 }
-
