This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d660871 SAMZA-2466: [Scala cleanup] Convert ShellCommandConfig from
scala to java (#1288)
d660871 is described below
commit d6608714c211835c3b04caaaa49882455135158c
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Mar 11 13:44:21 2020 -0700
SAMZA-2466: [Scala cleanup] Convert ShellCommandConfig from scala to java
(#1288)
API changes: None
Upgrade/usage instructions: None
---
.../clustermanager/ClusterBasedJobCoordinator.java | 6 +-
.../apache/samza/config/ShellCommandConfig.java} | 89 +++++++++---------
.../org/apache/samza/config/StreamConfig.java | 0
.../org/apache/samza/execution/JobPlanner.java | 2 +-
.../apache/samza/runtime/ContainerLaunchUtil.java | 6 +-
.../apache/samza/runtime/LocalContainerRunner.java | 6 +-
.../org/apache/samza/job/ShellCommandBuilder.scala | 15 +--
.../apache/samza/job/local/ThreadJobFactory.scala | 9 +-
.../org/apache/samza/config/TestJobConfig.java | 39 --------
.../samza/config/TestShellCommandConfig.java | 103 +++++++++++++++++++++
.../apache/samza/logging/log4j/StreamAppender.java | 2 +-
.../samza/logging/log4j2/StreamAppender.java | 2 +-
.../samza/job/yarn/YarnClusterResourceManager.java | 4 +-
.../org/apache/samza/job/yarn/TestYarnJob.java | 30 +++---
14 files changed, 193 insertions(+), 120 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index b56cb52..b9d054b 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -467,7 +467,7 @@ public class ClusterBasedJobCoordinator {
*/
public static void main(String[] args) {
boolean dependencyIsolationEnabled = Boolean.parseBoolean(
-
System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED()));
+
System.getenv(ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED));
if (!dependencyIsolationEnabled) {
// no isolation enabled, so can just execute
runClusterBasedJobCoordinator directly
runClusterBasedJobCoordinator(args);
@@ -537,8 +537,8 @@ public class ClusterBasedJobCoordinator {
* {@link #main(String[])} so that it can be executed directly or from a
separate classloader.
*/
private static void runClusterBasedJobCoordinator(String[] args) {
- final String coordinatorSystemEnv =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
- final String submissionEnv =
System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG());
+ final String coordinatorSystemEnv =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG);
+ final String submissionEnv =
System.getenv(ShellCommandConfig.ENV_SUBMISSION_CONFIG);
if (!StringUtils.isBlank(submissionEnv)) {
Config submissionConfig;
diff --git
a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
similarity index 60%
rename from
samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
rename to
samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
index 32da385..668c40a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/java/org/apache/samza/config/ShellCommandConfig.java
@@ -16,44 +16,46 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.samza.config;
-package org.apache.samza.config
+import java.util.Optional;
-object ShellCommandConfig {
+
+public class ShellCommandConfig extends MapConfig {
/**
* This environment variable is used to store a JSON serialized map of all
coordinator system configs.
*/
- val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
+ public static final String ENV_COORDINATOR_SYSTEM_CONFIG =
"SAMZA_COORDINATOR_SYSTEM_CONFIG";
/**
* This environment variable is used to pass a JSON serialized map of
configs provided during job submission.
*/
- val ENV_SUBMISSION_CONFIG = "SAMZA_SUBMISSION_CONFIG"
+ public static final String ENV_SUBMISSION_CONFIG = "SAMZA_SUBMISSION_CONFIG";
/**
* The ID for a container. This is a string representation that is unique to
the runtime environment.
*/
- val ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID"
+ public static final String ENV_CONTAINER_ID = "SAMZA_CONTAINER_ID";
/**
* The URL location of the job coordinator's HTTP server.
*/
- val ENV_COORDINATOR_URL = "SAMZA_COORDINATOR_URL"
+ public static final String ENV_COORDINATOR_URL = "SAMZA_COORDINATOR_URL";
/**
* Arguments to be passed to the processing running the TaskRunner (or
equivalent, for non JVM languages).
*/
- val ENV_JAVA_OPTS = "JAVA_OPTS"
+ public static final String ENV_JAVA_OPTS = "JAVA_OPTS";
/**
* The JAVA_HOME path for running the task
*/
- val ENV_JAVA_HOME = "JAVA_HOME"
+ public static final String ENV_JAVA_HOME = "JAVA_HOME";
/**
* The ID assigned to the container by the execution environment (eg: YARN
Container Id)
*/
- val ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID"
+ public static final String ENV_EXECUTION_ENV_CONTAINER_ID =
"EXECUTION_ENV_CONTAINER_ID";
/**
* Set to "true" if cluster-based job coordinator dependency isolation is
enabled. Otherwise, will be considered
@@ -64,8 +66,8 @@ object ShellCommandConfig {
* variable, because the value needs to be known before the full configs can
be read from the metadata store (full
* configs are only read after launch is complete).
*/
- val ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
- "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED"
+ public static final String
ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
+ "CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED";
/**
* When running the cluster-based job coordinator in an isolated mode, it
uses JARs and resources from a lib directory
@@ -77,7 +79,7 @@ object ShellCommandConfig {
* For example, this is used to set a system property for the location of an
application-specified log4j configuration
* file when launching the cluster-based job coordinator Java process.
*/
- val ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR"
+ public static final String ENV_APPLICATION_LIB_DIR = "APPLICATION_LIB_DIR";
/*
* The base directory for storing logged data stores used in Samza. This has
to be set on all machine running Samza
@@ -85,21 +87,21 @@ object ShellCommandConfig {
* If this environment variable is not set, the path defaults to current
working directory (which is the same as the
* path for persisting non-logged data stores)
*/
- val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
+ public static final String ENV_LOGGED_STORE_BASE_DIR =
"LOGGED_STORE_BASE_DIR";
/**
* The directory path that contains the execution plan
*/
- val EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR"
+ public static final String EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR";
/**
* Points to the lib directory of the localized resources(other than the
framework dependencies).
*/
- val ENV_ADDITIONAL_CLASSPATH_DIR = "ADDITIONAL_CLASSPATH_DIR"
+ public static final String ENV_ADDITIONAL_CLASSPATH_DIR =
"ADDITIONAL_CLASSPATH_DIR";
- val COMMAND_SHELL_EXECUTE = "task.execute"
- val TASK_JVM_OPTS = "task.opts"
- val TASK_JAVA_HOME = "task.java.home"
+ public static final String COMMAND_SHELL_EXECUTE = "task.execute";
+ public static final String TASK_JVM_OPTS = "task.opts";
+ public static final String TASK_JAVA_HOME = "task.java.home";
/**
* SamzaContainer uses JARs from the lib directory of the framework in it
classpath. In some cases, it is necessary to include
@@ -108,35 +110,38 @@ object ShellCommandConfig {
* run-time before launching the SamzaContainer. This environment variable
can be set to a lib directory of the localized resource and
* it will be included in the java classpath of the SamzaContainer.
*/
- val ADDITIONAL_CLASSPATH_DIR = "additional.classpath.dir"
-
- implicit def Config2ShellCommand(config: Config) = new
ShellCommandConfig(config)
-}
-
-class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) {
- def getCommand =
getOption(ShellCommandConfig.COMMAND_SHELL_EXECUTE).getOrElse("bin/run-container.sh")
-
- def getTaskOpts = {
- var jvmOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS)
- val jobConfig = new JobConfig(config)
+ public static final String ADDITIONAL_CLASSPATH_DIR =
"additional.classpath.dir";
- if (jobConfig.getAutosizingEnabled &&
getOption(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB).isDefined) {
+ public ShellCommandConfig(Config config) {
+ super(config);
+ }
- val maxHeapMb =
getOption(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB).get
- val xmxSetting = "-Xmx" + maxHeapMb + "m"
+ public String getCommand() {
+ return
Optional.ofNullable(get(ShellCommandConfig.COMMAND_SHELL_EXECUTE)).orElse("bin/run-container.sh");
+ }
- if (jvmOpts.isDefined && jvmOpts.get.contains("-Xmx"))
- jvmOpts = Option(jvmOpts.get.replaceAll("-Xmx\\S+", xmxSetting))
- else if (jvmOpts.isDefined)
- jvmOpts = Option(jvmOpts.get.concat(" " + xmxSetting))
- else
- jvmOpts = Some(xmxSetting)
+ public Optional<String> getTaskOpts() {
+ Optional<String> jvmOpts =
Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
+ Optional<String> maxHeapMbOptional =
Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
+ if (new JobConfig(this).getAutosizingEnabled() &&
maxHeapMbOptional.isPresent()) {
+ String maxHeapMb = maxHeapMbOptional.get();
+ String xmxSetting = "-Xmx" + maxHeapMb + "m";
+ if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
+ jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+",
xmxSetting));
+ } else if (jvmOpts.isPresent()) {
+ jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
+ } else {
+ jvmOpts = Optional.of(xmxSetting);
+ }
}
-
- jvmOpts
+ return jvmOpts;
}
- def getJavaHome = getOption(ShellCommandConfig.TASK_JAVA_HOME)
+ public Optional<String> getJavaHome() {
+ return Optional.ofNullable(get(ShellCommandConfig.TASK_JAVA_HOME));
+ }
- def getAdditionalClasspathDir(): Option[String] =
getOption(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR)
+ public Optional<String> getAdditionalClasspathDir() {
+ return
Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
+ }
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StreamConfig.java
similarity index 100%
rename from samza-core/src/main/scala/org/apache/samza/config/StreamConfig.java
rename to samza-core/src/main/java/org/apache/samza/config/StreamConfig.java
diff --git
a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
index f8e0684..72c2972 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobPlanner.java
@@ -111,7 +111,7 @@ public abstract class JobPlanner {
final void writePlanJsonFile(String planJson) {
try {
String content = "plan='" + planJson + "'";
- String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
+ String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR);
if (planPath != null && !planPath.isEmpty()) {
// Write the plan json to plan path
File file = new File(planPath + "/plan.json");
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 54cb298..c68ec03 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -65,7 +65,7 @@ public class ContainerLaunchUtil {
* Any change here needs to take Beam into account.
*/
public static void run(ApplicationDescriptorImpl<? extends
ApplicationDescriptor> appDesc, String containerId, JobModel jobModel) {
- Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+ Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
JobConfig jobConfig = new JobConfig(jobModel.getConfig());
ContainerLaunchUtil.run(appDesc, jobConfig.getName().get(),
jobConfig.getJobId(), containerId, execEnvContainerId, jobModel);
}
@@ -182,8 +182,8 @@ public class ContainerLaunchUtil {
* @return a new {@link ContainerHeartbeatMonitor} instance, or null if
could not create one
*/
private static ContainerHeartbeatMonitor
createContainerHeartbeatMonitor(SamzaContainer container) {
- String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
- String executionEnvContainerId =
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+ String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
+ String executionEnvContainerId =
System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID);
if (executionEnvContainerId != null) {
log.info("Got execution environment container id: {}",
executionEnvContainerId);
return new ContainerHeartbeatMonitor(() -> {
diff --git
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 07cd637..f8d5e40 100644
---
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -50,15 +50,15 @@ public class LocalContainerRunner {
System.exit(1);
}));
- String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
+ String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID);
log.info(String.format("Got container ID: %s", containerId));
System.out.println(String.format("Container ID: %s", containerId));
- String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+ String coordinatorUrl =
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
- Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()));
+ Optional<String> execEnvContainerId =
Optional.ofNullable(System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID));
int delay = new
Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index 42c50e1..9b95648 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -23,25 +23,28 @@ package org.apache.samza.job
import java.io.File
import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
+
import scala.collection.JavaConverters._
class ShellCommandBuilder extends CommandBuilder {
def buildCommand() = {
+ val shellCommandConfig = new ShellCommandConfig(config)
if(commandPath == null || commandPath.isEmpty())
- config.getCommand
+ shellCommandConfig.getCommand
else
- commandPath + File.separator + config.getCommand
+ commandPath + File.separator + shellCommandConfig.getCommand
}
def buildEnvironment(): java.util.Map[String, String] = {
+ val shellCommandConfig = new ShellCommandConfig(config)
val envMap = Map(
ShellCommandConfig.ENV_CONTAINER_ID -> id.toString,
ShellCommandConfig.ENV_COORDINATOR_URL -> url.toString,
- ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""),
- ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR ->
config.getAdditionalClasspathDir.getOrElse(""))
+ ShellCommandConfig.ENV_JAVA_OPTS ->
shellCommandConfig.getTaskOpts.orElse(""),
+ ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR ->
shellCommandConfig.getAdditionalClasspathDir.orElse(""))
- val envMapWithJavaHome = config.getJavaHome match {
+ val envMapWithJavaHome =
JavaOptionals.toRichOptional(shellCommandConfig.getJavaHome).toOption match {
case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME ->
javaHome)
case None => envMap
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index deea95a..c1a3683 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,8 +23,7 @@ import org.apache.samza.SamzaException
import org.apache.samza.application.ApplicationUtil
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
import org.apache.samza.config.JobConfig._
-import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig}
+import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener,
TaskName}
import org.apache.samza.context.{ExternalContext, JobContextImpl}
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
@@ -38,6 +37,7 @@ import org.apache.samza.runtime.ProcessorContext
import org.apache.samza.startpoint.StartpointManager
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
+import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil,
DiagnosticsUtil, Logging}
import scala.collection.JavaConversions._
@@ -110,9 +110,10 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
// Give developers a nice friendly warning if they've specified task.opts
and are using a threaded job.
- config.getTaskOpts match {
+ JavaOptionals.toRichOptional(new
ShellCommandConfig(config).getTaskOpts).toOption match {
case Some(taskOpts) => warn("%s was specified in config, but is not
being used because job is being executed with ThreadJob. " +
- "You probably want to run %s=%s." format(TASK_JVM_OPTS,
STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+ "You probably want to run %s=%s."
format(ShellCommandConfig.TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS,
+ classOf[ProcessJobFactory].getName))
case _ => None
}
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index 9c3c97e..abe6dfa 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -33,7 +33,6 @@ import
org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStore
import org.apache.samza.runtime.DefaultLocationIdProviderFactory;
import org.junit.Assert;
import org.junit.Test;
-import scala.Option;
import static org.junit.Assert.*;
@@ -604,42 +603,4 @@ public class TestJobConfig {
Assert.assertEquals(900, clusterManagerConfig.getContainerMemoryMb());
Assert.assertEquals(2, clusterManagerConfig.getNumCores());
}
-
- @Test
- public void testGetTaskOptsAutosizingDisabled() {
- ShellCommandConfig shellCommandConfig =
- new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
- assertEquals(Option.empty(), shellCommandConfig.getTaskOpts());
-
- String taskOpts = "-Dproperty=value";
- shellCommandConfig = new ShellCommandConfig(new MapConfig(
- ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS(), taskOpts,
JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
- assertEquals(Option.apply(taskOpts), shellCommandConfig.getTaskOpts());
- }
-
- @Test
- public void testGetTaskOptsAutosizingEnabled() {
- // opts not set, autosizing max heap not set
- ShellCommandConfig shellCommandConfig =
- new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
- assertEquals(Option.empty(), shellCommandConfig.getTaskOpts());
-
- // opts set, autosizing max heap not set
- String taskOpts = "-Dproperty=value";
- shellCommandConfig = new ShellCommandConfig(new MapConfig(
- ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS(), taskOpts,
JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
- assertEquals(Option.apply(taskOpts), shellCommandConfig.getTaskOpts());
-
- // opts set with Xmx, autosizing max heap set
- shellCommandConfig = new ShellCommandConfig(new MapConfig(
- ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
- "1024", "task.opts", "-Xmx10m -Dproperty=value")));
- assertEquals(Option.apply("-Xmx1024m -Dproperty=value"),
shellCommandConfig.getTaskOpts());
-
- // opts set without -Xmx, autosizing max heap set
- shellCommandConfig = new ShellCommandConfig(new MapConfig(
- ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
- "1024", "task.opts", "-Dproperty=value")));
- assertEquals(Option.apply("-Dproperty=value -Xmx1024m"),
shellCommandConfig.getTaskOpts());
- }
}
\ No newline at end of file
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
new file mode 100644
index 0000000..452883d
--- /dev/null
+++
b/samza-core/src/test/java/org/apache/samza/config/TestShellCommandConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Optional;
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestShellCommandConfig {
+ @Test
+ public void testGetCommand() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig());
+ assertEquals("bin/run-container.sh", shellCommandConfig.getCommand());
+
+ shellCommandConfig = new ShellCommandConfig(
+ new
MapConfig(ImmutableMap.of(ShellCommandConfig.COMMAND_SHELL_EXECUTE,
"my-run-container.sh")));
+ assertEquals("my-run-container.sh", shellCommandConfig.getCommand());
+ }
+
+ @Test
+ public void testGetTaskOptsAutosizingDisabled() {
+ ShellCommandConfig shellCommandConfig =
+ new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+ assertEquals(Optional.empty(), shellCommandConfig.getTaskOpts());
+
+ String taskOpts = "-Dproperty=value";
+ shellCommandConfig = new ShellCommandConfig(new MapConfig(
+ ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS, taskOpts,
JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
+ assertEquals(Optional.of(taskOpts), shellCommandConfig.getTaskOpts());
+ }
+
+ @Test
+ public void testGetTaskOptsAutosizingEnabled() {
+ // opts not set, autosizing max heap not set
+ ShellCommandConfig shellCommandConfig =
+ new ShellCommandConfig(new
MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+ assertEquals(Optional.empty(), shellCommandConfig.getTaskOpts());
+
+ // opts set, autosizing max heap not set
+ String taskOpts = "-Dproperty=value";
+ shellCommandConfig = new ShellCommandConfig(new MapConfig(
+ ImmutableMap.of(ShellCommandConfig.TASK_JVM_OPTS, taskOpts,
JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
+ assertEquals(Optional.of(taskOpts), shellCommandConfig.getTaskOpts());
+
+ // opts not set, autosizing max heap set
+ shellCommandConfig = new ShellCommandConfig(new MapConfig(
+ ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+ "1024")));
+ assertEquals(Optional.of("-Xmx1024m"), shellCommandConfig.getTaskOpts());
+
+ // opts set with Xmx, autosizing max heap set
+ shellCommandConfig = new ShellCommandConfig(new MapConfig(
+ ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+ "1024", "task.opts", "-Xmx10m -Dproperty=value")));
+ assertEquals(Optional.of("-Xmx1024m -Dproperty=value"),
shellCommandConfig.getTaskOpts());
+
+ // opts set without -Xmx, autosizing max heap set
+ shellCommandConfig = new ShellCommandConfig(new MapConfig(
+ ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true",
JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB,
+ "1024", "task.opts", "-Dproperty=value")));
+ assertEquals(Optional.of("-Dproperty=value -Xmx1024m"),
shellCommandConfig.getTaskOpts());
+ }
+
+ @Test
+ public void testGetJavaHome() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig());
+ assertFalse(shellCommandConfig.getJavaHome().isPresent());
+
+ shellCommandConfig =
+ new ShellCommandConfig(new
MapConfig(ImmutableMap.of(ShellCommandConfig.TASK_JAVA_HOME,
"/location/java")));
+ assertEquals(Optional.of("/location/java"),
shellCommandConfig.getJavaHome());
+ }
+
+ @Test
+ public void testGetAdditionalClasspathDir() {
+ ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new
MapConfig());
+ assertFalse(shellCommandConfig.getAdditionalClasspathDir().isPresent());
+
+ shellCommandConfig = new ShellCommandConfig(
+ new
MapConfig(ImmutableMap.of(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR,
"/location/classpath")));
+ assertEquals(Optional.of("/location/classpath"),
shellCommandConfig.getAdditionalClasspathDir());
+ }
+}
\ No newline at end of file
diff --git
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index a039858..c885454 100644
---
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -271,7 +271,7 @@ public class StreamAppender extends AppenderSkeleton {
if (isApplicationMaster) {
config =
JobModelManager.currentJobModelManager().jobModel().getConfig();
} else {
- String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+ String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
String response = HttpUtil.read(new URL(url), 30000, new
ExponentialSleepStrategy());
config = SamzaObjectMapper.getObjectMapper().readValue(response,
JobModel.class).getConfig();
}
diff --git
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index f6fb4d9..e033376 100644
---
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -292,7 +292,7 @@ public class StreamAppender extends AbstractAppender {
if (isApplicationMaster) {
config =
JobModelManager.currentJobModelManager().jobModel().getConfig();
} else {
- String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+ String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL);
String response = HttpUtil.read(new URL(url), 30000, new
ExponentialSleepStrategy());
config = SamzaObjectMapper.getObjectMapper().readValue(response,
JobModel.class).getConfig();
}
diff --git
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index eb97b69..e05b31e 100644
---
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -300,7 +300,7 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
*/
@Override
public void launchStreamProcessor(SamzaResource resource, CommandBuilder
builder) {
- String processorId =
builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID());
+ String processorId =
builder.buildEnvironment().get(ShellCommandConfig.ENV_CONTAINER_ID);
String containerId = resource.getContainerId();
String host = resource.getHost();
log.info("Starting Processor ID: {} on Container ID: {} on host: {}",
processorId, containerId, host);
@@ -599,7 +599,7 @@ public class YarnClusterResourceManager extends
ClusterResourceManager implement
String command = cmdBuilder.buildCommand();
Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
- env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(),
Util.envVarEscape(container.getId().toString()));
+ env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID,
Util.envVarEscape(container.getId().toString()));
Path packagePath = new Path(yarnConfig.getPackagePath());
String formattedCommand =
diff --git
a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
index daa719b..7961360 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java
@@ -64,9 +64,9 @@ public class TestYarnJob {
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(),
expectedCoordinatorStreamConfigStringValue,
- ShellCommandConfig.ENV_JAVA_OPTS(), Util.envVarEscape(amJvmOptions),
-
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(),
"false");
+ ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
+ ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions),
+
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
"false");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new
JobConfig(config))).asJava());
}
@@ -83,10 +83,10 @@ public class TestYarnJob {
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(),
expectedCoordinatorStreamConfigStringValue,
- ShellCommandConfig.ENV_JAVA_OPTS(), "",
-
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(),
"true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib");
+ ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
+ ShellCommandConfig.ENV_JAVA_OPTS, "",
+
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
"true",
+ ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new
JobConfig(config))).asJava());
}
@@ -104,10 +104,10 @@ public class TestYarnJob {
String expectedCoordinatorStreamConfigStringValue =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)));
Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG(),
expectedCoordinatorStreamConfigStringValue,
- ShellCommandConfig.ENV_JAVA_OPTS(), "",
-
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(),
"false",
- ShellCommandConfig.ENV_JAVA_HOME(), "/some/path/to/java/home");
+ ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG,
expectedCoordinatorStreamConfigStringValue,
+ ShellCommandConfig.ENV_JAVA_OPTS, "",
+
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
"false",
+ ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new
JobConfig(config))).asJava());
}
@@ -124,10 +124,10 @@ public class TestYarnJob {
String expectedSubmissionConfig =
Util.envVarEscape(SamzaObjectMapper.getObjectMapper()
.writeValueAsString(config));
Map<String, String> expected = ImmutableMap.of(
- ShellCommandConfig.ENV_SUBMISSION_CONFIG(), expectedSubmissionConfig,
- ShellCommandConfig.ENV_JAVA_OPTS(), "",
-
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED(),
"true",
- ShellCommandConfig.ENV_APPLICATION_LIB_DIR(), "./__package/lib");
+ ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig,
+ ShellCommandConfig.ENV_JAVA_OPTS, "",
+
ShellCommandConfig.ENV_CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED,
"true",
+ ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib");
assertEquals(expected, JavaConverters.mapAsJavaMapConverter(
YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new
JobConfig(config))).asJava());
}