This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6aa83e7 [SPARK-38033][SS] The SS processing cannot be started because the com… 6aa83e7 is described below commit 6aa83e7a42555982ea80e16812bed65bc16617a3 Author: qitao liu <liuqi...@haizhi.com> AuthorDate: Mon Feb 28 23:48:12 2022 +0900 [SPARK-38033][SS] The SS processing cannot be started because the com… ### What changes were proposed in this pull request? The code of method: populateStartOffsets in class: org.apache.spark.sql.execution.streaming.MicroBatchExecution is modified. ### Why are the changes needed? In some unexpected cases, commit and offset are inconsistent, and offset is not written into HDFS continuously, as follows: commits /tmp/streaming_xxxxxxxx/commits/113256 /tmp/streaming_xxxxxxxx/commits/113257 offsets /tmp/streaming_xxxxxxxx/offsets/113257 /tmp/streaming_xxxxxxxx/offsets/113259 When we start the streaming program, batch ${latestBatchId - 1} is 113258, but offsets 113258 doesn't exist, so an exception will be thrown and the program cannot be started. As an improvement, Spark doesn't need to repair itself, but we could probably do some simple analysis and give a better error message. ### Does this PR introduce _any_ user-facing change? Yes. An error message is logged if the exception is thrown. ### How was this patch tested? I have provided a test case that can output logs correctly. We can run test("SPARK-38033: SS cannot be....") in the MicroBatchExecutionSuite class. In fact, I simulated a corresponding scenario to test the original exception. This exception validates normally and outputs a new error message, as follows: 11:00:26.271 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled. 
11:00:26.675 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: The offset log for batch 3 doesn't exist, which is required to restart the query from the latest batch 4 from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log. 11:00:26.690 ERROR org.apache.spark.sql.execution.streaming.MicroBatchExecution: Query [id = d4358946-170c-49a7-823b-d8e4e9126616, runId = 9e7f12b8-6c10-4f36-b5c5-136e1bace8de] terminated with error java.lang.IllegalStateException: batch 3 doesn't exist at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$populateStartOffsets$1(MicroBatchExecution.scala:338) ~[classes/:?] at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.15.jar:?] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:331) ~[classes/:?] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:222) ~[classes/:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?] at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) ~[classes/:?] at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) ~[classes/:?] Authored-by: LeeeeLiu liuqt1024gmail.com Closes #35513 from LeeeeLiu/SPARK-38033-m. 
Authored-by: qitao liu <liuqi...@haizhi.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../execution/streaming/MicroBatchExecution.scala | 6 +++++ .../commits/0 | 2 ++ .../commits/1 | 2 ++ .../commits/2 | 2 ++ .../metadata | 1 + .../offsets/0 | 3 +++ .../offsets/1 | 3 +++ .../offsets/2 | 3 +++ .../offsets/4 | 3 +++ .../streaming/MicroBatchExecutionSuite.scala | 29 +++++++++++++++++++--- 10 files changed, 51 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index b5667ee..fb434f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -315,6 +315,12 @@ class MicroBatchExecution( * is the second latest batch id in the offset log. */ if (latestBatchId != 0) { val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse { + logError(s"The offset log for batch ${latestBatchId - 1} doesn't exist, " + + s"which is required to restart the query from the latest batch $latestBatchId " + + "from the offset log. Please ensure there are two subsequent offset logs " + + "available for the latest batch via manually deleting the offset file(s). 
" + + "Please also ensure the latest batch for commit log is equal or one batch " + + "earlier than the latest batch for offset log.") throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist") } committedOffsets = secondLatestOffsets.toStreamProgress(sources) diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0 new file mode 100644 index 0000000..9c1e302 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/0 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1 new file mode 100644 index 0000000..9c1e302 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/1 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2 new file mode 100644 index 0000000..9c1e302 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/commits/2 @@ -0,0 +1,2 @@ +v1 +{"nextBatchWatermarkMs":0} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata new file mode 100644 index 0000000..4691bcc --- /dev/null +++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/metadata @@ -0,0 +1 @@ +{"id":"d4358946-170c-49a7-823b-d8e4e9126616"} \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0 new file mode 100644 index 0000000..807d7b0 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/0 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":0,"batchTimestampMs":1531292029003,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}} +0 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1 new file mode 100644 index 0000000..cce5410 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/1 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}} +1 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2 new file mode 100644 index 0000000..dd9a193 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/2 @@ -0,0 +1,3 @@ +v1 
+{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}} +2 \ No newline at end of file diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4 b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4 new file mode 100644 index 0000000..54a6fec --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/offsets/4 @@ -0,0 +1,3 @@ +v1 +{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}} +4 \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala index a508f92..53ef9df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.streaming +import java.io.File + +import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} @@ -24,8 +27,9 @@ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.connector.read.streaming import org.apache.spark.sql.connector.read.streaming.SparkDataStream import org.apache.spark.sql.functions.{count, timestamp_seconds, window} -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{StreamTest, Trigger} import 
org.apache.spark.sql.types.{LongType, StructType} +import org.apache.spark.util.Utils class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { @@ -74,6 +78,27 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { ) } + test("SPARK-38033: SS cannot be started because the commitId and offsetId are inconsistent") { + val inputData = MemoryStream[Int] + val streamEvent = inputData.toDF().select("value") + + val resourceUri = this.getClass.getResource( + "/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/").toURI + + val checkpointDir = Utils.createTempDir().getCanonicalFile + // Copy the checkpoint to a temp dir to prevent changes to the original. + // Not doing this will lead to the test passing on the first run, but fail subsequent runs. + FileUtils.copyDirectory(new File(resourceUri), checkpointDir) + + testStream(streamEvent) ( + AddData(inputData, 1, 2, 3, 4, 5, 6), + StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), + ExpectFailure[IllegalStateException] { e => + assert(e.getMessage.contains("batch 3 doesn't exist")) + } + ) + } + test("no-data-batch re-executed after restart should call V1 source.getBatch()") { val testSource = ReExecutedBatchTestSource(spark) val df = testSource.toDF() @@ -153,7 +178,6 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { ) } - case class ReExecutedBatchTestSource(spark: SparkSession) extends Source { @volatile var currentOffset = 0L @volatile var getBatchCallCount = 0 @@ -191,4 +215,3 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { } } } - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org