This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a831ec08b87f98b856874e6ab69c7870ea2e601
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Wed Apr 8 18:06:58 2020 +0300

    [FLINK-16745][yarn] Start Yarn JM with FLIP-116 JVM memory args
---
 .../flink/yarn/YARNFileReplicationITCase.java      |  5 +++-
 .../flink/yarn/YARNHighAvailabilityITCase.java     |  4 ++-
 .../java/org/apache/flink/yarn/YARNITCase.java     |  5 +++-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  5 +++-
 .../apache/flink/yarn/YarnClusterDescriptor.java   | 10 ++++---
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  2 +-
 .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 33 ++++++++++----------
 .../flink/yarn/YarnClusterDescriptorTest.java      | 35 ++++++++++++----------
 8 files changed, 58 insertions(+), 41 deletions(-)

diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 055380b..993adbf 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -84,8 +85,9 @@ public class YARNFileReplicationITCase extends YarnTestBase {
                        
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
                        
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
 
+                       final int masterMemory = 
yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
                        final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                               .setMasterMemoryMB(768)
+                               .setMasterMemoryMB(masterMemory)
                                .setTaskManagerMemoryMB(1024)
                                .setSlotsPerTaskManager(1)
                                .createClusterSpecification();
@@ -130,6 +132,7 @@ public class YARNFileReplicationITCase extends YarnTestBase 
{
 
        private Configuration getDefaultConfiguration() {
                final Configuration configuration = new Configuration();
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
                configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
                configuration.setString(CLASSPATH_INCLUDE_USER_JAR, 
YarnConfigOptions.UserJarInclusion.DISABLED.toString());
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 6420078..b4ec5ea 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -273,6 +274,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
        @Nonnull
        private YarnClusterDescriptor setupYarnClusterDescriptor() {
                final Configuration flinkConfiguration = new Configuration();
+               flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
                flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
                
flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");
                flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
@@ -290,7 +292,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
        }
 
        private RestClusterClient<ApplicationId> 
deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws 
ClusterDeploymentException {
-               final int masterMemory = 256;
+               final int masterMemory = 
yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
                final int taskManagerMemory = 1024;
                final ClusterClient<ApplicationId> yarnClusterClient = 
yarnClusterDescriptor
                                .deploySessionCluster(new 
ClusterSpecification.ClusterSpecificationBuilder()
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index bb15e28..b7139af 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -94,8 +95,9 @@ public class YARNITCase extends YarnTestBase {
                        
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
                        
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
 
+                       final int masterMemory = 
yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
                        final ClusterSpecification clusterSpecification = new 
ClusterSpecification.ClusterSpecificationBuilder()
-                               .setMasterMemoryMB(768)
+                               .setMasterMemoryMB(masterMemory)
                                .setTaskManagerMemoryMB(1024)
                                .setSlotsPerTaskManager(1)
                                .createClusterSpecification();
@@ -138,6 +140,7 @@ public class YARNITCase extends YarnTestBase {
 
        private Configuration 
createDefaultConfiguration(YarnConfigOptions.UserJarInclusion userJarInclusion) 
{
                Configuration configuration = new Configuration();
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(768));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse("1g"));
                configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
                configuration.setString(CLASSPATH_INCLUDE_USER_JAR, 
userJarInclusion.toString());
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index d44d612..ee538d4 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
@@ -88,9 +90,10 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
                        final int slotsPerTaskManager = 3;
                        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
+                       final int masterMemory = 768;
+                       
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(masterMemory));
 
                        final TaskExecutorProcessSpec tmResourceSpec = 
TaskExecutorProcessUtils.processSpecFromConfig(configuration);
-                       final int masterMemory = 64;
                        final int taskManagerMemory = 
tmResourceSpec.getTotalProcessMemorySize().getMebiBytes();
 
                        final YarnConfiguration yarnConfiguration = 
getYarnConfiguration();
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index cde732b..daae7ed 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -44,7 +44,10 @@ import 
org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -932,7 +935,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                                hasLogback,
                                hasLog4j,
                                hasKrb5,
-                               clusterSpecification.getMasterMemoryMB());
+                               
JobManagerProcessUtils.processSpecFromConfig(flinkConfiguration));
 
                // setup security tokens
                if (UserGroupInformation.isSecurityEnabled()) {
@@ -1570,7 +1573,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                        boolean hasLogback,
                        boolean hasLog4j,
                        boolean hasKrb5,
-                       int jobManagerMemoryMb) {
+                       JobManagerProcessSpec processSpec) {
                // ------------------ Prepare Application Master Container  
------------------------------
 
                // respect custom JVM options in the YAML file
@@ -1590,8 +1593,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                final  Map<String, String> startCommandValues = new HashMap<>();
                startCommandValues.put("java", "$JAVA_HOME/bin/java");
 
-               int heapSize = 
BootstrapTools.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration);
-               String jvmHeapMem = String.format("-Xms%sm -Xmx%sm", heapSize, 
heapSize);
+               String jvmHeapMem = 
ProcessMemoryUtils.generateJvmParametersStr(processSpec);
                startCommandValues.put("jvmmem", jvmHeapMem);
 
                startCommandValues.put("jvmopts", javaOpts);
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 352ca6e..4ca49c6 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -355,7 +355,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine {
                        if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
                                jmMemoryVal += "m";
                        }
-                       
effectiveConfiguration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.parse(jmMemoryVal));
+                       
effectiveConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.parse(jmMemoryVal));
                }
 
                if (commandLine.hasOption(tmMemory.getOpt())) {
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index bf097fb..6657190 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -111,7 +111,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                String[] params =
                        new String[] {"-ys", "3"};
 
-               FlinkYarnSessionCli yarnCLI = 
createFlinkYarnSessionCliWithTmTotalMemory(2048);
+               FlinkYarnSessionCli yarnCLI = 
createFlinkYarnSessionCliWithJmAndTmTotalMemory(2048);
 
                final CommandLine commandLine = 
yarnCLI.parseCommandLineOptions(params, true);
 
@@ -314,7 +314,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final int taskManagerMemory = 7331;
                final int slotsPerTaskManager = 30;
 
-               configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(taskManagerMemory));
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
@@ -340,7 +340,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        public void testConfigurationClusterSpecification() throws Exception {
                final Configuration configuration = new Configuration();
                final int jobManagerMemory = 1337;
-               configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(jobManagerMemory));
                final int taskManagerMemory = 7331;
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(taskManagerMemory));
                final int slotsPerTaskManager = 42;
