This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d62c18b7497  [SPARK-41313][SPARK-3900][SPARK-21138] Combine fixes for SPARK-3900 and SPARK-21138
d62c18b7497 is described below

commit d62c18b7497997188ec587e1eb62e75c979c1c93
Author: Xing Lin <xing...@linkedin.com>
AuthorDate: Sun Dec 4 08:24:37 2022 -0600

    [SPARK-41313][SPARK-3900][SPARK-21138] Combine fixes for SPARK-3900 and SPARK-21138

    ### What changes were proposed in this pull request?

    SPARK-3900 fixed the IllegalStateException thrown by cleanupStagingDir in the ApplicationMaster's
    shutdown hook. However, SPARK-21138 accidentally reverted that change while fixing the "Wrong FS"
    bug. We are now seeing SPARK-3900 reported again by our users at LinkedIn, so we need to bring
    back its fix.

    The IllegalStateException when creating a new FileSystem object comes from a Hadoop limitation:
    a shutdown hook cannot be registered once shutdown is already in progress. When a Spark job fails
    during pre-launch, cleanupStagingDir is called as part of shutdown. If that call creates a
    FileSystem object for the first time, Hadoop tries to register a hook to shut down the
    KeyProviderCache while creating a ClientContext for the DFSClient, and we hit the
    IllegalStateException.

    [...]

    ### Why are the changes needed?

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Closes #38832 from xinglin/SPARK-41313.

    Authored-by: Xing Lin <xing...@linkedin.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a7676fe24f6..69dd72720a5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -240,6 +240,9 @@ private[spark] class ApplicationMaster(
 
     logInfo("ApplicationAttemptId: " + appAttemptId)
 
+    // During shutdown, we may not be able to create a FileSystem object. So, pre-create here.
+    val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
+    val stagingDirFs = stagingDirPath.getFileSystem(yarnConf)
     // This shutdown hook should run *after* the SparkContext is shut down.
     val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
     ShutdownHookManager.addShutdownHook(priority) { () =>
@@ -261,14 +264,14 @@ private[spark] class ApplicationMaster(
         if (!unregistered) {
           // we only want to unregister if we don't want the RM to retry
           if (isLastAttempt) {
-            cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
+            cleanupStagingDir(stagingDirFs, stagingDirPath)
             unregister(finalStatus, finalMsg)
           } else if (finalStatus == FinalApplicationStatus.SUCCEEDED) {
             // When it's not the last attempt, if unregister failed caused by timeout exception,
             // YARN will rerun the application, AM should not clean staging dir before unregister
             // success.
             unregister(finalStatus, finalMsg)
-            cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
+            cleanupStagingDir(stagingDirFs, stagingDirPath)
           }
         }
       } catch {
@@ -686,11 +689,15 @@ private[spark] class ApplicationMaster(
    * Clean up the staging directory.
    */
   private def cleanupStagingDir(stagingDirPath: Path): Unit = {
+    val stagingDirFs = stagingDirPath.getFileSystem(yarnConf)
+    cleanupStagingDir(stagingDirFs, stagingDirPath)
+  }
+
+  private def cleanupStagingDir(fs: FileSystem, stagingDirPath: Path): Unit = {
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
         logInfo("Deleting staging directory " + stagingDirPath)
-        val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
       }
     } catch {
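For context, here is a minimal standalone sketch of the failure mode and of the pattern this commit restores. It is illustrative only, not the ApplicationMaster code: the object and method names (`StagingCleanupSketch`, `registerHookUnsafe`, `registerHookSafe`) are hypothetical, it uses the plain JVM shutdown-hook API rather than Spark's internal ShutdownHookManager, and it assumes SPARK_YARN_STAGING_DIR points at a staging directory on a distributed filesystem.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical names, for illustration only.
object StagingCleanupSketch {
  val conf = new Configuration()
  val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))

  // Pattern reintroduced by SPARK-21138 (the bug): the FileSystem is created
  // inside the hook. If it is the first FileSystem for that scheme, Hadoop may
  // build a new DFSClient/ClientContext and try to register its own
  // KeyProviderCache shutdown hook -- illegal once shutdown has started, so the
  // cleanup dies with IllegalStateException.
  def registerHookUnsafe(): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      val fs = stagingDirPath.getFileSystem(conf) // may throw during shutdown
      fs.delete(stagingDirPath, true)
    }))
  }

  // Pattern restored by this commit (originally SPARK-3900): create the
  // FileSystem eagerly, before any shutdown hook runs, and close over it.
  val stagingDirFs: FileSystem = stagingDirPath.getFileSystem(conf)

  def registerHookSafe(): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      stagingDirFs.delete(stagingDirPath, true) // reuses the pre-created object
    }))
  }
}
```

The essential point is ordering: `getFileSystem` is forced to run while shutdown hooks can still be registered, so when the cleanup hook eventually fires it only reuses the already-initialized FileSystem and Hadoop never needs to register its KeyProviderCache hook mid-shutdown.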