[ 
https://issues.apache.org/jira/browse/SPARK-56564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18075305#comment-18075305
 ] 

Dan Wu commented on SPARK-56564:
--------------------------------

Created this PR to fix it: [[SPARK-56564][SQL] Fix V2 write path to use 
CommitDeniedException for proper speculation handling by wudanzy · Pull Request 
#55470 · apache/spark|https://github.com/apache/spark/pull/55470]

> V2 DataSource write path throws SparkException instead of 
> CommitDeniedException, causing spurious stage failures with speculation
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56564
>                 URL: https://issues.apache.org/jira/browse/SPARK-56564
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 3.5.3, 4.0.0
>            Reporter: Dan Wu
>            Priority: Major
>
> h2. Problem
> When speculation is enabled, the {{OutputCommitCoordinator}} denies the commit 
> for speculative task attempts that lose the race. The V1 write path 
> ({{SparkHadoopMapRedUtil.commitTask()}}) correctly throws 
> {{CommitDeniedException}}, which the executor converts to 
> {{TaskCommitDenied}} with {{countTowardsTaskFailures=false}}. The V2 write 
> path ({{WritingSparkTask}} in {{WriteToDataSourceV2Exec.scala}}) instead calls 
> {{QueryExecutionErrors.commitDeniedError()}}, which returns a 
> *plain SparkException*. The executor does not recognize this as a commit 
> denial, so each denied speculative attempt is counted as a real task failure. 
> After {{spark.task.maxFailures}} (default 4) such denials, the stage is 
> incorrectly aborted.
> This affects *all V2 DataSource writes* with speculation enabled, including 
> Structured Streaming jobs using {{ForeachWriter}}.
> h2. V1 path (correct)
> {code:scala}
> // SparkHadoopMapRedUtil.scala:85
> throw new CommitDeniedException(message, ctx.stageId(), splitId,
>   ctx.attemptNumber())
> // -> Executor catches CommitDeniedException (Executor.scala:777)
> // -> Converts to TaskCommitDenied (countTowardsTaskFailures=false)
> {code}
> h2. V2 path (buggy)
> {code:scala}
> // WriteToDataSourceV2Exec.scala:590
> throw QueryExecutionErrors.commitDeniedError(partId, taskId, attemptId,
>   stageId, stageAttempt)
> // -> QueryExecutionErrors returns plain SparkException
> // -> Executor generic Throwable handler (countTowardsTaskFailures=true)
> {code}
> h2. Proposed Fix
> Change {{WritingSparkTask.run()}} in {{WriteToDataSourceV2Exec.scala}} to 
> throw {{CommitDeniedException}} directly, matching V1 behavior.
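
The failure accounting described in the issue can be illustrated with a minimal, self-contained sketch. The types below are simplified stand-ins, not Spark's actual classes: {{PlainSparkException}}, {{SpeculationSketch}}, {{classify}} and {{stageAborted}} are hypothetical names introduced only for illustration, and the model assumes all denials hit the same task. It shows only why the exception type decides whether a denied attempt counts towards {{spark.task.maxFailures}}.

```scala
// Simplified stand-ins, NOT Spark's real classes: they only model the
// failure-accounting behaviour described in the issue.
sealed trait TaskEndReason { def countTowardsTaskFailures: Boolean }
case object TaskCommitDenied extends TaskEndReason {
  val countTowardsTaskFailures = false
}
case object ExceptionFailure extends TaskEndReason {
  val countTowardsTaskFailures = true
}

// Hypothetical exception types mirroring what the two write paths throw.
class CommitDeniedException(msg: String) extends Exception(msg)
class PlainSparkException(msg: String) extends Exception(msg)

object SpeculationSketch {
  // Mirrors the spark.task.maxFailures default cited in the issue.
  val MaxTaskFailures = 4

  // Models the executor: only CommitDeniedException is treated as a denial;
  // anything else falls through to the generic failure case.
  def classify(t: Throwable): TaskEndReason = t match {
    case _: CommitDeniedException => TaskCommitDenied
    case _                        => ExceptionFailure
  }

  // True if the counted failures for one task reach the limit.
  def stageAborted(errors: Seq[Throwable]): Boolean =
    errors.map(classify).count(_.countTowardsTaskFailures) >= MaxTaskFailures

  def main(args: Array[String]): Unit = {
    val v1Denials = Seq.fill(4)(new CommitDeniedException("commit denied"))
    val v2Denials = Seq.fill(4)(new PlainSparkException("commit denied"))
    println(s"V1-style denials abort stage: ${stageAborted(v1Denials)}") // false
    println(s"V2-style denials abort stage: ${stageAborted(v2Denials)}") // true
  }
}
```

In this model, four denials raised as {{CommitDeniedException}} leave the failure count at zero, while the same four denials raised as a generic exception reach the limit, which is the behaviour the proposed fix is meant to remove.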


