This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 37b9c532d69 [SPARK-43542][SS] Define a new error class and apply for the case where streaming query fails due to concurrent run of streaming query with same checkpoint 37b9c532d69 is described below commit 37b9c532d698a35d2f577a8fd85ba31b4529f5ea Author: Eric Marnadi <eric.marn...@databricks.com> AuthorDate: Sat May 20 10:33:12 2023 +0300 [SPARK-43542][SS] Define a new error class and apply for the case where streaming query fails due to concurrent run of streaming query with same checkpoint ### What changes were proposed in this pull request? We are migrating to a new error framework in order to surface errors in a friendlier way to customers. This PR defines a new error class specifically for when there are concurrent updates to the log for the same batch ID. ### Why are the changes needed? This gives more information to customers and allows them to filter errors by error class. ### Does this PR introduce _any_ user-facing change? Yes. When a concurrent update to the offset or commit log is detected, the query now fails with a `SparkException` carrying error class `CONCURRENT_STREAM_LOG_UPDATE` (SQLSTATE 40000) instead of an `IllegalStateException` or a failed assertion. ### How was this patch tested? There is an existing test to check the error message upon this condition. Because we are only changing the error type and keeping the core error message, this test is sufficient. Closes #41205 from ericm-db/SC-130782. 
Authored-by: Eric Marnadi <eric.marn...@databricks.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 7 +++++++ .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 7 +++++++ .../spark/sql/execution/streaming/AsyncCommitLog.scala | 5 ++--- .../spark/sql/execution/streaming/AsyncOffsetSeqLog.scala | 5 ++--- .../AsyncProgressTrackingMicroBatchExecution.scala | 5 ++--- .../sql/execution/streaming/MicroBatchExecution.scala | 14 ++++++++------ 6 files changed, 28 insertions(+), 15 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 3c7c29f7532..b130f6f6c93 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -212,6 +212,13 @@ "Another instance of this query was just started by a concurrent session." ] }, + "CONCURRENT_STREAM_LOG_UPDATE" : { + "message" : [ + "Concurrent update to the log. Multiple streaming jobs detected for <batchId>.", + "Please make sure only one streaming job runs on a specific checkpoint location at a time." + ], + "sqlState" : "40000" + }, "CONNECT" : { "message" : [ "Generic Spark Connect error." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 99f7489e8bc..67c5fa54732 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1409,6 +1409,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { messageParameters = Map.empty[String, String]) } + def concurrentStreamLogUpdate(batchId: Long): Throwable = { + new SparkException( + errorClass = "CONCURRENT_STREAM_LOG_UPDATE", + messageParameters = Map("batchId" -> batchId.toString), + cause = null) + } + def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = { new SparkRuntimeException( errorClass = "_LEGACY_ERROR_TEMP_2132", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala index e9ad8bed27c..495f2f7ac0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{CompletableFuture, ConcurrentLinkedDeque, ThreadPoo import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors /** * Implementation of CommitLog to perform asynchronous writes to storage @@ -54,9 +55,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: if (ret) { batchId } else { - throw new IllegalStateException( - s"Concurrent update to the log. 
Multiple streaming jobs detected for $batchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId) } }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala index 4dd49951436..240a64ec7b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.{Clock, SystemClock} /** @@ -90,9 +91,7 @@ class AsyncOffsetSeqLog( if (ret) { batchId } else { - throw new IllegalStateException( - s"Concurrent update to the log. Multiple streaming jobs detected for $batchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId) } }) pendingOffsetWrites.put(batchId, future) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala index f7c7aab65e2..56cdba88175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.streaming.WriteToStream +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.{Clock, ThreadUtils} @@ -194,9 +195,7 @@ class AsyncProgressTrackingMicroBatchExecution( 
} else { if (!commitLog.addInMemory( currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { - throw new IllegalStateException( - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId" - ) + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) } } offsetLog.removeAsyncOffsetWrite(currentBatchId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 65a70328148..691cea9edde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -760,9 +760,11 @@ class MicroBatchExecution( * checkpointing to offset log and any microbatch startup tasks. */ protected def markMicroBatchStart(): Unit = { - assert(offsetLog.add(currentBatchId, - availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + if (!offsetLog.add(currentBatchId, + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) + } + logInfo(s"Committed offsets for batch $currentBatchId. " + s"Metadata ${offsetSeqMetadata.toString}") } @@ -780,9 +782,9 @@ class MicroBatchExecution( protected def markMicroBatchEnd(): Unit = { watermarkTracker.updateWatermark(lastExecution.executedPlan) reportTimeTaken("commitOffsets") { - assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)), - "Concurrent update to the commit log. 
Multiple streaming jobs detected for " + - s"$currentBatchId") + if (!commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) { + throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId) + } } committedOffsets ++= availableOffsets } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org