This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d745e7d SAMZA-2509: New config name to represent if split deployment feature is enabled (#1342)
d745e7d is described below
commit d745e7d1cbd051002b30bfe99ae924d48ff96eaa
Author: Alan Zhang <[email protected]>
AuthorDate: Thu Apr 16 13:48:29 2020 -0700
SAMZA-2509: New config name to represent if split deployment feature is enabled (#1342)
API changes: Changed "samza.cluster.based.job.coordinator.dependency.isolation.enabled" to "job.split.deployment.enabled" for enabling split deployment.
---
.../clustermanager/ClusterBasedJobCoordinator.java | 2 +-
.../java/org/apache/samza/config/JobConfig.java | 7 +++---
.../apache/samza/config/ShellCommandConfig.java | 29 +++++++++++-----------
.../org/apache/samza/config/TestJobConfig.java | 12 ++++-----
.../scala/org/apache/samza/job/yarn/YarnJob.scala | 12 ++++-----
.../org/apache/samza/job/yarn/TestYarnJob.java | 25 +++++++++----------
6 files changed, 40 insertions(+), 47 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 0cb80b2..0bd66dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -465,7 +465,7 @@ public class ClusterBasedJobCoordinator {
*/
public static void main(String[] args) {
boolean dependencyIsolationEnabled = Boolean.parseBoolean(
- System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED));
+ System.getenv(ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED));
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
LOG.error("Uncaught exception in ClusterBasedJobCoordinator::main. Exiting job coordinator", exception);
System.exit(1);
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index dff2991..93b4b60 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -143,8 +143,7 @@ public class JobConfig extends MapConfig {
public static final String COORDINATOR_STREAM_FACTORY =
"job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY =
"org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
- public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
- "samza.cluster.based.job.coordinator.dependency.isolation.enabled";
+ public static final String JOB_SPLIT_DEPLOYMENT_ENABLED = "job.split.deployment.enabled";
private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";
@@ -372,8 +371,8 @@ public class JobConfig extends MapConfig {
return getStandbyTaskReplicationFactor() > 1;
}
- public boolean getClusterBasedJobCoordinatorDependencyIsolationEnabled() {
- return getBoolean(CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, false);
+ public boolean isSplitDeploymentEnabled() {
+ return getBoolean(JOB_SPLIT_DEPLOYMENT_ENABLED, false);
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 668c40a..73093a8 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -58,26 +58,25 @@ public class ShellCommandConfig extends MapConfig {
public static final String ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID";
/**
- * Set to "true" if cluster-based job coordinator dependency isolation is enabled. Otherwise, will be considered
- * false.
+ * Set to "true" if split deployment feature is enabled. Otherwise, will be considered false.
*
- * The launch process for the cluster-based job coordinator depends on the value of this, since it needs to be known
- * if the cluster-based job coordinator should be launched in an isolated mode. This needs to be an environment
- * variable, because the value needs to be known before the full configs can be read from the metadata store (full
- * configs are only read after launch is complete).
+ * The launch process for the cluster-based job coordinator and job container depends on the value of this, since it
+ * needs to be known if the cluster-based job coordinator and job container should be launched in a split deployment
+ * mode.
+ * This needs to be an environment variable, because the value needs to be known before the full configs can be read
+ * from the metadata store (full configs are only read after launch is complete).
*/
- public static final String ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
- "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED";
+ public static final String ENV_SPLIT_DEPLOYMENT_ENABLED = "ENV_SPLIT_DEPLOYMENT_ENABLED";
/**
- * When running the cluster-based job coordinator in an isolated mode, it uses JARs and resources from a lib directory
- * which is provided by the framework. In some cases, it is necessary to use some resources specified by the
- * application as well. This environment variable can be set to a directory which is different from the framework lib
- * directory in order to tell Samza where application resources live.
- * This is an environment variable because it is needed in order to launch the cluster-based job coordinator Java
- * process, which means access to full configs is not available yet.
+ * When running the cluster-based job coordinator and job container in a split deployment mode, it uses JARs and
+ * resources from a lib directory which is provided by the framework. In some cases, it is necessary to use some
+ * resources specified by the application as well. This environment variable can be set to a directory which is
+ * different from the framework lib directory in order to tell Samza where application resources live.
+ * This is an environment variable because it is needed in order to launch the cluster-based job coordinator and job
+ * container Java processes, which means access to full configs is not available yet.
* For example, this is used to set a system property for the location of an application-specified log4j configuration
- * file when launching the cluster-based job coordinator Java process.
+ * file when launching the cluster-based job coordinator and job container Java processes.
*/
public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index abe6dfa..00c2d5e 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -547,15 +547,13 @@ public class TestJobConfig {
@Test
public void testGetClusterBasedJobCoordinatorDependencyIsolationEnabled() {
- Config config =
- new MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true"));
- assertTrue(new JobConfig(config).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+ Config config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
+ assertTrue(new JobConfig(config).isSplitDeploymentEnabled());
- config =
- new MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false"));
- assertFalse(new JobConfig(config).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+ config = new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false"));
+ assertFalse(new JobConfig(config).isSplitDeploymentEnabled());
- assertFalse(new JobConfig(new MapConfig()).getClusterBasedJobCoordinatorDependencyIsolationEnabled());
+ assertFalse(new JobConfig(new MapConfig()).isSplitDeploymentEnabled());
}
@Test
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 43e6a7c..ca681f5 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -184,12 +184,10 @@ object YarnJob extends Logging {
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
}
envMapBuilder += ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)
- val clusterBasedJobCoordinatorDependencyIsolationEnabled =
- jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled
- envMapBuilder += ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED ->
- Util.envVarEscape(Boolean.toString(clusterBasedJobCoordinatorDependencyIsolationEnabled))
- if (clusterBasedJobCoordinatorDependencyIsolationEnabled) {
- // dependency isolation is enabled, so need to specify where the application lib directory is for app resources
+ val splitDeploymentEnabled = jobConfig.isSplitDeploymentEnabled
+ envMapBuilder += ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED -> Util.envVarEscape(Boolean.toString(splitDeploymentEnabled))
+ if (splitDeploymentEnabled) {
+ //split deployment is enabled, so need to specify where the application lib directory is for app resources
envMapBuilder += ShellCommandConfig.ENV_APPLICATION_LIB_DIR ->
Util.envVarEscape(String.format("./%s/lib",
DependencyIsolationUtils.APPLICATION_DIRECTORY))
}
@@ -206,7 +204,7 @@ object YarnJob extends Logging {
@VisibleForTesting
private[yarn] def buildJobCoordinatorCmd(config: Config, jobConfig: JobConfig): String = {
var cmdExec = "./__package/bin/run-jc.sh" // default location
- if (jobConfig.getClusterBasedJobCoordinatorDependencyIsolationEnabled) {
+ if (jobConfig.isSplitDeploymentEnabled) {
cmdExec = "./%s/bin/run-jc.sh" format DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY
logger.info("Using isolated cluster-based job coordinator path: %s" format cmdExec)
}
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index 7961360..4858d76 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -43,12 +43,11 @@ public class TestYarnJob {
Config config = new MapConfig();
assertEquals("./__package/bin/run-jc.sh",
YarnJob$.MODULE$.buildJobCoordinatorCmd(config, new JobConfig(config)));
- // cluster-based job coordinator dependency isolation is enabled; use script from framework infrastructure directory
- Config configJobCoordinatorDependencyIsolationEnabled =
- new MapConfig(ImmutableMap.of(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true"));
+ // split deployment is enabled; use script from framework infrastructure directory
+ Config splitDeploymentEnabled =
+ new MapConfig(ImmutableMap.of(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true"));
assertEquals(String.format("./%s/bin/run-jc.sh", DependencyIsolationUtils.FRAMEWORK_INFRASTRUCTURE_DIRECTORY),
- YarnJob$.MODULE$.buildJobCoordinatorCmd(configJobCoordinatorDependencyIsolationEnabled,
- new JobConfig(configJobCoordinatorDependencyIsolationEnabled)));
+ YarnJob$.MODULE$.buildJobCoordinatorCmd(splitDeploymentEnabled, new JobConfig(splitDeploymentEnabled)));
}
@Test
@@ -59,14 +58,14 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, amJvmOptions) // needs escaping
- .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false")
+ .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.build());
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
- ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false");
+ ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
}
@@ -78,14 +77,14 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true")
+ .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.build());
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true",
+ ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -98,7 +97,7 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.JOB_COORDINATOR_SYSTEM, "jobCoordinatorSystem")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false")
+ .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "false")
.put(YarnConfig.AM_JAVA_HOME, "/some/path/to/java/home")
.build());
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
@@ -106,7 +105,7 @@ public class TestYarnJob {
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "false",
+ ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false",
ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());
@@ -119,14 +118,14 @@ public class TestYarnJob {
.put(JobConfig.JOB_ID, "jobId")
.put(JobConfig.CONFIG_LOADER_FACTORY,
"org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
.put(YarnConfig.AM_JVM_OPTIONS, "")
- .put(JobConfig.CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true")
+ .put(JobConfig.JOB_SPLIT_DEPLOYMENT_ENABLED, "true")
.build());
String expectedSubmissionConfig =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(config));
Map<String, String> expected = ImmutableMap.of(
ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
ShellCommandConfig.ENV_JAVA_OPTS, "",
- ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED, "true",
+ ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true",
ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava());