This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba38c2c3305 [SPARK-52346][SDP] Fix counter initialization and
increment logic for flow retries
4ba38c2c3305 is described below
commit 4ba38c2c33055a853c7d39a974c670914dc8a049
Author: Jacky Wang <[email protected]>
AuthorDate: Mon Sep 1 23:13:28 2025 -0700
[SPARK-52346][SDP] Fix counter initialization and increment logic for flow
retries
### What changes were proposed in this pull request?
Fixes the flow failure counter increment by switching to `updateWith`. This
prevents the first increment from throwing `NoSuchElementException` when the
key does not exist yet.
### Why are the changes needed?
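The current implementation reads `flowToNumConsecutiveFailure(flowIdentifier)`
before incrementing, which throws an exception when a flow is retried for the
first time because the map has no entry for that flow yet: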
```
java.util.NoSuchElementException: key not found: `spark_catalog`.`test_db`.`mv`
at scala.collection.MapOps.default(Map.scala:289)
at scala.collection.MapOps.default$(Map.scala:288)
at scala.collection.AbstractMap.default(Map.scala:420)
at scala.collection.MapOps.apply(Map.scala:176)
at scala.collection.MapOps.apply$(Map.scala:175)
at scala.collection.AbstractMap.apply(Map.scala:420)
at org.apache.spark.sql.pipelines.graph.GraphExecution.incrementFlowToNumConsecutiveFailure(GraphExecution.scala:52)
at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1(GraphExecution.scala:92)
at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1$adapted(GraphExecution.scala:90)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
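Ran a pipeline manually with a failing flow to test. This change also adds the
test "consecutive failure event level is correct" to `TriggeredGraphExecutionSuite`.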
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52094 from JiaqiWang18/sdp-graphexecution-incrementflow-map-default.
Authored-by: Jacky Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/pipelines/graph/GraphExecution.scala | 5 +++-
.../graph/TriggeredGraphExecutionSuite.scala | 31 +++++++++++++++++++++-
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
index 4c969f1bbefd..c687c7f01ed7 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala
@@ -49,7 +49,10 @@ abstract class GraphExecution(
/** Increments flow execution retry count for `flow`. */
private def incrementFlowToNumConsecutiveFailure(flowIdentifier:
TableIdentifier): Unit = {
- flowToNumConsecutiveFailure.put(flowIdentifier,
flowToNumConsecutiveFailure(flowIdentifier) + 1)
+ flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+ case Some(count) => Some(count + 1)
+ case None => Some(1)
+ }
}
/**
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
index 4aaa139378b9..4fcd9dad93fe 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util,
Identifier, TableC
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
-import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.logging.{EventLevel, FlowProgress}
import org.apache.spark.sql.pipelines.utils.{ExecutionTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -1027,4 +1027,33 @@ class TriggeredGraphExecutionSuite extends ExecutionTest
with SharedSparkSession
}
)
}
+
+ test("consecutive failure event level is correct") {
+ val session = spark
+ import session.implicits._
+
+ val pipelineDef = new TestGraphRegistrationContext(spark) {
+ registerMaterializedView(
+ "retry_test",
+ partitionCols = Some(Seq("nonexistent_col")),
+ query = dfFlowFunc(spark.range(5).withColumn("id_mod", ($"id" %
2).cast("int")))
+ )
+ }
+
+ val graph = pipelineDef.toDataflowGraph
+ val updateContext = TestPipelineUpdateContext(spark, graph)
+ updateContext.pipelineExecution.runPipeline()
+ updateContext.pipelineExecution.awaitCompletion()
+
+ val failedEvents = updateContext.eventBuffer.getEvents.filter { e =>
+ e.details.isInstanceOf[FlowProgress] &&
+ e.details.asInstanceOf[FlowProgress].status == FlowStatus.FAILED
+ }
+
+ val warnCount = failedEvents.count(_.level == EventLevel.WARN)
+ // flowToNumConsecutiveFailure controls that the last failure should be
logged as ERROR
+ val errorCount = failedEvents.count(_.level == EventLevel.ERROR)
+
+ assert(warnCount == 2 && errorCount == 1)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]