Repository: spark
Updated Branches:
  refs/heads/branch-2.2 cf0719b5e -> bfe3ba869


[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the 
staging files in long running scenario

## What changes were proposed in this pull request?

This issue happens in long-running applications in yarn cluster mode: yarn#client 
doesn't sync tokens with the AM, so it always keeps the initial token. In a 
long-running scenario that token may have expired, so when yarn#client tries to 
clean up the staging directory after the application finishes, it uses the 
expired token and hits a token-expired error.
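
The fix, shown in the diff below, re-logs in from the configured keytab and runs 
the staging-directory cleanup under that fresh UGI. A minimal sketch of the 
pattern using Hadoop's UserGroupInformation API follows; the helper name 
deleteAsFreshUser and its parameters are illustrative, not part of the patch:

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

// Illustrative helper (not part of the patch): log in from the keytab to get a
// fresh UGI, then perform the recursive delete inside doAs so the HDFS call
// uses the fresh credentials rather than the client's original, possibly
// expired, tokens.
def deleteAsFreshUser(
    principal: String,
    keytab: String,
    stagingDir: Path,
    hadoopConf: Configuration): Unit = {
  val freshUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
  freshUgi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      // Resolve the FileSystem inside doAs so it is created under the new login.
      val fs = stagingDir.getFileSystem(hadoopConf)
      fs.delete(stagingDir, true)
    }
  })
}
```

The patch applies this re-login only in cluster mode when both a principal and 
a keytab are configured; otherwise it cleans up as the current user, as before.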

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao <ss...@hortonworks.com>

Closes #18617 from jerryshao/SPARK-21376.

(cherry picked from commit cb8d5cc90ff8d3c991ff33da41b136ab7634f71b)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfe3ba86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfe3ba86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfe3ba86

Branch: refs/heads/branch-2.2
Commit: bfe3ba86936ffaabff9f89d03018eb368d246b4d
Parents: cf0719b
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Jul 13 15:25:38 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Jul 14 10:51:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 35 +++++++++++++++-----
 1 file changed, 26 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfe3ba86/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1fb7edf..7e39c08 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -189,16 +190,32 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-    try {
-      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-        logInfo(s"Deleted staging directory $stagingDirPath")
+    if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+      return
+    }
+
+    def cleanupStagingDirInternal(): Unit = {
+      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+      try {
+        val fs = stagingDirPath.getFileSystem(hadoopConf)
+        if (fs.delete(stagingDirPath, true)) {
+          logInfo(s"Deleted staging directory $stagingDirPath")
+        }
+      } catch {
+        case ioe: IOException =>
+          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
       }
-    } catch {
-      case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+
+    if (isClusterMode && principal != null && keytab != null) {
+      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = {
+          cleanupStagingDirInternal()
+        }
+      })
+    } else {
+      cleanupStagingDirInternal()
     }
   }
 

