This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2732980 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize 2732980 is described below commit 27329806c36d0b403153fe1ad0077acb72d92606 Author: yi.wu <yi...@databricks.com> AuthorDate: Tue Mar 3 13:40:51 2020 -0800 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize ### What changes were proposed in this pull request? This PR proposes not to cancel a `QueryStageExec` that failed before `doMaterialize` was called. In addition, this PR includes two minor improvements: * fail fast when a stage fails before `doMaterialize` is called * format exceptions by chaining the original exception as the cause, instead of appending the cause's message to the exception message ### Why are the changes needed? For a stage that failed before materializing its lazy value (e.g. `inputRDD`), calling `cancel` on it could re-trigger the same failure, e.g. by executing the child node again (see `AdaptiveQueryExecSuite`.`SPARK-30291: AQE should catch the exceptions when doing materialize` for an example). As a result, the same failure would be reported twice: once as a materialization error and once as a cancellation error. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Updated test. Closes #27752 from Ngone51/avoid_cancel_finished_stage. 
Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit 380e8876316d6ef5a74358be2a04ab20e8b6e7ca) Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +++++++++++++--------- .../adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 4036424..c018ca4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -165,7 +165,7 @@ case class AdaptiveSparkPlanExec( stagesToReplace = result.newStages ++ stagesToReplace executionId.foreach(onUpdatePlan) - // Start materialization of all new stages. + // Start materialization of all new stages and fail fast if any stages failed eagerly result.newStages.foreach { stage => try { stage.materialize().onComplete { res => @@ -176,7 +176,10 @@ case class AdaptiveSparkPlanExec( } }(AdaptiveSparkPlanExec.executionContext) } catch { - case e: Throwable => events.offer(StageFailure(stage, e)) + case e: Throwable => + val ex = new SparkException( + s"Early failed query stage found: ${stage.treeString}", e) + cleanUpAndThrowException(Seq(ex), Some(stage.id)) } } } @@ -192,13 +195,12 @@ case class AdaptiveSparkPlanExec( stage.resultOption = Some(res) case StageFailure(stage, ex) => errors.append( - new SparkException(s"Failed to materialize query stage: ${stage.treeString}." + - s" and the cause is ${ex.getMessage}", ex)) + new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex)) } // In case of errors, we cancel all running stages and throw exception. 
if (errors.nonEmpty) { - cleanUpAndThrowException(errors) + cleanUpAndThrowException(errors, None) } // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less @@ -522,9 +524,13 @@ case class AdaptiveSparkPlanExec( * Cancel all running stages with best effort and throw an Exception containing all stage * materialization errors and stage cancellation errors. */ - private def cleanUpAndThrowException(errors: Seq[SparkException]): Unit = { + private def cleanUpAndThrowException( + errors: Seq[SparkException], + earlyFailedStage: Option[Int]): Unit = { val runningStages = currentPhysicalPlan.collect { - case s: QueryStageExec => s + // earlyFailedStage is the stage which failed before calling doMaterialize, + // so we should avoid calling cancel on it to re-trigger the failure again. + case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s } val cancelErrors = new mutable.ArrayBuffer[SparkException]() try { @@ -539,8 +545,7 @@ case class AdaptiveSparkPlanExec( } } finally { val ex = new SparkException( - "Adaptive execution failed due to stage materialization failures." 
+ - s" and the cause is ${errors.head.getMessage}", errors.head) + "Adaptive execution failed due to stage materialization failures.", errors.head) errors.tail.foreach(ex.addSuppressed) cancelErrors.foreach(ex.addSuppressed) throw ex diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 17f6b29..500b6cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -692,7 +692,8 @@ class AdaptiveQueryExecSuite val error = intercept[Exception] { agged.count() } - assert(error.getCause().toString contains "Failed to materialize query stage") + assert(error.getCause().toString contains "Early failed query stage found") + assert(error.getSuppressed.size === 0) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org