This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b4eb8ac503f [FLINK-33221][core][config] Add config options for administrator JVM options
b4eb8ac503f is described below
commit b4eb8ac503f41fd793db1ac662fbedc46af92fd5
Author: Zhanghao Chen <[email protected]>
AuthorDate: Tue Jan 16 00:30:49 2024 +0800
[FLINK-33221][core][config] Add config options for administrator JVM options
---
.../generated/environment_configuration.html | 18 ++++++++
.../apache/flink/configuration/CoreOptions.java | 37 ++++++++++++++++
flink-dist/src/main/flink-bin/bin/config.sh | 9 ++++
flink-python/pyflink/pyflink_gateway_server.py | 11 +++--
.../src/main/java/org/apache/flink/yarn/Utils.java | 45 +++++++++++++++-----
.../apache/flink/yarn/YarnClusterDescriptor.java | 19 ++++-----
.../test/java/org/apache/flink/yarn/UtilsTest.java | 49 +++++++++++++++++++++-
.../flink/yarn/YarnClusterDescriptorTest.java | 14 +++++++
8 files changed, 174 insertions(+), 28 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html
index 3627811dc9a..6f520e5abff 100644
--- a/docs/layouts/shortcodes/generated/environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/environment_configuration.html
@@ -20,6 +20,24 @@
<td>String</td>
<td>Path to hbase configuration directory. It is required to read
HBASE configuration. You can also set it via environment variable.</td>
</tr>
+ <tr>
+ <td><h5>env.java.default-opts.all</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A string of default JVM options to prepend to <code
class="highlighter-rouge">env.java.opts.all</code>. This is intended to be set
by administrators.</td>
+ </tr>
+ <tr>
+ <td><h5>env.java.default-opts.jobmanager</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A string of default JVM options to prepend to <code
class="highlighter-rouge">env.java.opts.jobmanager</code>. This is intended to
be set by administrators.</td>
+ </tr>
+ <tr>
+ <td><h5>env.java.default-opts.taskmanager</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>A string of default JVM options to prepend to <code
class="highlighter-rouge">env.java.opts.taskmanager</code>. This is intended to
be set by administrators.</td>
+ </tr>
<tr>
<td><h5>env.java.opts.all</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index a78cde9edfd..03b92b328c6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -31,6 +31,7 @@ import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import java.util.List;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
/** The set of configuration options for core parameters. */
@PublicEvolving
@@ -282,6 +283,42 @@ public class CoreOptions {
"Java options to start the JVM of the Flink SQL Gateway with.")
.build());
+ public static final ConfigOption<String> FLINK_DEFAULT_JVM_OPTIONS =
+ ConfigOptions.key("env.java.default-opts.all")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ Description.builder()
+ .text(
+ "A string of default JVM options to prepend to %s."
+ + " This is intended to be set by administrators.",
+ code(FLINK_JVM_OPTIONS.key()))
+ .build());
+
+ public static final ConfigOption<String> FLINK_DEFAULT_JM_JVM_OPTIONS =
+ ConfigOptions.key("env.java.default-opts.jobmanager")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ Description.builder()
+ .text(
+ "A string of default JVM options to prepend to %s."
+ + " This is intended to be set by administrators.",
+ code(FLINK_JM_JVM_OPTIONS.key()))
+ .build());
+
+ public static final ConfigOption<String> FLINK_DEFAULT_TM_JVM_OPTIONS =
+ ConfigOptions.key("env.java.default-opts.taskmanager")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ Description.builder()
+ .text(
+ "A string of default JVM options to prepend to %s."
+ + " This is intended to be set by administrators.",
+ code(FLINK_TM_JVM_OPTIONS.key()))
+ .build());
+
/**
* This option is here only for documentation generation, it is only evaluated in the shell
* scripts.
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 363e17a375f..c798bdf4c01 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -186,6 +186,9 @@ KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
KEY_ENV_JAVA_OPTS_SQL_GATEWAY="env.java.opts.sql-gateway"
+KEY_ENV_JAVA_DEFAULT_OPTS="env.java.default-opts.all"
+KEY_ENV_JAVA_DEFAULT_OPTS_JM="env.java.default-opts.jobmanager"
+KEY_ENV_JAVA_DEFAULT_OPTS_TM="env.java.default-opts.taskmanager"
KEY_ENV_SSH_OPTS="env.ssh.opts"
KEY_HIGH_AVAILABILITY="high-availability.type"
KEY_ZK_HEAP_MB="zookeeper.heap.mb"
@@ -326,11 +329,13 @@ if [ -z "${FLINK_PID_DIR}" ]; then
fi
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+ FLINK_ENV_JAVA_DEFAULT_OPTS=$(readFromConfig ${KEY_ENV_JAVA_DEFAULT_OPTS} "" "${YAML_CONF}")
FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "" "${YAML_CONF}")
if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
# try deprecated key
FLINK_ENV_JAVA_OPTS=$(readFromConfig "env.java.opts" "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
fi
+ FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_DEFAULT_OPTS} ${FLINK_ENV_JAVA_OPTS}"
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS="-XX:+IgnoreUnrecognizedVMOptions $( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
@@ -343,13 +348,17 @@ if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
fi
if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+ FLINK_ENV_JAVA_DEFAULT_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_DEFAULT_OPTS_JM} "" "${YAML_CONF}")
FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+ FLINK_ENV_JAVA_OPTS_JM="${FLINK_ENV_JAVA_DEFAULT_OPTS_JM} ${FLINK_ENV_JAVA_OPTS_JM}"
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
fi
if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ FLINK_ENV_JAVA_DEFAULT_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_DEFAULT_OPTS_TM} "" "${YAML_CONF}")
FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+ FLINK_ENV_JAVA_OPTS_TM="${FLINK_ENV_JAVA_DEFAULT_OPTS_TM} ${FLINK_ENV_JAVA_OPTS_TM}"
# Remove leading and ending double quotes (if present) of value
FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
fi
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 5dacc15f6d0..dcb3f34884e 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -36,6 +36,7 @@ KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
KEY_ENV_JAVA_HOME = "env.java.home"
KEY_ENV_JAVA_OPTS = "env.java.opts.all"
KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+KEY_ENV_JAVA_DEFAULT_OPTS = "env.java.default-opts.all"
def on_windows():
@@ -156,12 +157,14 @@ def construct_log_settings(env):
def get_jvm_opts(env):
flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
- jvm_opts = env.get(
- 'FLINK_ENV_JAVA_OPTS',
- read_from_config(
+ jvm_opts = env.get("FLINK_ENV_JAVA_OPTS")
+ if jvm_opts is None:
+ default_jvm_opts = read_from_config(KEY_ENV_JAVA_DEFAULT_OPTS, "", flink_conf_file)
+ extra_jvm_opts = read_from_config(
KEY_ENV_JAVA_OPTS,
read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file),
- flink_conf_file))
+ flink_conf_file)
+ jvm_opts = default_jvm_opts + " " + extra_jvm_opts
# Remove leading and trailing double quotes (if present) of value
jvm_opts = jvm_opts.strip('"')
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 621f1dcda41..985196e53fc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -61,6 +62,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -509,17 +511,13 @@ public final class Utils {
startCommandValues.put(
"jvmmem",
ProcessMemoryUtils.generateJvmParametersStr(taskExecutorProcessSpec));
- String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
- if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
- javaOpts += " " + flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS);
- }
- javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
-
- // krb5.conf file will be available as local resource in JM/TM container
- if (hasKrb5) {
- javaOpts += " -Djava.security.krb5.conf=krb5.conf";
- }
- startCommandValues.put("jvmopts", javaOpts);
+ List<ConfigOption<String>> jvmOptions =
+ Arrays.asList(
+ CoreOptions.FLINK_DEFAULT_JVM_OPTIONS,
+ CoreOptions.FLINK_JVM_OPTIONS,
+ CoreOptions.FLINK_DEFAULT_TM_JVM_OPTIONS,
+ CoreOptions.FLINK_TM_JVM_OPTIONS);
+ startCommandValues.put("jvmopts", generateJvmOptsString(flinkConfig, jvmOptions, hasKrb5));
String logging = "";
if (hasLogback || hasLog4j) {
@@ -592,6 +590,23 @@ public final class Utils {
return template;
}
+ public static String generateJvmOptsString(
+ org.apache.flink.configuration.Configuration conf,
+ List<ConfigOption<String>> jvmOptions,
+ boolean hasKrb5) {
+ StringBuilder javaOptsSb = new StringBuilder();
+ for (ConfigOption<String> option : jvmOptions) {
+ concatWithSpace(javaOptsSb, conf.get(option));
+ }
+ concatWithSpace(javaOptsSb, IGNORE_UNRECOGNIZED_VM_OPTIONS);
+
+ // krb5.conf file will be available as local resource in JM/TM container
+ if (hasKrb5) {
+ concatWithSpace(javaOptsSb, "-Djava.security.krb5.conf=krb5.conf");
+ }
+ return javaOptsSb.toString().trim();
+ }
+
static boolean isRemotePath(String path) throws IOException {
org.apache.flink.core.fs.Path flinkPath = new org.apache.flink.core.fs.Path(path);
return flinkPath.getFileSystem().isDistributedFS();
@@ -796,4 +811,12 @@ public final class Utils {
public static Path getPathFromLocalFilePathStr(String localPathStr) {
return getPathFromLocalFile(new File(localPathStr));
}
+
+ public static void concatWithSpace(StringBuilder sb, String value) {
+ if (value == null || value.isEmpty()) {
+ return;
+ }
+ sb.append(' ');
+ sb.append(value);
+ }
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 6e1f14392f7..176804eaab2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -121,6 +121,7 @@ import java.net.URI;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -1856,17 +1857,13 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
- String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
- if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
- javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
- }
-
- javaOpts += " " + IGNORE_UNRECOGNIZED_VM_OPTIONS;
-
- // krb5.conf file will be available as local resource in JM/TM container
- if (hasKrb5) {
- javaOpts += " -Djava.security.krb5.conf=krb5.conf";
- }
+ List<ConfigOption<String>> jvmOptions =
+ Arrays.asList(
+ CoreOptions.FLINK_DEFAULT_JVM_OPTIONS,
+ CoreOptions.FLINK_JVM_OPTIONS,
+ CoreOptions.FLINK_DEFAULT_JM_JVM_OPTIONS,
+ CoreOptions.FLINK_JM_JVM_OPTIONS);
+ String javaOpts = Utils.generateJvmOptsString(flinkConfiguration, jvmOptions, hasKrb5);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer =
Records.newRecord(ContainerLaunchContext.class);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index d3f3a9e239e..353f48d7318 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
@@ -39,6 +40,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -225,7 +227,9 @@ class UtilsTest {
final String java = "$JAVA_HOME/bin/java";
final String jvmmem =
"-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333";
+ final String defaultJvmOpts = "-DdefaultJvm"; // if set
final String jvmOpts = "-Djvm"; // if set
+ final String defaultTmJvmOpts = "-DdefaultTmJvm"; // if set
final String tmJvmOpts = "-DtmJvm"; // if set
final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
final String logback =
"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
@@ -453,7 +457,8 @@ class UtilsTest {
redirects));
// logback + log4j, with/out krb5, different JVM opts
- cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+ cfg.set(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, defaultJvmOpts);
+ cfg.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertThat(
Utils.getTaskManagerShellCommand(
cfg,
@@ -470,6 +475,7 @@ class UtilsTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
logfile,
@@ -495,6 +501,7 @@ class UtilsTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -506,7 +513,8 @@ class UtilsTest {
redirects));
// logback + log4j, with/out krb5, different JVM opts
- cfg.setString(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
+ cfg.set(CoreOptions.FLINK_DEFAULT_TM_JVM_OPTIONS, defaultTmJvmOpts);
+ cfg.set(CoreOptions.FLINK_TM_JVM_OPTIONS, tmJvmOpts);
assertThat(
Utils.getTaskManagerShellCommand(
cfg,
@@ -523,7 +531,9 @@ class UtilsTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
+ defaultTmJvmOpts,
tmJvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
logfile,
@@ -549,7 +559,9 @@ class UtilsTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
+ defaultTmJvmOpts,
tmJvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -583,7 +595,9 @@ class UtilsTest {
"1",
jvmmem,
"2",
+ defaultJvmOpts,
jvmOpts,
+ defaultTmJvmOpts,
tmJvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -619,7 +633,9 @@ class UtilsTest {
logfile,
logback,
log4j,
+ defaultJvmOpts,
jvmOpts,
+ defaultTmJvmOpts,
tmJvmOpts,
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -629,6 +645,35 @@ class UtilsTest {
redirects));
}
+ @Test
+ void testGenerateJvmOptsString() {
+ final String defaultJvmOpts = "-DdefaultJvm";
+ final String jvmOpts = "-Djvm";
+ final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
+ final Configuration conf = new Configuration();
+ conf.set(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, defaultJvmOpts);
+ conf.set(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+ final List<ConfigOption<String>> jvmOptions =
+ Arrays.asList(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, CoreOptions.FLINK_JVM_OPTIONS);
+ // With Krb5
+ assertThat(Utils.generateJvmOptsString(conf, jvmOptions, true))
+ .isEqualTo(
+ String.join(
+ " ",
+ defaultJvmOpts,
+ jvmOpts,
+ Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS,
+ krb5));
+ // Without Krb5
+ assertThat(Utils.generateJvmOptsString(conf, jvmOptions, false))
+ .isEqualTo(
+ String.join(
+ " ",
+ defaultJvmOpts,
+ jvmOpts,
+ Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
+ }
+
private static void verifyUnitResourceVariousSchedulers(
YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 5eadae781e0..af0f283415f 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -174,7 +174,9 @@ class YarnClusterDescriptorTest {
JobManagerProcessUtils.generateJvmParametersStr(jobManagerProcessSpec, cfg);
final String dynamicParameters =
JobManagerProcessUtils.generateDynamicConfigsStr(jobManagerProcessSpec);
+ final String defaultJvmOpts = "-DdefaultJvm"; // if set
final String jvmOpts = "-Djvm"; // if set
+ final String defaultJmJvmOpts = "-DdefaultJmJvm"; // if set
final String jmJvmOpts = "-DjmJvm"; // if set
final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
final String logfile =
@@ -370,6 +372,7 @@ class YarnClusterDescriptorTest {
// YarnClusterDescriptor,
// because we have a reference to the ClusterDescriptor's configuration which we modify
// continuously
+ cfg.setString(CoreOptions.FLINK_DEFAULT_JVM_OPTIONS, defaultJvmOpts);
cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
cfg.set(
YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
@@ -385,6 +388,7 @@ class YarnClusterDescriptorTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
logfile,
@@ -407,6 +411,7 @@ class YarnClusterDescriptorTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -419,6 +424,7 @@ class YarnClusterDescriptorTest {
// log4j, with/out krb5, different JVM opts
// IMPORTANT: Be aware that we are using side effects here to modify the created
// YarnClusterDescriptor
+ cfg.setString(CoreOptions.FLINK_DEFAULT_JM_JVM_OPTIONS, defaultJmJvmOpts);
cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
cfg.set(
YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
@@ -434,7 +440,9 @@ class YarnClusterDescriptorTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
+ defaultJmJvmOpts,
jmJvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
logfile,
@@ -457,7 +465,9 @@ class YarnClusterDescriptorTest {
" ",
java,
jvmmem,
+ defaultJvmOpts,
jvmOpts,
+ defaultJmJvmOpts,
jmJvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -489,7 +499,9 @@ class YarnClusterDescriptorTest {
"1",
jvmmem,
"2",
+ defaultJvmOpts,
jvmOpts,
+ defaultJmJvmOpts,
jmJvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,
@@ -523,7 +535,9 @@ class YarnClusterDescriptorTest {
java,
logfile,
logback,
+ defaultJvmOpts,
jvmOpts,
+ defaultJmJvmOpts,
jmJvmOpts,
YarnClusterDescriptor.IGNORE_UNRECOGNIZED_VM_OPTIONS,
krb5,