cameronlee314 commented on a change in pull request #1172: SAMZA-2332: [AM
isolation] YarnJob should pass new command and additional environment variables
for AM deployment
URL: https://github.com/apache/samza/pull/1172#discussion_r332789135
##########
File path: samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
##########
@@ -200,3 +167,63 @@ class YarnJob(config: Config, hadoopConfig:
Configuration) extends StreamJob {
}
}
}
+
+object YarnJob extends Logging {
+ /**
+ * Build the environment variable map for the application master execution.
+ * Passing multiple separate config objects so that they can be reused for
other logic.
+ */
+ @VisibleForTesting
+ private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
+ jobConfig: JobConfig): Map[String, String] = {
+ val coordinatorSystemConfig =
CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
+ val envMapBuilder = Map.newBuilder[String, String]
+ envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG ->
+
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
+ envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS ->
Util.envVarEscape(yarnConfig.getAmOpts)
+ val applicationMasterIsolationEnabled =
jobConfig.getApplicationMasterIsolationEnabled
+ envMapBuilder +=
ShellCommandConfig.ENV_APPLICATION_MASTER_ISOLATION_ENABLED ->
+ Util.envVarEscape(Boolean.toString(applicationMasterIsolationEnabled))
+ if (applicationMasterIsolationEnabled) {
+ // if AM isolation is enabled, then need to specify where the
application lib directory is
+ envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
+ Util.envVarEscape(String.format("./%s/lib",
IsolationUtils.APPLICATION_MASTER_APPLICATION_DIRECTORY))
+ }
+ Option.apply(yarnConfig.getAMJavaHome).foreach {
+ amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME ->
amJavaHome
+ }
+ envMapBuilder.result()
+ }
+
+ /**
+ * Build the command for the application master execution.
+ * Passing multiple separate config objects so that they can be reused in
other places.
+ */
+ @VisibleForTesting
+ private[yarn] def buildAmCmd(config: Config, jobConfig: JobConfig): String =
{
+ // figure out if we have framework is deployed into a separate location
+ val fwkPath = config.get(JobConfig.SAMZA_FWK_PATH, "")
Review comment:
It is not relevant. I will put out a separate PR to remove the old code and
then apply this change on top of that.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services