[ 
https://issues.apache.org/jira/browse/SPARK-51019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18046406#comment-18046406
 ] 

Ashrith Bandla commented on SPARK-51019:
----------------------------------------

Hey [~dongjoon], I put in a fix for this issue. I think the flakiness was 
caused by a shallow exception search: the assertion only checks the error, its 
direct cause, and its suppressed exceptions. I changed the loop to search the 
chain in more depth. Here is my PR, feel free to check it out! 
https://github.com/apache/spark/pull/53517
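
To illustrate the difference between a shallow and a deeper exception search, 
here is a minimal Scala sketch. It is not the code from the PR: the names 
ExceptionSearch, shallowContains, allThrowables and containsMessage are 
hypothetical and exist only for this example. shallowContains mirrors the 
failing assertion quoted in the issue description below; the deeper variant is 
just one plausible way to walk the whole cause/suppressed graph.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of Spark or of the PR.
object ExceptionSearch {

  // Shallow search: looks only at the error itself, its direct cause,
  // and its directly suppressed exceptions (same shape as the failing assertion).
  def shallowContains(error: Throwable, fragment: String): Boolean =
    (Seq[Throwable](error) ++ Option(error.getCause) ++ error.getSuppressed)
      .exists(e => e.getMessage != null && e.getMessage.contains(fragment))

  // Deep search: collects every throwable reachable through getCause and
  // getSuppressed. An identity-based set guards against cyclic references.
  def allThrowables(root: Throwable): Seq[Throwable] = {
    val seen = java.util.Collections.newSetFromMap(
      new java.util.IdentityHashMap[Throwable, java.lang.Boolean]())
    val pending = mutable.Queue[Throwable](root)
    val found = mutable.ArrayBuffer.empty[Throwable]
    while (pending.nonEmpty) {
      val e = pending.dequeue()
      if (seen.add(e)) {
        found += e
        Option(e.getCause).foreach(pending += _)
        e.getSuppressed.foreach(pending += _)
      }
    }
    found.toSeq
  }

  def containsMessage(root: Throwable, fragment: String): Boolean =
    allThrowables(root).exists(e => e.getMessage != null && e.getMessage.contains(fragment))
}
```

For an error nested two levels deep, such as 
new Exception("outer", new RuntimeException("wrapper", new RuntimeException("coalesce test error"))), 
shallowContains returns false while containsMessage returns true.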

> Fix Flaky Test: `SPARK-47148: AQE should avoid to submit shuffle job on cancellation`
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-51019
>                 URL: https://issues.apache.org/jira/browse/SPARK-51019
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL, Tests
>    Affects Versions: 4.0.0
>            Reporter: Dongjoon Hyun
>            Priority: Major
>              Labels: pull-request-available
>
> - https://github.com/apache/spark/actions/runs/13004225714/job/36268222928
> {code}
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner, [id])
> :- Project [id#133801L, scalarsubquery()#133805]
> :  +- Join Inner, (id#133801L = id#133806L)
> :     :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
> :     :  :  +- Project [slow_udf() AS slow_udf()#133804]
> :     :  :     +- Range (0, 2, step=1)
> :     :  +- Range (0, 5, step=1)
> :     +- Repartition 2, false
> :        +- Project [id#133806L]
> :           +- Range (0, 10, step=1)
> +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
>    :  +- Project [slow_udf() AS slow_udf()#133811]
>    :     +- Range (0, 2, step=1)
>    +- Filter (id#133808L > cast(2 as bigint))
>       +- Range (0, 15, step=1)
> == Analyzed Logical Plan ==
> id: bigint, scalarsubquery(): int, scalarsubquery(): int
> Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
> +- Join Inner, (id#133801L = id#133808L)
>    :- Project [id#133801L, scalarsubquery()#133805]
>    :  +- Join Inner, (id#133801L = id#133806L)
>    :     :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
>    :     :  :  +- Project [slow_udf() AS slow_udf()#133804]
>    :     :  :     +- Range (0, 2, step=1)
>    :     :  +- Range (0, 5, step=1)
>    :     +- Repartition 2, false
>    :        +- Project [id#133806L]
>    :           +- Range (0, 10, step=1)
>    +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
>       :  +- Project [slow_udf() AS slow_udf()#133811]
>       :     +- Range (0, 2, step=1)
>       +- Filter (id#133808L > cast(2 as bigint))
>          +- Range (0, 15, step=1)
> == Optimized Logical Plan ==
> Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
> +- Join Inner, (id#133801L = id#133808L)
>    :- Project [id#133801L, scalarsubquery()#133805]
>    :  +- Join Inner, (id#133801L = id#133806L)
>    :     :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
>    :     :  :  +- Project [slow_udf() AS slow_udf()#133804]
>    :     :  :     +- Range (0, 2, step=1)
>    :     :  +- Filter (id#133801L > 2)
>    :     :     +- Range (0, 5, step=1)
>    :     +- Repartition 2, false
>    :        +- Range (0, 10, step=1)
>    +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
>       :  +- Project [slow_udf() AS slow_udf()#133804]
>       :     +- Range (0, 2, step=1)
>       +- Filter (id#133808L > 2)
>          +- Range (0, 15, step=1)
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
>    +- SortMergeJoin [id#133801L], [id#133808L], Inner
>       :- Project [id#133801L, scalarsubquery()#133805]
>       :  +- SortMergeJoin [id#133801L], [id#133806L], Inner
>       :     :- Sort [id#133801L ASC NULLS FIRST], false, 0
>       :     :  +- Exchange hashpartitioning(id#133801L, 5), ENSURE_REQUIREMENTS, [plan_id=423273]
>       :     :     +- Project [id#133801L, Subquery subquery#133800, [id=#423258] AS scalarsubquery()#133805]
>       :     :        :  +- Subquery subquery#133800, [id=#423258]
>       :     :        :     +- AdaptiveSparkPlan isFinalPlan=false
>       :     :        :        +- Project [slow_udf() AS slow_udf()#133804]
>       :     :        :           +- Range (0, 2, step=1, splits=2)
>       :     :        +- Filter (id#133801L > 2)
>       :     :           +- Range (0, 5, step=1, splits=2)
>       :     +- Sort [id#133806L ASC NULLS FIRST], false, 0
>       :        +- Exchange hashpartitioning(id#133806L, 5), ENSURE_REQUIREMENTS, [plan_id=423272]
>       :           +- TestProblematicCoalesce 2
>       :              +- Range (0, 10, step=1, splits=2)
>       +- Sort [id#133808L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#133808L, 5), ENSURE_REQUIREMENTS, [plan_id=423284]
>             +- Project [id#133808L, Subquery subquery#133807, [id=#423262] AS scalarsubquery()#133812]
>                :  +- Subquery subquery#133807, [id=#423262]
>                :     +- AdaptiveSparkPlan isFinalPlan=false
>                :        +- Project [slow_udf() AS slow_udf()#133804]
>                :           +- Range (0, 2, step=1, splits=2)
>                +- Filter (id#133808L > 2)
>                   +- Range (0, 15, step=1, splits=2)
> 05:58:03.518 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 585.0 (TID 3197) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 276 cancelled The corresponding SQL query has failed. SQLSTATE: XXKDA)
> 05:58:03.520 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 585.0 (TID 3198) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 276 cancelled The corresponding SQL query has failed. SQLSTATE: XXKDA)
> [info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 85 milliseconds)
> [info]   scala.`package`.Seq.apply[org.apache.spark.SparkException](error).++[Throwable](scala.Option.apply[Throwable](error.getCause())).++[Throwable](scala.Predef.wrapRefArray[Throwable](error.getSuppressed())).exists(((e: Throwable) => e.getMessage().!=(null).&&(e.getMessage().contains("coalesce test error")))) was false (AdaptiveQueryExecSuite.scala:940)
> {code}


