This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d745e7d  SAMZA-2509: New config name to represent if split deployment 
feature is enabled (#1342)
d745e7d is described below

commit d745e7d1cbd051002b30bfe99ae924d48ff96eaa
Author: Alan Zhang <[email protected]>
AuthorDate: Thu Apr 16 13:48:29 2020 -0700

    SAMZA-2509: New config name to represent if split deployment feature is 
enabled (#1342)
    
    API changes: Renamed the config 
"samza.cluster.based.job.coordinator.dependency.isolation.enabled" to 
"job.split.deployment.enabled", which enables split deployment.
---
 .../clustermanager/ClusterBasedJobCoordinator.java |  2 +-
 .../java/org/apache/samza/config/JobConfig.java    |  7 +++---
 .../apache/samza/config/ShellCommandConfig.java    | 29 +++++++++++-----------
 .../org/apache/samza/config/TestJobConfig.java     | 12 ++++-----
 .../scala/org/apache/samza/job/yarn/YarnJob.scala  | 12 ++++-----
 .../org/apache/samza/job/yarn/TestYarnJob.java     | 25 +++++++++----------
 6 files changed, 40 insertions(+), 47 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0cb80b2..0bd66dc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -465,7 +465,7 @@ public class ClusterBasedJobCoordinator {
    */
   public static void main(String[] args) {
     boolean dependencyIsolationEnabled = Boolean.parseBoolean(
-        
System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED));
+        System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
     Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
         LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. 
Exiting job coordinator", exception);
         System.exit(1);
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index dff2991..93b4b60 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -143,8 +143,7 @@ public class JobConfig extends MapConfig {
   public static final String COORDINATOR_STREAM_FACTORY = 
"job.coordinatorstream.config.factory";
   public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = 
"org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
 
-  public static final String 
CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
-      "samza.cluster.based.job.coordinator.dependency.isolation.enabled";
+  public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = 
"job.split.deployment.enabled";
 
   private static final String JOB_STARTPOINT_ENABLED = 
"job.startpoint.enabled";
 
@@ -372,8 +371,8 @@ public class JobConfig extends MapConfig {
     return getStandbyTaskReplicationFactor() > 1;
   }
 
-  public boolean getClusterBasedJobCoordinatorDependencyIsolationEnabled() {
-    return 
getBoolean(CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, false);
+  public boolean isSplitDeploymentEnabled() {
+    return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 668c40a..73093a8 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -58,26 +58,25 @@ public class ShellCommandConfig extends MapConfig {
   public static final String ENV_EXECUTION_ENV_CONTAINER_ID = 
"EXECUTION_ENV_CONTAINER_ID";
 
   /**
-   * Set to "true" if cluster-based job coordinator dependency isolation is 
enabled. Otherwise, will be considered
-   * false.
+   * Set to "true" if split deployment feature is enabled. Otherwise, will be 
considered false.
    *
-   * The launch process for the cluster-based job coordinator depends on the 
value of this, since it needs to be known
-   * if the cluster-based job coordinator should be launched in an isolated 
mode. This needs to be an environment
-   * variable, because the value needs to be known before the full configs can 
be read from the metadata store (full
-   * configs are only read after launch is complete).
+   * The launch process for the cluster-based job coordinator and job 
container depends on the value of this, since it
+   * needs to be known if the cluster-based job coordinator and job container 
should be launched in a split deployment
+   * mode.
+   * This needs to be an environment variable, because the value needs to be 
known before the full configs can be read
+   * from the metadata store (full configs are only read after launch is 
complete).
    */
-  public static final String 
ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
-      "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED";
+  public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = 
"ENV_SPLIT_DEPLOYMENT_ENABLED";
 
   /**
-   * When running the cluster-based job coordinator in an isolated mode, it 
uses JARs and resources from a lib directory
-   * which is provided by the framework. In some cases, it is necessary to use 
some resources specified by the
-   * application as well. This environment variable can be set to a directory 
which is different from the framework lib
-   * directory in order to tell Samza where application resources live.
-   * This is an environment variable because it is needed in order to launch 
the cluster-based job coordinator Java
-   * process, which means access to full configs is not available yet.
+   * When running the cluster-based job coordinator and job container in a 
split deployment mode, it uses JARs and
+   * resources from a lib directory which is provided by the framework. In 
some cases, it is necessary to use some
+   * resources specified by the application as well. This environment variable 
can be set to a directory which is
+   * different from the framework lib directory in order to tell Samza where 
application resources live.
+   * This is an environment variable because it is needed in order to launch 
the cluster-based job coordinator and job
+   * container Java processes, which means access to full configs is not 
available yet.
    * For example, this is used to set a system property for the location of an 
application-specified log4j configuration
-   * file when launching the cluster-based job coordinator Java process.
+   * file when launching the cluster-based job coordinator and job container 
Java processes.
    */
   public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index abe6dfa..00c2d5e 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -547,15 +547,13 @@ public class TestJobConfig {
 
   @Test
   public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
-    Config config =
-        new 
MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "true"));
-    assertTrue(new 
JobConfig(config).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+    Config config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
+    assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
 
-    config =
-        new 
MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "false"));
-    assertFalse(new 
JobConfig(config).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+    config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
+    assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
 
-    assertFalse(new JobConfig(new 
MapConfig()).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+    assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
   }
 
   @Test
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 43e6a7c..ca681f5 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -184,12 +184,10 @@ object YarnJob extends Logging {
         
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
     }
     envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> 
Util.envVarEscape(yarnConfig.getAmOpts)
-    val clusterBasedJobCoordinatorDependencyIsolationEnabled =
-      jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled
-    envMapBuilder += 
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED
 ->
-      
Util.envVarEscape(Boolean.toString(clusterBasedJobCoordinatorDependencyIsolationEnabled))
-    if (clusterBasedJobCoordinatorDependencyIsolationEnabled) {
-      // dependency isolation is enabled, so need to specify where the 
application lib directory is for app resources
+    val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
+    envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> 
Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
+    if (splitDeploymentEnabled) {
+      //split deployment is enabled, so need to specify where the application 
lib directory is for app resources
       envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
         Util.envVarEscape(String.format("./%s/lib", 
DependencyIsolationUtils.APPLICATION_DIRECTORY))
     }
@@ -206,7 +204,7 @@ object YarnJob extends Logging {
   @VisibleForTesting
   private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: 
JobConfig): String = {
     var cmdExec = "./__package/bin/run-jc.sh" // default location
-    if (jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled) {
+    if (jobConfig.isSplitDeploymentEnabled) {
       cmdExec = "./%s/bin/run-jc.sh" format 
DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
       logger.info("Using isolated cluster-based job coordinator path: %s" 
format cmdExec)
     }
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java 
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index 7961360..4858d76 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -43,12 +43,11 @@ public class TestYarnJob {
     Config config = new MapConfig();
     assertEquals("./__package/bin/run-jc.sh", 
YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
 
-    // cluster-based job coordinator dependency isolation is enabled; use 
script from framework infrastructure directory
-    Config configJobCoordinatorDependencyIsolationEnabled =
-        new 
MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "true"));
+    // split deployment is enabled; use script from framework infrastructure 
directory
+    Config splitDeploymentEnabled =
+        new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, 
"true"));
     assertEquals(String.format("./%s/bin/run-jc.sh", 
DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
-        
YarnJob$.MODULE$.buildJobCoordinatorCmd(configJobCoordinatorDependencyIsolationEnabled,
-            new JobConfig(configJobCoordinatorDependencyIsolationEnabled)));
+        YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new 
JobConfig(splitDeploymentEnabled)));
   }
 
   @Test
@@ -59,14 +58,14 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
-        
.put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, 
"false")
+        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .build());
     String expectedCoordinatorStreamConfigStringValue = 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, 
expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
-        
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "false");
+        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new 
JobConfig(config))).asJava());
   }
@@ -78,14 +77,14 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        
.put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, 
"true")
+        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .build());
     String expectedCoordinatorStreamConfigStringValue = 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, 
expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "true",
+        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
         ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new 
JobConfig(config))).asJava());
@@ -98,7 +97,7 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        
.put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, 
"false")
+        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
         .put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
         .build());
     String expectedCoordinatorStreamConfigStringValue = 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -106,7 +105,7 @@ public class TestYarnJob {
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, 
expectedCoordinatorStreamConfigStringValue,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "false",
+        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
         ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new 
JobConfig(config))).asJava());
@@ -119,14 +118,14 @@ public class TestYarnJob {
         .put(JobConfig.JOB_ID, "jobId")
         .put(JobConfig.CONFIG_LOADER_FACTORY, 
"org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
         .put(YarnConfig.AM_JVM_OPTIONS, "")
-        
.put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, 
"true")
+        .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
         .build());
     String expectedSubmissionConfig = 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
         .writeValueAsString(config));
     Map<String, String> expected = ImmutableMap.of(
         ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
         ShellCommandConfig.ENV_JAVA_OPTS, "",
-        
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
 "true",
+        ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
         ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
     assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
         YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new 
JobConfig(config))).asJava());

Reply via email to