This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4555ad9  [FLINK-18188][Runtime] Derive JM Off-Heap memory from 
configured Total Flink Memory minus JVM Heap
4555ad9 is described below

commit 4555ad91b4bd0df8887c4b4eb3119cbae272e805
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Tue Jun 9 16:47:04 2020 +0300

    [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total 
Flink Memory minus JVM Heap
---
 docs/ops/memory/mem_setup_master.md                |  5 ++
 docs/ops/memory/mem_setup_master.zh.md             |  4 ++
 .../jobmanager/JobManagerFlinkMemoryUtils.java     | 56 ++++++++++++++++++----
 .../jobmanager/JobManagerProcessUtilsTest.java     | 38 ++++++++++++++-
 4 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/docs/ops/memory/mem_setup_master.md 
b/docs/ops/memory/mem_setup_master.md
index dbea142..d086427 100644
--- a/docs/ops/memory/mem_setup_master.md
+++ b/docs/ops/memory/mem_setup_master.md
@@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* 
memory consumption:
 * Flink framework dependencies (e.g. Akka network communication)
 * User code executed during job submission (e.g. for certain batch sources) or 
in checkpoint completion callbacks
 
+<span class="label label-info">Note</span> If you have configured the [Total 
Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured 
the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink 
Memory](mem_setup.html#configure-total-memory) minus the [JVM 
Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
+
 ## Local Execution
 
 If you run Flink locally (e.g. from your IDE) without creating a cluster, then 
the Master memory configuration options are ignored.
diff --git a/docs/ops/memory/mem_setup_master.zh.md 
b/docs/ops/memory/mem_setup_master.zh.md
index dbea142..54b5165 100644
--- a/docs/ops/memory/mem_setup_master.zh.md
+++ b/docs/ops/memory/mem_setup_master.zh.md
@@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* 
memory consumption:
 * Flink framework dependencies (e.g. Akka network communication)
 * User code executed during job submission (e.g. for certain batch sources) or 
in checkpoint completion callbacks
 
+<span class="label label-info">Note</span> If you have configured the [Total 
Flink Memory](mem_setup.html#configure-total-memory)
+and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured 
the *Off-heap* memory, the size of the *Off-heap* memory
+will be derived as the [Total Flink 
Memory](mem_setup.html#configure-total-memory) minus the [JVM 
Heap](#configure-jvm-heap).
+The default value of the *Off-heap* memory option will be ignored.
 ## Local Execution
 
 If you run Flink locally (e.g. from your IDE) without creating a cluster, then 
the Master memory configuration options are ignored.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index b22b6e6..9f4f13e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements 
FlinkMemoryUtils<JobManagerFl
        public JobManagerFlinkMemory 
deriveFromRequiredFineGrainedOptions(Configuration config) {
                MemorySize jvmHeapMemorySize = 
ProcessMemoryUtils.getMemorySizeFromConfig(config, 
JobManagerOptions.JVM_HEAP_MEMORY);
                MemorySize offHeapMemorySize = 
ProcessMemoryUtils.getMemorySizeFromConfig(config, 
JobManagerOptions.OFF_HEAP_MEMORY);
-               MemorySize derivedTotalFlinkMemorySize = 
jvmHeapMemorySize.add(offHeapMemorySize);
 
                if (config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
                        // derive network memory from total flink memory, and 
check against network min/max
                        MemorySize totalFlinkMemorySize = 
ProcessMemoryUtils.getMemorySizeFromConfig(config, 
JobManagerOptions.TOTAL_FLINK_MEMORY);
-                       if (derivedTotalFlinkMemorySize.getBytes() != 
totalFlinkMemorySize.getBytes()) {
-                               throw new 
IllegalConfigurationException(String.format(
-                                       "Sum of the configured JVM Heap Memory 
(%s) and the configured or default Off-heap Memory (%s) " +
-                                               "exceeds the configured Total 
Flink Memory (%s). Please, make the configuration consistent " +
-                                               "or configure only one option: 
either JVM Heap or Total Flink Memory.",
-                                       
jvmHeapMemorySize.toHumanReadableString(),
-                                       
offHeapMemorySize.toHumanReadableString(),
-                                       
totalFlinkMemorySize.toHumanReadableString()));
+                       if (config.contains(JobManagerOptions.OFF_HEAP_MEMORY)) 
{
+                               // off-heap memory is explicitly set by user
+                               
sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, 
offHeapMemorySize);
+                       } else {
+                               // off-heap memory is not explicitly set by 
user, derive it from Total Flink Memory and JVM Heap
+                               offHeapMemorySize = 
deriveOffHeapMemory(jvmHeapMemorySize, totalFlinkMemorySize, offHeapMemorySize);
                        }
                }
 
                return createJobManagerFlinkMemory(config, jvmHeapMemorySize, 
offHeapMemorySize);
        }
 
+       private static void sanityCheckTotalFlinkMemory(
+                       MemorySize totalFlinkMemorySize,
+                       MemorySize jvmHeapMemorySize,
+                       MemorySize offHeapMemorySize) {
+               MemorySize derivedTotalFlinkMemorySize = 
jvmHeapMemorySize.add(offHeapMemorySize);
+               if (derivedTotalFlinkMemorySize.getBytes() != 
totalFlinkMemorySize.getBytes()) {
+                       throw new IllegalConfigurationException(String.format(
+                               "Sum of the configured JVM Heap Memory (%s) and 
the configured Off-heap Memory (%s) " +
+                                       "does not match the configured Total 
Flink Memory (%s). Please, make the configuration consistent " +
+                                       "or configure only one option: either 
JVM Heap or Total Flink Memory.",
+                               jvmHeapMemorySize.toHumanReadableString(),
+                               offHeapMemorySize.toHumanReadableString(),
+                               totalFlinkMemorySize.toHumanReadableString()));
+               }
+       }
+
+       private static MemorySize deriveOffHeapMemory(
+                       MemorySize jvmHeapMemorySize,
+                       MemorySize totalFlinkMemorySize,
+                       MemorySize defaultOffHeapMemorySize) {
+               if (totalFlinkMemorySize.getBytes() < 
jvmHeapMemorySize.getBytes()) {
+                       throw new IllegalConfigurationException(String.format(
+                               "The configured JVM Heap Memory (%s) exceeds 
the configured Total Flink Memory (%s). " +
+                                       "Please, make the configuration 
consistent or configure only one option: either JVM Heap " +
+                                       "or Total Flink Memory.",
+                               jvmHeapMemorySize.toHumanReadableString(),
+                               totalFlinkMemorySize.toHumanReadableString()));
+               }
+               MemorySize offHeapMemorySize = 
totalFlinkMemorySize.subtract(jvmHeapMemorySize);
+               if (offHeapMemorySize.getBytes() != 
defaultOffHeapMemorySize.getBytes()) {
+                       LOG.info(
+                               "The Off-Heap Memory size ({}) is derived the 
configured Total Flink Memory size ({}) minus " +
+                                       "the configured JVM Heap Memory size 
({}). The default Off-Heap Memory size ({}) is ignored.",
+                               offHeapMemorySize.toHumanReadableString(),
+                               totalFlinkMemorySize.toHumanReadableString(),
+                               jvmHeapMemorySize.toHumanReadableString(),
+                               
defaultOffHeapMemorySize.toHumanReadableString());
+               }
+               return offHeapMemorySize;
+       }
+
        @Override
        public JobManagerFlinkMemory deriveFromTotalFlinkMemory(Configuration 
config, MemorySize totalFlinkMemorySize) {
                MemorySize offHeapMemorySize = 
ProcessMemoryUtils.getMemorySizeFromConfig(config, 
JobManagerOptions.OFF_HEAP_MEMORY);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index a7a51ec..840520f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -50,7 +50,7 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
        private static final MemorySize TOTAL_PROCESS_MEM_SIZE = 
MemorySize.parse("1536m");
 
        @Rule
-       public final TestLoggerResource testLoggerResource = new 
TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+       public final TestLoggerResource testLoggerResource = new 
TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.INFO);
 
        public JobManagerProcessUtilsTest() {
                super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, 
JobManagerOptions.TOTAL_PROCESS_MEMORY);
@@ -130,6 +130,42 @@ public class JobManagerProcessUtilsTest extends 
ProcessMemoryUtilsTestBase<JobMa
                validateFail(conf);
        }
 
+       @Test
+       public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+               MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
+               MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+
+               Configuration conf = new Configuration();
+               conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, 
totalFlinkMemory);
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+               validateFail(conf);
+       }
+
+       @Test
+       public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+               MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+               MemorySize defaultOffHeap = 
JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
+               MemorySize expectedOffHeap = 
MemorySize.ofMebiBytes(100).add(defaultOffHeap);
+               MemorySize totalFlinkMemory = jvmHeap.add(expectedOffHeap);
+
+               Configuration conf = new Configuration();
+               conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, 
totalFlinkMemory);
+               conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+               JobManagerProcessSpec JobManagerProcessSpec = 
JobManagerProcessUtils.processSpecFromConfig(conf);
+               assertThat(JobManagerProcessSpec.getJvmDirectMemorySize(), 
is(expectedOffHeap));
+               MatcherAssert.assertThat(
+                       testLoggerResource.getMessages(),
+                       hasItem(containsString(String.format(
+                               "The Off-Heap Memory size (%s) is derived the 
configured Total Flink Memory size (%s) minus " +
+                                       "the configured JVM Heap Memory size 
(%s). The default Off-Heap Memory size (%s) is ignored.",
+                               expectedOffHeap.toHumanReadableString(),
+                               totalFlinkMemory.toHumanReadableString(),
+                               jvmHeap.toHumanReadableString(),
+                               defaultOffHeap.toHumanReadableString()))));
+       }
+
        @Override
        protected JobManagerProcessSpec processSpecFromConfig(Configuration 
config) {
                return JobManagerProcessUtils.processSpecFromConfig(config);

Reply via email to