xuanyuanking commented on a change in pull request #25620: [SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files URL: https://github.com/apache/spark/pull/25620#discussion_r324665868
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -1607,11 +1616,18 @@ private[spark] class DAGScheduler( case mapStage: ShuffleMapStage => val numMissingPartitions = mapStage.findMissingPartitions().length if (numMissingPartitions < mapStage.numTasks) { - // TODO: support to rollback shuffle files. - // Currently the shuffle writing is "first write wins", so we can't re-run a - // shuffle map stage and overwrite existing shuffle files. We have to finish - // SPARK-8029 first. - abortStage(mapStage, generateErrorMessage(mapStage), None) + if (sc.getConf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { + val reason = "A shuffle map stage with indeterminate output was failed " + + "and retried. However, Spark can only do this while using the new " + + "shuffle block fetching protocol. Please check the config " + + "'spark.shuffle.useOldFetchProtocol', see more detail in " + + "SPARK-27665 and SPARK-25341." + abortStage(mapStage, reason, None) + } else { + logInfo(s"The indeterminate stage $mapStage will be resubmitted," + + " the stage self and all indeterminate parent stage will be" + + " rollback and whole stage rerun.") Review comment: Yeah, the answer for both questions is yes. Just log once done in aa3a409. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org