This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 56f0233658e [SPARK-39868][CORE][TESTS] StageFailed event should attach with the root cause 56f0233658e is described below commit 56f0233658e53425ff915e803284139defb4af42 Author: panbingkun <pbk1...@gmail.com> AuthorDate: Wed Jul 27 14:21:34 2022 +0900 [SPARK-39868][CORE][TESTS] StageFailed event should attach with the root cause ### What changes were proposed in this pull request? The PR follows https://github.com/apache/spark/pull/37245 StageFailed event should attach with the root cause ### Why are the changes needed? **It may be a good way for users to know the reason of failure.** By carefully investigating the issue: https://issues.apache.org/jira/browse/SPARK-39622, I found the root cause of the test failure: StageFailed doesn't attach the failure reason from the executor. When OutputCommitCoordinator executes 'taskCompleted', the 'reason' is ignored. Scenario 1: receive TaskSetFailed (Success) > InsertIntoHadoopFsRelationCommand > FileFormatWriter.write > _**handleTaskSetFailed**_ (**attach root cause**) > abortStage > failJobAndIndependentStages > SparkListenerJobEnd Scenario 2: receive StageFailed (Fail) > InsertIntoHadoopFsRelationCommand > FileFormatWriter.write > _**handleStageFailed**_ (**don't attach root cause**) > abortStage > failJobAndIndependentStages > SparkListenerJobEnd ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual run UT & Pass GitHub Actions Closes #37292 from panbingkun/SPARK-39868. 
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 3 ++- .../OutputCommitCoordinatorIntegrationSuite.scala | 3 ++- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 14 ++------------ 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index a33c2bb93bc..cd5d6b8f9c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -160,7 +160,8 @@ private[spark] class OutputCommitCoordinator( if (stageState.authorizedCommitters(partition) == taskId) { sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " + s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " + - s"but task commit success, data duplication may happen.")) + s"but task commit success, data duplication may happen. 
" + + s"reason=$reason")) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala index 66b13be4f7a..45da750768f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala @@ -51,7 +51,8 @@ class OutputCommitCoordinatorIntegrationSuite sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out") } }.getCause.getMessage - assert(e.endsWith("failed; but task commit success, data duplication may happen.")) + assert(e.contains("failed; but task commit success, data duplication may happen.") && + e.contains("Intentional exception")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index a06ddc1b9e9..5a8f4563756 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1202,15 +1202,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val m1 = intercept[SparkException] { spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath) }.getCause.getMessage - // SPARK-39622: The current case must handle the `TaskSetFailed` event before SPARK-39195 - // due to `maxTaskFailures` is 1 when local mode. 
After SPARK-39195, it may handle to one - // of the `TaskSetFailed` event and `StageFailed` event, and the execution order of the - // two events is uncertain, so the assertion of `Authorized committer (attemptNumber=n, - // stage=s, partition=p) failed; but task commit success, data duplication may happen.` - // is added for workaround. - assert(m1.contains("Intentional exception for testing purposes") || - (m1.contains("Authorized committer") && - m1.contains("failed; but task commit success, data duplication may happen."))) + assert(m1.contains("Intentional exception for testing purposes")) } withTempPath { dir => @@ -1219,9 +1211,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .coalesce(1) df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath) }.getCause.getMessage - assert(m2.contains("Intentional exception for testing purposes") || - (m2.contains("Authorized committer") && - m2.contains("failed; but task commit success, data duplication may happen."))) + assert(m2.contains("Intentional exception for testing purposes")) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org