This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch flip116
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e099019f80ad4420a35565ef7015a89c363d0b14
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Thu Apr 9 13:22:49 2020 +0800

    [FLINK-16742][dist] Extend and use BashJavaUtils for JM memory 
configuration in start-up scripts.
    
    This closes #11545.
---
 flink-dist/src/main/flink-bin/bin/jobmanager.sh    | 31 ++++++++++-----------
 .../src/main/flink-bin/bin/standalone-job.sh       | 30 ++++++++++----------
 .../org/apache/flink/dist/BashJavaUtilsITCase.java | 32 +++++++++++++++++++++-
 .../apache/flink/runtime/util/BashJavaUtils.java   | 29 ++++++++++++++++++--
 4 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh 
b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index 9efb83e..7403dd8 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -37,26 +37,16 @@ bin=`cd "$bin"; pwd`
 ENTRYPOINT=standalonesession
 
 if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
-    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
-           echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace 
with key \`${KEY_JOBM_MEM_SIZE}\`"
-    else
-           flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
-           FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
-    fi
-
-    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" 
-lt "0" ]]; then
-        echo "[ERROR] Configured JobManager memory size is not a valid value. 
Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-        exit 1
-    fi
-
-    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m 
-Xmx"$FLINK_JM_HEAP_MB"m"
-    fi
-
     # Add JobManager-specific JVM options
     export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_JM}"
 
     # Startup parameters
+
+    java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS 
${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar))
+    logging_output=$(extractLoggingOutputs "${java_utils_output}")
+    jvm_params=$(extractExecutionResults "${java_utils_output}" 1)
+    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
     args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
     if [ ! -z $HOST ]; then
         args+=("--host")
@@ -67,6 +57,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == 
"start-foreground" ]]; then
         args+=("--webui-port")
         args+=("${WEBUIPORT}")
     fi
+
+    export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+JM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+logs: $logging_output
+"
+
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
diff --git a/flink-dist/src/main/flink-bin/bin/standalone-job.sh 
b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
index 8a98a19..4df02ee 100644
--- a/flink-dist/src/main/flink-bin/bin/standalone-job.sh
+++ b/flink-dist/src/main/flink-bin/bin/standalone-job.sh
@@ -37,24 +37,24 @@ bin=`cd "$bin"; pwd`
 ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}")
 
 if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
-    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
-           echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace 
with key \`${KEY_JOBM_MEM_SIZE}\`"
-    else
-           flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
-           FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
-    fi
+    # Add cluster entry point specific JVM options
+    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_JM}"
 
-    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" 
-lt "0" ]]; then
-        echo "[ERROR] Configured memory size is not a valid value. Please set 
'${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
-        exit 1
-    fi
+    # Startup parameters
 
-    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m 
-Xmx"$FLINK_JM_HEAP_MB"m"
-    fi
+    java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS 
${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar) 
"${ARGS[@]}")
+    logging_output=$(extractLoggingOutputs "${java_utils_output}")
+    jvm_params=$(extractExecutionResults "${java_utils_output}" 1)
+    export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+    export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+JM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+logs: $logging_output
+"
 
-    # Add cluster entry point specific JVM options
-    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} 
${FLINK_ENV_JAVA_OPTS_JM}"
 fi
 
 if [[ $STARTSTOP == "start-foreground" ]]; then
diff --git 
a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java 
b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
index 8aafb23..4604697 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.dist;
 
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.util.BashJavaUtils;
 
@@ -54,7 +55,7 @@ public class BashJavaUtilsITCase extends JavaBashTestBase {
        }
 
        @Test
-       public void testConfigOverwrittenByDynamicOpts() throws Exception {
+       public void testGetTmResourceParamsConfigsWithDynamicProperties() 
throws Exception {
                int expectedResultLines = 2;
                double cpuCores = 39.0;
                String[] commands = {
@@ -70,6 +71,35 @@ public class BashJavaUtilsITCase extends JavaBashTestBase {
        }
 
        @Test
+       public void testGetJmResourceParams() throws Exception {
+               int expectedResultLines = 1;
+               String[] commands = {
+                       RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+                       BashJavaUtils.Command.GET_JM_RESOURCE_PARAMS.toString(),
+                       String.valueOf(expectedResultLines)};
+               List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+               assertThat(lines.size(), is(expectedResultLines));
+               ConfigurationUtils.parseJvmArgString(lines.get(0));
+       }
+
+       @Test
+       public void testGetJmResourceParamsWithDynamicProperties() throws 
Exception {
+               int expectedResultLines = 1;
+               long metaspace = 123456789L;
+               String[] commands = {
+                       RUN_BASH_JAVA_UTILS_CMD_SCRIPT,
+                       BashJavaUtils.Command.GET_JM_RESOURCE_PARAMS.toString(),
+                       String.valueOf(expectedResultLines),
+                       "-D" + JobManagerOptions.JVM_METASPACE.key() + "=" + 
metaspace + "b"};
+               List<String> lines = 
Arrays.asList(executeScript(commands).split(System.lineSeparator()));
+
+               assertThat(lines.size(), is(expectedResultLines));
+               Map<String, String> params = 
ConfigurationUtils.parseJvmArgString(lines.get(0));
+               assertThat(Long.valueOf(params.get("-XX:MaxMetaspaceSize=")), 
is(metaspace));
+       }
+
+       @Test
        public void testExtractLoggingOutputs() throws Exception {
                StringBuilder input = new StringBuilder();
                List<String> expectedOutput = new ArrayList<>();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
index 1478e1c..666d09a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
 import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
 import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
+import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
 import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -47,9 +50,14 @@ public class BashJavaUtils {
        public static void main(String[] args) throws Exception {
                checkArgument(args.length > 0, "Command not specified.");
 
+               String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+
                switch (Command.valueOf(args[0])) {
                        case GET_TM_RESOURCE_PARAMS:
-                               getTmResourceParams(Arrays.copyOfRange(args, 1, 
args.length));
+                               getTmResourceParams(commandArgs);
+                               break;
+                       case GET_JM_RESOURCE_PARAMS:
+                               getJmResourceParams(commandArgs);
                                break;
                        default:
                                // unexpected, Command#valueOf should fail if a 
unknown command is passed in
@@ -74,6 +82,18 @@ public class BashJavaUtils {
                        configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY);
        }
 
+       private static void getJmResourceParams(String[] args) throws Exception 
{
+               Configuration configuration = 
getConfigurationForStandaloneJobManager(args);
+               JobManagerProcessSpec jobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(configuration);
+               System.out.println(EXECUTION_PREFIX + 
ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec));
+       }
+
+       private static Configuration 
getConfigurationForStandaloneJobManager(String[] args) throws Exception {
+               Configuration configuration = loadConfiguration(args);
+               return 
JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(
+                       configuration, JobManagerOptions.TOTAL_FLINK_MEMORY);
+       }
+
        @VisibleForTesting
        static Configuration loadConfiguration(String[] args) throws 
FlinkException {
                return ConfigurationParserUtils.loadCommonConfiguration(
@@ -109,6 +129,11 @@ public class BashJavaUtils {
                /**
                 * Get JVM parameters and dynamic configs of task executor 
resources.
                 */
-               GET_TM_RESOURCE_PARAMS
+               GET_TM_RESOURCE_PARAMS,
+
+               /**
+                * Get JVM parameters and dynamic configs of job manager 
resources.
+                */
+               GET_JM_RESOURCE_PARAMS
        }
 }

Reply via email to