@@ -361,10 +361,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        /**
-        * Tests the specifying heap memory without unit for job manager and 
task manager.
+        * Tests the specifying total process memory without unit for job 
manager and task manager.
         */
        @Test
-       public void testHeapMemoryPropertyWithoutUnit() throws Exception {
+       public void testMemoryPropertyWithoutUnit() throws Exception {
                final String[] args = new String[] { "-yjm", "1024", "-ytm", 
"2048" };
                final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
 
@@ -379,10 +379,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        /**
-        * Tests the specifying heap memory with unit (MB) for job manager and 
task manager.
+        * Tests the specifying total process memory with unit (MB) for job 
manager and task manager.
         */
        @Test
-       public void testHeapMemoryPropertyWithUnitMB() throws Exception {
+       public void testMemoryPropertyWithUnitMB() throws Exception {
                final String[] args = new String[] { "-yjm", "1024m", "-ytm", 
"2048m" };
                final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -396,10 +396,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        /**
-        * Tests the specifying heap memory with arbitrary unit for job manager 
and task manager.
+        * Tests the specifying total process memory with arbitrary unit for 
job manager and task manager.
         */
        @Test
-       public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
+       public void testMemoryPropertyWithArbitraryUnit() throws Exception {
                final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" 
};
                final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCli();
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(args, false);
@@ -434,12 +434,12 @@ public class FlinkYarnSessionCliTest extends TestLogger {
        }
 
        /**
-        * Tests the specifying job manager heap memory with config default 
value for job manager and task manager.
+        * Tests the specifying job manager total process memory with config 
default value for job manager and task manager.
         */
        @Test
-       public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() 
throws Exception {
-               int tmMemory = 2048;
-               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCliWithTmTotalMemory(tmMemory);
+       public void testJobManagerMemoryPropertyWithConfigDefaultValue() throws 
Exception {
+               int procMemory = 2048;
+               final FlinkYarnSessionCli flinkYarnSessionCli = 
createFlinkYarnSessionCliWithJmAndTmTotalMemory(procMemory);
 
                final CommandLine commandLine = 
flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
 
@@ -447,8 +447,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                final ClusterClientFactory<ApplicationId> clientFactory = 
getClusterClientFactory(executorConfig);
                final ClusterSpecification clusterSpecification = 
clientFactory.getClusterSpecification(executorConfig);
 
-               assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
-               assertThat(clusterSpecification.getTaskManagerMemoryMB(), 
is(tmMemory));
+               assertThat(clusterSpecification.getMasterMemoryMB(), 
is(procMemory));
+               assertThat(clusterSpecification.getTaskManagerMemoryMB(), 
is(procMemory));
        }
 
        @Test
@@ -490,8 +490,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
                return createFlinkYarnSessionCli(new Configuration());
        }
 
-       private FlinkYarnSessionCli 
createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws 
FlinkException {
+       private FlinkYarnSessionCli 
createFlinkYarnSessionCliWithJmAndTmTotalMemory(int totalMemomory) throws 
FlinkException {
                Configuration configuration = new Configuration();
+               configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemomory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemomory));
                return createFlinkYarnSessionCli(configuration);
        }
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 7a855e9..09d2133 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -55,6 +57,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static junit.framework.TestCase.assertTrue;
+import static 
org.apache.flink.runtime.jobmanager.JobManagerProcessUtils.createDefaultJobManagerProcessSpec;
 import static 
org.apache.flink.yarn.configuration.YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
@@ -156,8 +159,9 @@ public class YarnClusterDescriptorTest extends TestLogger {
                Configuration cfg = new Configuration();
                YarnClusterDescriptor clusterDescriptor = 
createYarnClusterDescriptor(cfg);
 
+               final JobManagerProcessSpec jobManagerProcessSpec = 
createDefaultJobManagerProcessSpec(1024);
                final String java = "$JAVA_HOME/bin/java";
-               final String jvmmem = "-Xms424m -Xmx424m";
+               final String jvmmem = 
ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec);
                final String jvmOpts = "-Djvm"; // if set
                final String jmJvmOpts = "-DjmJvm"; // if set
                final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
@@ -173,7 +177,6 @@ public class YarnClusterDescriptorTest extends TestLogger {
                final String redirects =
                        "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.out " +
                        "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/jobmanager.err";
-               final int jobManagerMemory = 1024;
 
                try {
                        // no logging, with/out krb5
@@ -188,7 +191,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                false,
                                                false,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -202,7 +205,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                false,
                                                false,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // logback only, with/out krb5
@@ -217,7 +220,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                false,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -231,7 +234,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                false,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // log4j, with/out krb5
@@ -246,7 +249,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                false,
                                                true,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -260,7 +263,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                false,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // logback + log4j, with/out krb5
@@ -275,7 +278,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -289,7 +292,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // logback + log4j, with/out krb5, different JVM opts
@@ -307,7 +310,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -321,7 +324,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // logback + log4j, with/out krb5, different JVM opts
@@ -338,7 +341,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                false,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        assertEquals(
@@ -352,7 +355,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        // now try some configurations with different 
yarn.container-start-command-template
@@ -370,7 +373,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
 
                        
cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
@@ -388,7 +391,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
                                                true,
                                                true,
                                                true,
-                                               jobManagerMemory)
+                                               jobManagerProcessSpec)
                                        .getCommands().get(0));
                } finally {
                        clusterDescriptor.close();

Reply via email to