This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37b9c532d69 [SPARK-43542][SS] Define a new error class and apply for 
the case where streaming query fails due to concurrent run of streaming query 
with same checkpoint
37b9c532d69 is described below

commit 37b9c532d698a35d2f577a8fd85ba31b4529f5ea
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Sat May 20 10:33:12 2023 +0300

    [SPARK-43542][SS] Define a new error class and apply for the case where 
streaming query fails due to concurrent run of streaming query with same 
checkpoint
    
    ### What changes were proposed in this pull request?
    
    We are migrating to a new error framework in order to surface errors in a 
friendlier way to customers. This PR defines a new error class, 
CONCURRENT_STREAM_LOG_UPDATE, raised when there are concurrent updates to the 
log for the same batch ID.
    
    ### Why are the changes needed?
    
    This gives customers more information about the failure and allows them to 
filter on the error class.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    There is an existing test to check the error message upon this condition. 
Because we are only changing the error type, and not the error message, this 
test is sufficient.
    
    Closes #41205 from ericm-db/SC-130782.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json           |  7 +++++++
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala |  7 +++++++
 .../spark/sql/execution/streaming/AsyncCommitLog.scala     |  5 ++---
 .../spark/sql/execution/streaming/AsyncOffsetSeqLog.scala  |  5 ++---
 .../AsyncProgressTrackingMicroBatchExecution.scala         |  5 ++---
 .../sql/execution/streaming/MicroBatchExecution.scala      | 14 ++++++++------
 6 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 3c7c29f7532..b130f6f6c93 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -212,6 +212,13 @@
       "Another instance of this query was just started by a concurrent 
session."
     ]
   },
+  "CONCURRENT_STREAM_LOG_UPDATE" : {
+    "message" : [
+      "Concurrent update to the log. Multiple streaming jobs detected for 
<batchId>.",
+      "Please make sure only one streaming job runs on a specific checkpoint 
location at a time."
+    ],
+    "sqlState" : "40000"
+  },
   "CONNECT" : {
     "message" : [
       "Generic Spark Connect error."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 99f7489e8bc..67c5fa54732 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1409,6 +1409,13 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
       messageParameters = Map.empty[String, String])
   }
 
+  def concurrentStreamLogUpdate(batchId: Long): Throwable = {
+    new SparkException(
+      errorClass = "CONCURRENT_STREAM_LOG_UPDATE",
+      messageParameters = Map("batchId" -> batchId.toString),
+      cause = null)
+  }
+
   def cannotParseJsonArraysAsStructsError(): SparkRuntimeException = {
     new SparkRuntimeException(
       errorClass = "_LEGACY_ERROR_TEMP_2132",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
index e9ad8bed27c..495f2f7ac0b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{CompletableFuture, 
ConcurrentLinkedDeque, ThreadPoo
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors
 
 /**
  * Implementation of CommitLog to perform asynchronous writes to storage
@@ -54,9 +55,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: 
String, executorService:
       if (ret) {
         batchId
       } else {
-        throw new IllegalStateException(
-          s"Concurrent update to the log. Multiple streaming jobs detected for 
$batchId"
-        )
+        throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId)
       }
     })
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
index 4dd49951436..240a64ec7b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{Clock, SystemClock}
 
 /**
@@ -90,9 +91,7 @@ class AsyncOffsetSeqLog(
         if (ret) {
           batchId
         } else {
-          throw new IllegalStateException(
-            s"Concurrent update to the log. Multiple streaming jobs detected 
for $batchId"
-          )
+          throw QueryExecutionErrors.concurrentStreamLogUpdate(batchId)
         }
       })
       pendingOffsetWrites.put(batchId, future)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
index f7c7aab65e2..56cdba88175 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, ThreadUtils}
 
@@ -194,9 +195,7 @@ class AsyncProgressTrackingMicroBatchExecution(
       } else {
         if (!commitLog.addInMemory(
           currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))) {
-          throw new IllegalStateException(
-            s"Concurrent update to the log. Multiple streaming jobs detected 
for $currentBatchId"
-          )
+          throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
         }
       }
       offsetLog.removeAsyncOffsetWrite(currentBatchId)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 65a70328148..691cea9edde 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -760,9 +760,11 @@ class MicroBatchExecution(
    * checkpointing to offset log and any microbatch startup tasks.
    */
   protected def markMicroBatchStart(): Unit = {
-    assert(offsetLog.add(currentBatchId,
-      availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
-      s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
+    if (!offsetLog.add(currentBatchId,
+      availableOffsets.toOffsetSeq(sources, offsetSeqMetadata))) {
+      throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
+    }
+
     logInfo(s"Committed offsets for batch $currentBatchId. " +
       s"Metadata ${offsetSeqMetadata.toString}")
   }
@@ -780,9 +782,9 @@ class MicroBatchExecution(
   protected def markMicroBatchEnd(): Unit = {
     watermarkTracker.updateWatermark(lastExecution.executedPlan)
     reportTimeTaken("commitOffsets") {
-      assert(commitLog.add(currentBatchId, 
CommitMetadata(watermarkTracker.currentWatermark)),
-        "Concurrent update to the commit log. Multiple streaming jobs detected 
for " +
-          s"$currentBatchId")
+      if (!commitLog.add(currentBatchId, 
CommitMetadata(watermarkTracker.currentWatermark))) {
+        throw QueryExecutionErrors.concurrentStreamLogUpdate(currentBatchId)
+      }
     }
     committedOffsets ++= availableOffsets
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to