This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1a831ec08b87f98b856874e6ab69c7870ea2e601 Author: Andrey Zagrebin <azagre...@apache.org> AuthorDate: Wed Apr 8 18:06:58 2020 +0300 [FLINK-16745][yarn] Start Yarn JM with FLIP-116 JVM memory args --- .../flink/yarn/YARNFileReplicationITCase.java | 5 +++- .../flink/yarn/YARNHighAvailabilityITCase.java | 4 ++- .../java/org/apache/flink/yarn/YARNITCase.java | 5 +++- .../apache/flink/yarn/YarnConfigurationITCase.java | 5 +++- .../apache/flink/yarn/YarnClusterDescriptor.java | 10 ++++--- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 33 ++++++++++---------- .../flink/yarn/YarnClusterDescriptorTest.java | 35 ++++++++++++---------- 8 files changed, 58 insertions(+), 41 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java index 055380b..993adbf 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -84,8 +85,9 @@ public class YARNFileReplicationITCase extends YarnTestBase { yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles())); + final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes(); final 
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(768) + .setMasterMemoryMB(masterMemory) .setTaskManagerMemoryMB(1024) .setSlotsPerTaskManager(1) .createClusterSpecification(); @@ -130,6 +132,7 @@ public class YARNFileReplicationITCase extends YarnTestBase { private Configuration getDefaultConfiguration() { final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); configuration.setString(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED.toString()); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index 6420078..b4ec5ea 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.RestartStrategyOptions; @@ -273,6 +274,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { @Nonnull private YarnClusterDescriptor setupYarnClusterDescriptor() { final Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); 
flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10"); flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); @@ -290,7 +292,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { } private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException { - final int masterMemory = 256; + final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes(); final int taskManagerMemory = 1024; final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor .deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder() diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java index bb15e28..b7139af 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -94,8 +95,9 @@ public class YARNITCase extends YarnTestBase { yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles())); yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles())); + final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes(); final 
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder() - .setMasterMemoryMB(768) + .setMasterMemoryMB(masterMemory) .setTaskManagerMemoryMB(1024) .setSlotsPerTaskManager(1) .createClusterSpecification(); @@ -138,6 +140,7 @@ public class YARNITCase extends YarnTestBase { private Configuration createDefaultConfiguration(YarnConfigOptions.UserJarInclusion userJarInclusion) { Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g")); configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s"); configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString()); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index d44d612..ee538d4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -26,6 +26,8 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ResourceManagerOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; @@ -88,9 +90,10 @@ public class YarnConfigurationITCase extends YarnTestBase { final int slotsPerTaskManager = 3; configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); + final int masterMemory = 768; + 
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(masterMemory)); final TaskExecutorProcessSpec tmResourceSpec = TaskExecutorProcessUtils.processSpecFromConfig(configuration); - final int masterMemory = 64; final int taskManagerMemory = tmResourceSpec.getTotalProcessMemorySize().getMebiBytes(); final YarnConfiguration yarnConfiguration = getYarnConfiguration(); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index cde732b..daae7ed 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -44,7 +44,10 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -932,7 +935,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { hasLogback, hasLog4j, hasKrb5, - clusterSpecification.getMasterMemoryMB()); + JobManagerProcessUtils.processSpecFromConfig(flinkConfiguration)); // setup security tokens if (UserGroupInformation.isSecurityEnabled()) { @@ -1570,7 +1573,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { boolean hasLogback, boolean hasLog4j, boolean hasKrb5, - int jobManagerMemoryMb) { + JobManagerProcessSpec processSpec) { // ------------------ Prepare Application Master 
Container ------------------------------ // respect custom JVM options in the YAML file @@ -1590,8 +1593,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { final Map<String, String> startCommandValues = new HashMap<>(); startCommandValues.put("java", "$JAVA_HOME/bin/java"); - int heapSize = BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration); - String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, heapSize); + String jvmHeapMem = ProcessMemoryUtils.generateJvmParametersStr(processSpec); startCommandValues.put("jvmmem", jvmHeapMem); startCommandValues.put("jvmopts", javaOpts); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 352ca6e..4ca49c6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -355,7 +355,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) { jmMemoryVal += "m"; } - effectiveConfiguration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.parse(jmMemoryVal)); + effectiveConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jmMemoryVal)); } if (commandLine.hasOption(tmMemory.getOpt())) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index bf097fb..6657190 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -111,7 +111,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { String[] params = new String[] {"-ys", "3"}; - FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithTmTotalMemory(2048); + 
FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithJmAndTmTotalMemory(2048); final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); @@ -314,7 +314,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { final int taskManagerMemory = 7331; final int slotsPerTaskManager = 30; - configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory)); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); @@ -340,7 +340,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { public void testConfigurationClusterSpecification() throws Exception { final Configuration configuration = new Configuration(); final int jobManagerMemory = 1337; - configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(jobManagerMemory)); final int taskManagerMemory = 7331; configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemory)); final int slotsPerTaskManager = 42; @@ -361,10 +361,10 @@ public class FlinkYarnSessionCliTest extends TestLogger { } /** - * Tests the specifying heap memory without unit for job manager and task manager. + * Tests the specifying total process memory without unit for job manager and task manager. 
*/ @Test - public void testHeapMemoryPropertyWithoutUnit() throws Exception { + public void testMemoryPropertyWithoutUnit() throws Exception { final String[] args = new String[] { "-yjm", "1024", "-ytm", "2048" }; final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); @@ -379,10 +379,10 @@ public class FlinkYarnSessionCliTest extends TestLogger { } /** - * Tests the specifying heap memory with unit (MB) for job manager and task manager. + * Tests the specifying total process memory with unit (MB) for job manager and task manager. */ @Test - public void testHeapMemoryPropertyWithUnitMB() throws Exception { + public void testMemoryPropertyWithUnitMB() throws Exception { final String[] args = new String[] { "-yjm", "1024m", "-ytm", "2048m" }; final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -396,10 +396,10 @@ public class FlinkYarnSessionCliTest extends TestLogger { } /** - * Tests the specifying heap memory with arbitrary unit for job manager and task manager. + * Tests the specifying total process memory with arbitrary unit for job manager and task manager. */ @Test - public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception { + public void testMemoryPropertyWithArbitraryUnit() throws Exception { final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" }; final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -434,12 +434,12 @@ public class FlinkYarnSessionCliTest extends TestLogger { } /** - * Tests the specifying job manager heap memory with config default value for job manager and task manager. + * Tests the specifying job manager total process memory with config default value for job manager and task manager. 
*/ @Test - public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception { - int tmMemory = 2048; - final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithTmTotalMemory(tmMemory); + public void testJobManagerMemoryPropertyWithConfigDefaultValue() throws Exception { + int procMemory = 2048; + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithJmAndTmTotalMemory(procMemory); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false); @@ -447,8 +447,8 @@ public class FlinkYarnSessionCliTest extends TestLogger { final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig); final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig); - assertThat(clusterSpecification.getMasterMemoryMB(), is(1024)); - assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(tmMemory)); + assertThat(clusterSpecification.getMasterMemoryMB(), is(procMemory)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(procMemory)); } @Test @@ -490,8 +490,9 @@ public class FlinkYarnSessionCliTest extends TestLogger { return createFlinkYarnSessionCli(new Configuration()); } - private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws FlinkException { + private FlinkYarnSessionCli createFlinkYarnSessionCliWithJmAndTmTotalMemory(int totalMemomory) throws FlinkException { Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemomory)); configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(totalMemomory)); return createFlinkYarnSessionCli(configuration); } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 7a855e9..09d2133 
100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -26,6 +26,8 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.util.TestLogger; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.yarn.configuration.YarnConfigOptions; @@ -55,6 +57,7 @@ import java.util.Map; import java.util.Set; import static junit.framework.TestCase.assertTrue; +import static org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec; import static org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; @@ -156,8 +159,9 @@ public class YarnClusterDescriptorTest extends TestLogger { Configuration cfg = new Configuration(); YarnClusterDescriptor clusterDescriptor = createYarnClusterDescriptor(cfg); + final JobManagerProcessSpec jobManagerProcessSpec = createDefaultJobManagerProcessSpec(1024); final String java = "$JAVA_HOME/bin/java"; - final String jvmmem = "-Xms424m -Xmx424m"; + final String jvmmem = ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec); final String jvmOpts = "-Djvm"; // if set final String jmJvmOpts = "-DjmJvm"; // if set final String krb5 = "-Djava.security.krb5.conf=krb5.conf"; @@ -173,7 +177,6 @@ public class YarnClusterDescriptorTest extends TestLogger { final String redirects = "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " + "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.err"; - final int jobManagerMemory = 1024; try { // no logging, with/out krb5 @@ -188,7 +191,7 @@ public class YarnClusterDescriptorTest extends TestLogger { false, false, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -202,7 +205,7 @@ public class YarnClusterDescriptorTest extends TestLogger { false, false, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // logback only, with/out krb5 @@ -217,7 +220,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, false, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -231,7 +234,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, false, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // log4j, with/out krb5 @@ -246,7 +249,7 @@ public class YarnClusterDescriptorTest extends TestLogger { false, true, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -260,7 +263,7 @@ public class YarnClusterDescriptorTest extends TestLogger { false, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // logback + log4j, with/out krb5 @@ -275,7 +278,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -289,7 +292,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // logback + log4j, with/out krb5, different JVM opts @@ -307,7 +310,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -321,7 +324,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // logback + 
log4j, with/out krb5, different JVM opts @@ -338,7 +341,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, false, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); assertEquals( @@ -352,7 +355,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); // now try some configurations with different yarn.container-start-command-template @@ -370,7 +373,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, @@ -388,7 +391,7 @@ public class YarnClusterDescriptorTest extends TestLogger { true, true, true, - jobManagerMemory) + jobManagerProcessSpec) .getCommands().get(0)); } finally { clusterDescriptor.close();