This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch flip116 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e099019f80ad4420a35565ef7015a89c363d0b14 Author: Xintong Song <tonysong...@gmail.com> AuthorDate: Thu Apr 9 13:22:49 2020 +0800 [FLINK-16742][dist] Extend and use BashJavaUtils for JM memory configuration in start-up scripts. This closes #11545. --- flink-dist/src/main/flink-bin/bin/jobmanager.sh | 31 ++++++++++----------- .../src/main/flink-bin/bin/standalone-job.sh | 30 ++++++++++---------- .../org/apache/flink/dist/BashJavaUtilsITCase.java | 32 +++++++++++++++++++++- .../apache/flink/runtime/util/BashJavaUtils.java | 29 ++++++++++++++++++-- 4 files changed, 88 insertions(+), 34 deletions(-) diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh index 9efb83e..7403dd8 100755 --- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh @@ -37,26 +37,16 @@ bin=`cd "$bin"; pwd` ENTRYPOINT=standalonesession if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then - if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then - echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`" - else - flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP}) - FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes}) - fi - - if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then - echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." - exit 1 - fi - - if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then - export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m" - fi - # Add JobManager-specific JVM options export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" # Startup parameters + + java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS ${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)) + logging_output=$(extractLoggingOutputs "${java_utils_output}") + jvm_params=$(extractExecutionResults "${java_utils_output}" 1) + export JVM_ARGS="${JVM_ARGS} ${jvm_params}" + args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster") if [ ! -z $HOST ]; then args+=("--host") @@ -67,6 +57,15 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then args+=("--webui-port") args+=("${WEBUIPORT}") fi + + export FLINK_INHERITED_LOGS=" +$FLINK_INHERITED_LOGS + +JM_RESOURCE_PARAMS extraction logs: +jvm_params: $jvm_params +logs: $logging_output +" + fi if [[ $STARTSTOP == "start-foreground" ]]; then diff --git a/flink-dist/src/main/flink-bin/bin/standalone-job.sh b/flink-dist/src/main/flink-bin/bin/standalone-job.sh index 8a98a19..4df02ee 100644 --- a/flink-dist/src/main/flink-bin/bin/standalone-job.sh +++ b/flink-dist/src/main/flink-bin/bin/standalone-job.sh @@ -37,24 +37,24 @@ bin=`cd "$bin"; pwd` ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}") if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then - if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then - echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`" - else - flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP}) - FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes}) - fi + # Add cluster entry point specific JVM options + export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" - if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then - echo "[ERROR] Configured memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." - exit 1 - fi + # Startup parameters - if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then - export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m" - fi + java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS ${FLINK_CONF_DIR} $FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar) "${ARGS[@]}") + logging_output=$(extractLoggingOutputs "${java_utils_output}") + jvm_params=$(extractExecutionResults "${java_utils_output}" 1) + export JVM_ARGS="${JVM_ARGS} ${jvm_params}" + + export FLINK_INHERITED_LOGS=" +$FLINK_INHERITED_LOGS + +JM_RESOURCE_PARAMS extraction logs: +jvm_params: $jvm_params +logs: $logging_output +" - # Add cluster entry point specific JVM options - export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" fi if [[ $STARTSTOP == "start-foreground" ]]; then diff --git a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java index 8aafb23..4604697 100644 --- a/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java +++ b/flink-dist/src/test/java/org/apache/flink/dist/BashJavaUtilsITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.dist; import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.util.BashJavaUtils; @@ -54,7 +55,7 @@ public class BashJavaUtilsITCase extends JavaBashTestBase { } @Test - public void testConfigOverwrittenByDynamicOpts() throws Exception { + public void testGetTmResourceParamsConfigsWithDynamicProperties() throws Exception { int expectedResultLines = 2; double cpuCores = 39.0; String[] commands = { @@ -70,6 +71,35 @@ public class BashJavaUtilsITCase extends JavaBashTestBase { } @Test + public void testGetJmResourceParams() throws Exception { + int expectedResultLines = 1; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.GET_JM_RESOURCE_PARAMS.toString(), + String.valueOf(expectedResultLines)}; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines.size(), is(expectedResultLines)); + ConfigurationUtils.parseJvmArgString(lines.get(0)); + } + + @Test + public void testGetJmResourceParamsWithDynamicProperties() throws Exception { + int expectedResultLines = 1; + long metaspace = 123456789L; + String[] commands = { + RUN_BASH_JAVA_UTILS_CMD_SCRIPT, + BashJavaUtils.Command.GET_JM_RESOURCE_PARAMS.toString(), + String.valueOf(expectedResultLines), + "-D" + JobManagerOptions.JVM_METASPACE.key() + "=" + metaspace + "b"}; + List<String> lines = Arrays.asList(executeScript(commands).split(System.lineSeparator())); + + assertThat(lines.size(), is(expectedResultLines)); + Map<String, String> params = ConfigurationUtils.parseJvmArgString(lines.get(0)); + assertThat(Long.valueOf(params.get("-XX:MaxMetaspaceSize=")), is(metaspace)); + } + + @Test public void testExtractLoggingOutputs() throws Exception { StringBuilder input = new StringBuilder(); List<String> expectedOutput = new ArrayList<>(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java index 1478e1c..666d09a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/BashJavaUtils.java @@ -20,10 +20,13 @@ package org.apache.flink.runtime.util; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; import org.apache.flink.runtime.entrypoint.ClusterConfigurationParserFactory; +import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec; +import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils; import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils; import org.apache.flink.util.FlinkException; @@ -47,9 +50,14 @@ public class BashJavaUtils { public static void main(String[] args) throws Exception { checkArgument(args.length > 0, "Command not specified."); + String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); + switch (Command.valueOf(args[0])) { case GET_TM_RESOURCE_PARAMS: - getTmResourceParams(Arrays.copyOfRange(args, 1, args.length)); + getTmResourceParams(commandArgs); + break; + case GET_JM_RESOURCE_PARAMS: + getJmResourceParams(commandArgs); break; default: // unexpected, Command#valueOf should fail if a unknown command is passed in @@ -74,6 +82,18 @@ public class BashJavaUtils { configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY); } + private static void getJmResourceParams(String[] args) throws Exception { + Configuration configuration = getConfigurationForStandaloneJobManager(args); + JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(configuration); + System.out.println(EXECUTION_PREFIX + ProcessMemoryUtils.generateJvmParametersStr(jobManagerProcessSpec)); + } + + private static Configuration getConfigurationForStandaloneJobManager(String[] args) throws Exception { + Configuration configuration = loadConfiguration(args); + return JobManagerProcessUtils.getConfigurationWithLegacyHeapSizeMappedToNewConfigOption( + configuration, JobManagerOptions.TOTAL_FLINK_MEMORY); + } + @VisibleForTesting static Configuration loadConfiguration(String[] args) throws FlinkException { return ConfigurationParserUtils.loadCommonConfiguration( @@ -109,6 +129,11 @@ public class BashJavaUtils { /** * Get JVM parameters and dynamic configs of task executor resources. */ - GET_TM_RESOURCE_PARAMS + GET_TM_RESOURCE_PARAMS, + + /** + * Get JVM parameters and dynamic configs of job manager resources. + */ + GET_JM_RESOURCE_PARAMS } }