[ https://issues.apache.org/jira/browse/SPARK-51019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18046406#comment-18046406 ]
Ashrith Bandla commented on SPARK-51019:
----------------------------------------
Hey [~dongjoon], I put up a fix for this issue. I believe the flakiness was caused by the shallow exception search in the test's assertion, so I changed the loop to search the exception chain more deeply. Here is my PR, feel free to check it out: https://github.com/apache/spark/pull/53517
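For context, the failing assertion (AdaptiveQueryExecSuite.scala:940, quoted below) only inspects the top-level error, its direct cause, and its suppressed exceptions, so a "coalesce test error" buried deeper in the cause chain is never found. A deeper search could look roughly like the sketch below; the findMessage helper is illustrative, not necessarily the exact code in the PR.

{code}
import scala.annotation.tailrec

// Walk the full cause/suppressed chain instead of only the first level.
def findMessage(root: Throwable, fragment: String): Boolean = {
  @tailrec
  def loop(queue: List[Throwable], seen: Set[Throwable]): Boolean = queue match {
    case Nil => false
    case e :: rest =>
      if (e.getMessage != null && e.getMessage.contains(fragment)) {
        true
      } else {
        // Enqueue the cause and any suppressed exceptions, skipping ones we
        // have already visited to guard against cyclic cause chains.
        val next = (Option(e.getCause).toList ++ e.getSuppressed.toList).filterNot(seen)
        loop(rest ++ next, seen ++ next)
      }
  }
  loop(List(root), Set(root))
}

// e.g. findMessage(error, "coalesce test error")
{code}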
> Fix Flaky Test: `SPARK-47148: AQE should avoid to submit shuffle job on cancellation`
> -------------------------------------------------------------------------------------
>
> Key: SPARK-51019
> URL: https://issues.apache.org/jira/browse/SPARK-51019
> Project: Spark
> Issue Type: Sub-task
> Components: SQL, Tests
> Affects Versions: 4.0.0
> Reporter: Dongjoon Hyun
> Priority: Major
> Labels: pull-request-available
>
> - https://github.com/apache/spark/actions/runs/13004225714/job/36268222928
> {code}
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner, [id])
> :- Project [id#133801L, scalarsubquery()#133805]
> : +- Join Inner, (id#133801L = id#133806L)
> : :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
> : : : +- Project [slow_udf() AS slow_udf()#133804]
> : : : +- Range (0, 2, step=1)
> : : +- Range (0, 5, step=1)
> : +- Repartition 2, false
> : +- Project [id#133806L]
> : +- Range (0, 10, step=1)
> +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
> : +- Project [slow_udf() AS slow_udf()#133811]
> : +- Range (0, 2, step=1)
> +- Filter (id#133808L > cast(2 as bigint))
> +- Range (0, 15, step=1)
> == Analyzed Logical Plan ==
> id: bigint, scalarsubquery(): int, scalarsubquery(): int
> Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
> +- Join Inner, (id#133801L = id#133808L)
> :- Project [id#133801L, scalarsubquery()#133805]
> : +- Join Inner, (id#133801L = id#133806L)
> : :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
> : : : +- Project [slow_udf() AS slow_udf()#133804]
> : : : +- Range (0, 2, step=1)
> : : +- Range (0, 5, step=1)
> : +- Repartition 2, false
> : +- Project [id#133806L]
> : +- Range (0, 10, step=1)
> +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
> : +- Project [slow_udf() AS slow_udf()#133811]
> : +- Range (0, 2, step=1)
> +- Filter (id#133808L > cast(2 as bigint))
> +- Range (0, 15, step=1)
> == Optimized Logical Plan ==
> Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
> +- Join Inner, (id#133801L = id#133808L)
> :- Project [id#133801L, scalarsubquery()#133805]
> : +- Join Inner, (id#133801L = id#133806L)
> : :- Project [id#133801L, scalar-subquery#133800 [] AS scalarsubquery()#133805]
> : : : +- Project [slow_udf() AS slow_udf()#133804]
> : : : +- Range (0, 2, step=1)
> : : +- Filter (id#133801L > 2)
> : : +- Range (0, 5, step=1)
> : +- Repartition 2, false
> : +- Range (0, 10, step=1)
> +- Project [id#133808L, scalar-subquery#133807 [] AS scalarsubquery()#133812]
> : +- Project [slow_udf() AS slow_udf()#133804]
> : +- Range (0, 2, step=1)
> +- Filter (id#133808L > 2)
> +- Range (0, 15, step=1)
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [id#133801L, scalarsubquery()#133805, scalarsubquery()#133812]
> +- SortMergeJoin [id#133801L], [id#133808L], Inner
> :- Project [id#133801L, scalarsubquery()#133805]
> : +- SortMergeJoin [id#133801L], [id#133806L], Inner
> : :- Sort [id#133801L ASC NULLS FIRST], false, 0
> : : +- Exchange hashpartitioning(id#133801L, 5), ENSURE_REQUIREMENTS, [plan_id=423273]
> : : +- Project [id#133801L, Subquery subquery#133800, [id=#423258] AS scalarsubquery()#133805]
> : : : +- Subquery subquery#133800, [id=#423258]
> : : : +- AdaptiveSparkPlan isFinalPlan=false
> : : : +- Project [slow_udf() AS slow_udf()#133804]
> : : : +- Range (0, 2, step=1, splits=2)
> : : +- Filter (id#133801L > 2)
> : : +- Range (0, 5, step=1, splits=2)
> : +- Sort [id#133806L ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(id#133806L, 5), ENSURE_REQUIREMENTS, [plan_id=423272]
> : +- TestProblematicCoalesce 2
> : +- Range (0, 10, step=1, splits=2)
> +- Sort [id#133808L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(id#133808L, 5), ENSURE_REQUIREMENTS, [plan_id=423284]
> +- Project [id#133808L, Subquery subquery#133807, [id=#423262] AS scalarsubquery()#133812]
> : +- Subquery subquery#133807, [id=#423262]
> : +- AdaptiveSparkPlan isFinalPlan=false
> : +- Project [slow_udf() AS slow_udf()#133804]
> : +- Range (0, 2, step=1, splits=2)
> +- Filter (id#133808L > 2)
> +- Range (0, 15, step=1, splits=2)
> 05:58:03.518 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 585.0 (TID 3197) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 276 cancelled The corresponding SQL query has failed. SQLSTATE: XXKDA)
> 05:58:03.520 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 585.0 (TID 3198) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 276 cancelled The corresponding SQL query has failed. SQLSTATE: XXKDA)
> [info] - SPARK-47148: AQE should avoid to submit shuffle job on cancellation *** FAILED *** (6 seconds, 85 milliseconds)
> [info]   scala.`package`.Seq.apply[org.apache.spark.SparkException](error).++[Throwable](scala.Option.apply[Throwable](error.getCause())).++[Throwable](scala.Predef.wrapRefArray[Throwable](error.getSuppressed())).exists(((e: Throwable) => e.getMessage().!=(null).&&(e.getMessage().contains("coalesce test error")))) was false (AdaptiveQueryExecSuite.scala:940)
> {code}