This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6498f4b76f6b7b4cfe99972e241a2257575155ea Author: Andrey Zagrebin <azagre...@apache.org> AuthorDate: Mon Jun 8 11:42:35 2020 +0300 [FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory This closes #12520. --- .../util/config/memory/ProcessMemoryUtils.java | 23 +++++++++++++--- .../TaskExecutorProcessUtilsTest.java | 11 ++++++++ .../jobmanager/JobManagerProcessUtilsTest.java | 8 ++++++ .../config/memory/ProcessMemoryUtilsTestBase.java | 31 ++++++++++++++++++++++ 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java index 16a6d2d..b154156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java @@ -143,9 +143,7 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> { MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize); JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead; if (config.contains(options.getTotalProcessMemoryOption())) { - MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption()); - MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize); - sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize); + MemorySize jvmOverheadSize = deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(config, totalFlinkMemorySize); jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize); } else { MemorySize jvmOverheadSize = deriveWithInverseFraction( @@ -158,6 +156,25 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> { return jvmMetaspaceAndOverhead; } + private MemorySize deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents( + Configuration config, + MemorySize totalFlinkMemorySize) { + MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption()); + MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, options.getJvmOptions().getJvmMetaspaceOption()); + MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize); + if (totalProcessMemorySize.getBytes() < totalFlinkAndJvmMetaspaceSize.getBytes()) { + throw new IllegalConfigurationException( + "The configured Total Process Memory size (%s) is less than the sum of the derived " + + "Total Flink Memory size (%s) and the configured or default JVM Metaspace size (%s).", + totalProcessMemorySize.toHumanReadableString(), + totalFlinkMemorySize.toHumanReadableString(), + jvmMetaspaceSize.toHumanReadableString()); + } + MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize); + sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize); + return jvmOverheadSize; + } + private void sanityCheckJvmOverhead( Configuration config, MemorySize derivedJvmOverheadSize, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java index 4548162..487a25b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java @@ -584,6 +584,17 @@ public class TaskExecutorProcessUtilsTest extends ProcessMemoryUtilsTestBase<Tas } } + @Override + protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) { + MemorySize componentSize = new MemorySize(totalFlinkMemorySize.getBytes() / 6); + configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, componentSize); + configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, componentSize); + configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, componentSize); + configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, componentSize); + configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, componentSize); + // network is the 6th component, fixed implicitly + } + private static Configuration configWithExplicitTaskHeapAndManageMem() { final Configuration conf = new Configuration(); conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java index 6bab438..a7a51ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java @@ -246,6 +246,14 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa } } + @Override + protected void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize) { + MemorySize heapSize = new MemorySize(totalFlinkMemorySize.getBytes() / 2); + MemorySize offHeapSize = totalFlinkMemorySize.subtract(heapSize); + configuration.set(JobManagerOptions.JVM_HEAP_MEMORY, heapSize); + configuration.set(JobManagerOptions.OFF_HEAP_MEMORY, offHeapSize); + } + private static Configuration configWithExplicitJvmHeap() { Configuration conf = new Configuration(); conf.set(JobManagerOptions.JVM_HEAP_MEMORY, JVM_HEAP_SIZE); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java index 15fdd74..301e2b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java @@ -126,6 +126,35 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex } @Test + public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithFineGrainedOptions() { + Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200); + // Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead) + MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150); + configWithFineGrainedOptions(conf, totalFlinkMemorySize); + validateFail(conf); + } + + @Test + public void testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithTotalFlinkMemory() { + Configuration conf = getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200); + // Total Flink memory + JVM Metaspace > Total Process Memory (no space for JVM overhead) + MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150); + conf.set(options.getTotalFlinkMemoryOption(), totalFlinkMemorySize); + validateFail(conf); + } + + private Configuration getConfigurationWithJvmMetaspaceAndTotalFlinkMemory( + long jvmMetaspaceSizeMb, + long totalProcessMemorySizeMb) { + MemorySize jvmMetaspaceSize = MemorySize.ofMebiBytes(jvmMetaspaceSizeMb); + MemorySize totalProcessMemorySize = MemorySize.ofMebiBytes(totalProcessMemorySizeMb); + Configuration conf = new Configuration(); + conf.set(options.getJvmOptions().getJvmMetaspaceOption(), jvmMetaspaceSize); + conf.set(options.getTotalProcessMemoryOption(), totalProcessMemorySize); + return conf; + } + + @Test public void testConfigJvmMetaspaceSize() { MemorySize jvmMetaspaceSize = MemorySize.parse("50m"); @@ -303,6 +332,8 @@ public abstract class ProcessMemoryUtilsTestBase<T extends ProcessMemorySpec> ex protected abstract Configuration getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(Configuration config); + protected abstract void configWithFineGrainedOptions(Configuration configuration, MemorySize totalFlinkMemorySize); + protected ConfigOption<MemorySize> getNewOptionForLegacyHeapOption() { return newOptionForLegacyHeapOption; }