This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6498f4b76f6b7b4cfe99972e241a2257575155ea
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Mon Jun 8 11:42:35 2020 +0300

    [FLINK-18154][Runtime] Check Total Flink Memory plus JVM metaspace is less than or equal to the configured Total Process Memory
    
    This closes #12520.
---
 .../util/config/memory/ProcessMemoryUtils.java     | 23 +++++++++++++---
 .../TaskExecutorProcessUtilsTest.java              | 11 ++++++++
 .../jobmanager/JobManagerProcessUtilsTest.java     |  8 ++++++
 .../config/memory/ProcessMemoryUtilsTestBase.java  | 31 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
index 16a6d2d..b154156 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java
@@ -143,9 +143,7 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
                MemorySize totalFlinkAndJvmMetaspaceSize = 
totalFlinkMemorySize.add(jvmMetaspaceSize);
                JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
                if (config.contains(options.getTotalProcessMemoryOption())) {
-                       MemorySize totalProcessMemorySize = 
getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
-                       MemorySize jvmOverheadSize = 
totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
-                       sanityCheckJvmOverhead(config, jvmOverheadSize, 
totalProcessMemorySize);
+                       MemorySize jvmOverheadSize = 
deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(config, 
totalFlinkMemorySize);
                        jvmMetaspaceAndOverhead = new 
JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
                } else {
                        MemorySize jvmOverheadSize = deriveWithInverseFraction(
@@ -158,6 +156,25 @@ public class ProcessMemoryUtils<FM extends FlinkMemory> {
                return jvmMetaspaceAndOverhead;
        }
 
+       private MemorySize 
deriveJvmOverheadFromTotalFlinkMemoryAndOtherComponents(
+                       Configuration config,
+                       MemorySize totalFlinkMemorySize) {
+               MemorySize totalProcessMemorySize = 
getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
+               MemorySize jvmMetaspaceSize = getMemorySizeFromConfig(config, 
options.getJvmOptions().getJvmMetaspaceOption());
+               MemorySize totalFlinkAndJvmMetaspaceSize = 
totalFlinkMemorySize.add(jvmMetaspaceSize);
+               if (totalProcessMemorySize.getBytes() < 
totalFlinkAndJvmMetaspaceSize.getBytes()) {
+                       throw new IllegalConfigurationException(
+                               "The configured Total Process Memory size (%s) 
is less than the sum of the derived " +
+                                       "Total Flink Memory size (%s) and the 
configured or default JVM Metaspace size  (%s).",
+                               totalProcessMemorySize.toHumanReadableString(),
+                               totalFlinkMemorySize.toHumanReadableString(),
+                               jvmMetaspaceSize.toHumanReadableString());
+               }
+               MemorySize jvmOverheadSize = 
totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
+               sanityCheckJvmOverhead(config, jvmOverheadSize, 
totalProcessMemorySize);
+               return jvmOverheadSize;
+       }
+
        private void sanityCheckJvmOverhead(
                        Configuration config,
                        MemorySize derivedJvmOverheadSize,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
index 4548162..487a25b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java
@@ -584,6 +584,17 @@ public class TaskExecutorProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<Tas
                }
        }
 
+       @Override
+       protected void configWithFineGrainedOptions(Configuration 
configuration, MemorySize totalFlinkMemorySize) {
+               MemorySize componentSize = new 
MemorySize(totalFlinkMemorySize.getBytes() / 6);
+               configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, 
componentSize);
+               configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
componentSize);
+               configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, 
componentSize);
+               configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, 
componentSize);
+               configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
componentSize);
+               // network is the 6th component, fixed implicitly
+       }
+
        private static Configuration configWithExplicitTaskHeapAndManageMem() {
                final Configuration conf = new Configuration();
                conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index 6bab438..a7a51ec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -246,6 +246,14 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
                }
        }
 
+       @Override
+       protected void configWithFineGrainedOptions(Configuration 
configuration, MemorySize totalFlinkMemorySize) {
+               MemorySize heapSize = new 
MemorySize(totalFlinkMemorySize.getBytes() / 2);
+               MemorySize offHeapSize = 
totalFlinkMemorySize.subtract(heapSize);
+               configuration.set(JobManagerOptions.JVM_HEAP_MEMORY, heapSize);
+               configuration.set(JobManagerOptions.OFF_HEAP_MEMORY, 
offHeapSize);
+       }
+
        private static Configuration configWithExplicitJvmHeap() {
                Configuration conf = new Configuration();
                conf.set(JobManagerOptions.JVM_HEAP_MEMORY, JVM_HEAP_SIZE);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
index 15fdd74..301e2b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtilsTestBase.java
@@ -126,6 +126,35 @@ public abstract class ProcessMemoryUtilsTestBase<T extends 
ProcessMemorySpec> ex
        }
 
        @Test
+       public void 
testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithFineGrainedOptions()
 {
+               Configuration conf = 
getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+               // Total Flink memory + JVM Metaspace > Total Process Memory 
(no space for JVM overhead)
+               MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+               configWithFineGrainedOptions(conf, totalFlinkMemorySize);
+               validateFail(conf);
+       }
+
+       @Test
+       public void 
testDerivedTotalProcessMemoryGreaterThanConfiguredFailureWithTotalFlinkMemory() 
{
+               Configuration conf = 
getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(100, 200);
+               // Total Flink memory + JVM Metaspace > Total Process Memory 
(no space for JVM overhead)
+               MemorySize totalFlinkMemorySize = MemorySize.ofMebiBytes(150);
+               conf.set(options.getTotalFlinkMemoryOption(), 
totalFlinkMemorySize);
+               validateFail(conf);
+       }
+
+       private Configuration 
getConfigurationWithJvmMetaspaceAndTotalFlinkMemory(
+                       long jvmMetaspaceSizeMb,
+                       long totalProcessMemorySizeMb) {
+               MemorySize jvmMetaspaceSize = 
MemorySize.ofMebiBytes(jvmMetaspaceSizeMb);
+               MemorySize totalProcessMemorySize = 
MemorySize.ofMebiBytes(totalProcessMemorySizeMb);
+               Configuration conf = new Configuration();
+               conf.set(options.getJvmOptions().getJvmMetaspaceOption(), 
jvmMetaspaceSize);
+               conf.set(options.getTotalProcessMemoryOption(), 
totalProcessMemorySize);
+               return conf;
+       }
+
+       @Test
        public void testConfigJvmMetaspaceSize() {
                MemorySize jvmMetaspaceSize = MemorySize.parse("50m");
 
@@ -303,6 +332,8 @@ public abstract class ProcessMemoryUtilsTestBase<T extends 
ProcessMemorySpec> ex
 
        protected abstract Configuration 
getConfigurationWithLegacyHeapSizeMappedToNewConfigOption(Configuration config);
 
+       protected abstract void configWithFineGrainedOptions(Configuration 
configuration, MemorySize totalFlinkMemorySize);
+
        protected ConfigOption<MemorySize> getNewOptionForLegacyHeapOption() {
                return newOptionForLegacyHeapOption;
        }

Reply via email to