spark git commit: [SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in long running scenario

2017-07-14 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 cf0719b5e -> bfe3ba869


[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the 
staging files in long running scenario

## What changes were proposed in this pull request?

This issue happens in long-running applications with yarn cluster mode, because 
yarn#client doesn't sync tokens with the AM, so it will always keep the initial 
token; this token may expire in the long-running scenario, so when 
yarn#client tries to clean up the staging directory after the application has 
finished, it will use this expired token and hit a token-expired issue.

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao 

Closes #18617 from jerryshao/SPARK-21376.

(cherry picked from commit cb8d5cc90ff8d3c991ff33da41b136ab7634f71b)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfe3ba86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfe3ba86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfe3ba86

Branch: refs/heads/branch-2.2
Commit: bfe3ba86936ffaabff9f89d03018eb368d246b4d
Parents: cf0719b
Author: jerryshao 
Authored: Thu Jul 13 15:25:38 2017 -0700
Committer: Marcelo Vanzin 
Committed: Fri Jul 14 10:51:28 2017 -0700

--
 .../org/apache/spark/deploy/yarn/Client.scala   | 35 +++-
 1 file changed, 26 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bfe3ba86/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1fb7edf..7e39c08 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, 
OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -189,16 +190,32 @@ private[spark] class Client(
* Cleanup application staging directory.
*/
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-try {
-  val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-  val fs = stagingDirPath.getFileSystem(hadoopConf)
-  if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-logInfo(s"Deleted staging directory $stagingDirPath")
+if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+  return
+}
+
+def cleanupStagingDirInternal(): Unit = {
+  val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+  try {
+val fs = stagingDirPath.getFileSystem(hadoopConf)
+if (fs.delete(stagingDirPath, true)) {
+  logInfo(s"Deleted staging directory $stagingDirPath")
+}
+  } catch {
+case ioe: IOException =>
+  logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
   }
-} catch {
-  case ioe: IOException =>
-logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+}
+
+if (isClusterMode && principal != null && keytab != null) {
+  val newUgi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+  newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+override def run(): Unit = {
+  cleanupStagingDirInternal()
+}
+  })
+} else {
+  cleanupStagingDirInternal()
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org



spark git commit: [SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the staging files in long running scenario

2017-07-13 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 5c8edfc4a -> cb8d5cc90


[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning the 
staging files in long running scenario

## What changes were proposed in this pull request?

This issue happens in long-running applications with yarn cluster mode, because 
yarn#client doesn't sync tokens with the AM, so it will always keep the initial 
token; this token may expire in the long-running scenario, so when 
yarn#client tries to clean up the staging directory after the application has 
finished, it will use this expired token and hit a token-expired issue.

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao 

Closes #18617 from jerryshao/SPARK-21376.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb8d5cc9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb8d5cc9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb8d5cc9

Branch: refs/heads/master
Commit: cb8d5cc90ff8d3c991ff33da41b136ab7634f71b
Parents: 5c8edfc
Author: jerryshao 
Authored: Thu Jul 13 15:25:38 2017 -0700
Committer: Marcelo Vanzin 
Committed: Thu Jul 13 15:25:38 2017 -0700

--
 .../org/apache/spark/deploy/yarn/Client.scala   | 35 +++-
 1 file changed, 26 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb8d5cc9/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7caaa91..a5b0e19 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, 
OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -192,16 +193,32 @@ private[spark] class Client(
* Cleanup application staging directory.
*/
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-try {
-  val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-  val fs = stagingDirPath.getFileSystem(hadoopConf)
-  if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-logInfo(s"Deleted staging directory $stagingDirPath")
+if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+  return
+}
+
+def cleanupStagingDirInternal(): Unit = {
+  val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+  try {
+val fs = stagingDirPath.getFileSystem(hadoopConf)
+if (fs.delete(stagingDirPath, true)) {
+  logInfo(s"Deleted staging directory $stagingDirPath")
+}
+  } catch {
+case ioe: IOException =>
+  logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
   }
-} catch {
-  case ioe: IOException =>
-logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+}
+
+if (isClusterMode && principal != null && keytab != null) {
+  val newUgi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+  newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+override def run(): Unit = {
+  cleanupStagingDirInternal()
+}
+  })
+} else {
+  cleanupStagingDirInternal()
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org