[ 
https://issues.apache.org/jira/browse/AMATERASU-26?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491558#comment-16491558
 ] 

ASF GitHub Bot commented on AMATERASU-26:
-----------------------------------------

eyalbenivri closed pull request #18: AMATERASU-26 Pipeline tasks runs as "yarn" 
user instead of inheriting…
URL: https://github.com/apache/incubator-amaterasu/pull/18
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java 
b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index dc4f15e..e3c2812 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -110,8 +111,9 @@ private void run(JobOpts opts, String[] args) throws 
Exception {
 
 
         List<String> commands = Collections.singletonList(
-                "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
-                        "$JAVA_HOME/bin/java" +
+                "env AMA_NODE=" + System.getenv("AMA_NODE") +
+                        " env HADOOP_USER_NAME=" + 
UserGroupInformation.getCurrentUser().getUserName() +
+                        " $JAVA_HOME/bin/java" +
                         " -Dscala.usejavacp=false" +
                         " -Xmx1G" +
                         " org.apache.amaterasu.leader.yarn.ApplicationMaster " 
+
diff --git 
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
 
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index a44202a..1828100 100644
--- 
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ 
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -21,8 +21,8 @@ import java.net.{InetAddress, ServerSocket, URLEncoder}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
 
+import javax.jms.Session
 import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
@@ -153,21 +153,14 @@ class ApplicationMaster extends 
AMRMClientAsync.CallbackHandler with Logging {
     // TODO: awsEnv currently set to empty string. should be changed to read 
values from (where?).
     allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, 
awsEnv = "", config, executorJar)
 
-    rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this)
-    rmClient.init(conf)
-    rmClient.start()
-
-    // Register with ResourceManager
-    log.info("Registering application")
-    val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
-    log.info("Registered application")
+    rmClient = startRMClient()
+    val registrationResponse = registerAppMaster("", 0, "")
     val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
     log.info("Max mem capability of resources in this cluster " + maxMem)
     val maxVCores = 
registrationResponse.getMaximumResourceCapability.getVirtualCores
     log.info("Max vcores capability of resources in this cluster " + maxVCores)
     log.info(s"Created jobManager. jobManager.registeredActions.size: 
${jobManager.registeredActions.size}")
 
-
     // Resource requirements for worker containers
     this.capability = Records.newRecord(classOf[Resource])
     val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
@@ -194,6 +187,21 @@ class ApplicationMaster extends 
AMRMClientAsync.CallbackHandler with Logging {
     log.info("Finished asking for containers")
   }
 
+  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+    val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, 
this)
+    client.init(conf)
+    client.start()
+    client
+  }
+
+  private def registerAppMaster(host: String, port: Int, url: String) = {
+    // Register with ResourceManager
+    log.info("Registering application")
+    val registrationResponse = rmClient.registerApplicationMaster(host, port, 
url)
+    log.info("Registered application")
+    registrationResponse
+  }
+
   private def setupMessaging(jobId: String): Unit = {
 
     val cf = new ActiveMQConnectionFactory(address)
@@ -225,20 +233,6 @@ class ApplicationMaster extends 
AMRMClientAsync.CallbackHandler with Logging {
 
   override def onContainersAllocated(containers: util.List[Container]): Unit = 
{
 
-    // creating the credentials for container execution
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-
-    // removing the AM->RM token so that containers cannot access it.
-    val iter = credentials.getAllTokens.iterator
-    log.info("Executing with tokens:")
-    for (token <- iter) {
-      log.info(token.toString)
-      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
-    }
-    val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-
     log.info(s"${containers.size()} Containers allocated")
     for (container <- containers.asScala) { // Launch container by create 
ContainerLaunchContext
       if (actionsBuffer.isEmpty) {
@@ -294,7 +288,8 @@ class ApplicationMaster extends 
AMRMClientAsync.CallbackHandler with Logging {
         ctx.setEnvironment(Map[String, String](
           "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
           "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
-          "AMA_NODE" -> sys.env("AMA_NODE")
+          "AMA_NODE" -> sys.env("AMA_NODE"),
+          "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
         ))
 
         log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
@@ -316,6 +311,22 @@ class ApplicationMaster extends 
AMRMClientAsync.CallbackHandler with Logging {
     }
   }
 
+  private def allTokens: ByteBuffer = {
+    // creating the credentials for container execution
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+
+    // removing the AM->RM token so that containers cannot access it.
+    val iter = credentials.getAllTokens.iterator
+    log.info("Executing with tokens:")
+    for (token <- iter) {
+      log.info(token.toString)
+      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+    }
+    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+  }
+
   private def setupResources(frameworkPath: String, countainerResources: 
mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
 
     val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pipeline tasks (sub-YARN jobs) run as the "yarn" user instead of inheriting the 
> user under which the Amaterasu job was submitted
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMATERASU-26
>                 URL: https://issues.apache.org/jira/browse/AMATERASU-26
>             Project: AMATERASU
>          Issue Type: Improvement
>            Reporter: Arun Manivannan
>            Assignee: Arun Manivannan
>            Priority: Major
>             Fix For: 0.2.1-incubating
>
>         Attachments: TaskJobsRunAsYarnUser.png
>
>
> Referring to the screenshot, the Amaterasu job was originally submitted by 
> the user "amaterasu".  However, the sub-jobs of the pipeline get submitted 
> as the default user "yarn".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to