This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4555ad9 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap 4555ad9 is described below commit 4555ad91b4bd0df8887c4b4eb3119cbae272e805 Author: Andrey Zagrebin <azagre...@apache.org> AuthorDate: Tue Jun 9 16:47:04 2020 +0300 [FLINK-18188][Runtime] Derive JM Off-Heap memory from configured Total Flink Memory minus JVM Heap --- docs/ops/memory/mem_setup_master.md | 5 ++ docs/ops/memory/mem_setup_master.zh.md | 4 ++ .../jobmanager/JobManagerFlinkMemoryUtils.java | 56 ++++++++++++++++++---- .../jobmanager/JobManagerProcessUtilsTest.java | 38 ++++++++++++++- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/docs/ops/memory/mem_setup_master.md b/docs/ops/memory/mem_setup_master.md index dbea142..d086427 100644 --- a/docs/ops/memory/mem_setup_master.md +++ b/docs/ops/memory/mem_setup_master.md @@ -89,6 +89,11 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. + ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. 
diff --git a/docs/ops/memory/mem_setup_master.zh.md b/docs/ops/memory/mem_setup_master.zh.md index dbea142..54b5165 100644 --- a/docs/ops/memory/mem_setup_master.zh.md +++ b/docs/ops/memory/mem_setup_master.zh.md @@ -89,6 +89,10 @@ There can be the following possible sources of *Off-heap* memory consumption: * Flink framework dependencies (e.g. Akka network communication) * User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks +<span class="label label-info">Note</span> If you have configured the [Total Flink Memory](mem_setup.html#configure-total-memory) +and the [JVM Heap](#configure-jvm-heap) explicitly but you have not configured the *Off-heap* memory, the size of the *Off-heap* memory +will be derived as the [Total Flink Memory](mem_setup.html#configure-total-memory) minus the [JVM Heap](#configure-jvm-heap). +The default value of the *Off-heap* memory option will be ignored. ## Local Execution If you run Flink locally (e.g. from your IDE) without creating a cluster, then the Master memory configuration options are ignored. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
index b22b6e6..9f4f13e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemoryUtils.java
@@ -40,25 +40,63 @@ public class JobManagerFlinkMemoryUtils implements FlinkMemoryUtils<JobManagerFl
 	public JobManagerFlinkMemory deriveFromRequiredFineGrainedOptions(Configuration config) {
 		MemorySize jvmHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.JVM_HEAP_MEMORY);
 		MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
-		MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
 
 		if (config.contains(JobManagerOptions.TOTAL_FLINK_MEMORY)) {
 			// derive network memory from total flink memory, and check against network min/max
 			MemorySize totalFlinkMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.TOTAL_FLINK_MEMORY);
-			if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
-				throw new IllegalConfigurationException(String.format(
-					"Sum of the configured JVM Heap Memory (%s) and the configured or default Off-heap Memory (%s) " +
-					"exceeds the configured Total Flink Memory (%s). Please, make the configuration consistent " +
-					"or configure only one option: either JVM Heap or Total Flink Memory.",
-					jvmHeapMemorySize.toHumanReadableString(),
-					offHeapMemorySize.toHumanReadableString(),
-					totalFlinkMemorySize.toHumanReadableString()));
+			if (config.contains(JobManagerOptions.OFF_HEAP_MEMORY)) {
+				// off-heap memory is explicitly set by user: JVM Heap + Off-heap must add up exactly to the configured Total Flink Memory
+				sanityCheckTotalFlinkMemory(totalFlinkMemorySize, jvmHeapMemorySize, offHeapMemorySize);
+			} else {
+				// off-heap memory is not explicitly set by user, derive it from Total Flink Memory and JVM Heap (the option's default value is ignored)
+				offHeapMemorySize = deriveOffHeapMemory(jvmHeapMemorySize, totalFlinkMemorySize, offHeapMemorySize);
 			}
 		}
 
 		return createJobManagerFlinkMemory(config, jvmHeapMemorySize, offHeapMemorySize);
 	}
 
+	private static void sanityCheckTotalFlinkMemory(
+			MemorySize totalFlinkMemorySize,
+			MemorySize jvmHeapMemorySize,
+			MemorySize offHeapMemorySize) {
+		MemorySize derivedTotalFlinkMemorySize = jvmHeapMemorySize.add(offHeapMemorySize);
+		if (derivedTotalFlinkMemorySize.getBytes() != totalFlinkMemorySize.getBytes()) {
+			throw new IllegalConfigurationException(String.format(
+				"Sum of the configured JVM Heap Memory (%s) and the configured Off-heap Memory (%s) " +
+				"does not match the configured Total Flink Memory (%s). Please, make the configuration consistent " +
+				"or configure only one option: either JVM Heap or Total Flink Memory.",
+				jvmHeapMemorySize.toHumanReadableString(),
+				offHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString()));
+		}
+	}
+
+	private static MemorySize deriveOffHeapMemory(
+			MemorySize jvmHeapMemorySize,
+			MemorySize totalFlinkMemorySize,
+			MemorySize defaultOffHeapMemorySize) {
+		if (totalFlinkMemorySize.getBytes() < jvmHeapMemorySize.getBytes()) {
+			throw new IllegalConfigurationException(String.format(
+				"The configured JVM Heap Memory (%s) exceeds the configured Total Flink Memory (%s). " +
+				"Please, make the configuration consistent or configure only one option: either JVM Heap " +
+				"or Total Flink Memory.",
+				jvmHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString()));
+		}
+		MemorySize offHeapMemorySize = totalFlinkMemorySize.subtract(jvmHeapMemorySize);
+		if (offHeapMemorySize.getBytes() != defaultOffHeapMemorySize.getBytes()) {
+			LOG.info(
+				"The Off-Heap Memory size ({}) is derived the configured Total Flink Memory size ({}) minus " +
+				"the configured JVM Heap Memory size ({}). The default Off-Heap Memory size ({}) is ignored.",
+				offHeapMemorySize.toHumanReadableString(),
+				totalFlinkMemorySize.toHumanReadableString(),
+				jvmHeapMemorySize.toHumanReadableString(),
+				defaultOffHeapMemorySize.toHumanReadableString());
+		}
+		return offHeapMemorySize;
+	}
+
 	@Override
 	public JobManagerFlinkMemory deriveFromTotalFlinkMemory(Configuration config, MemorySize totalFlinkMemorySize) {
 		MemorySize offHeapMemorySize = ProcessMemoryUtils.getMemorySizeFromConfig(config, JobManagerOptions.OFF_HEAP_MEMORY);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
index a7a51ec..840520f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessUtilsTest.java
@@ -50,7 +50,7 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 	private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m");
 
 	@Rule
-	public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.WARN);
+	public final TestLoggerResource testLoggerResource = new TestLoggerResource(JobManagerFlinkMemoryUtils.class, Level.INFO);
 
 	public JobManagerProcessUtilsTest() {
 		super(JM_PROCESS_MEMORY_OPTIONS, JM_LEGACY_HEAP_OPTIONS, JobManagerOptions.TOTAL_PROCESS_MEMORY);
@@ -130,6 +130,42 @@ public class JobManagerProcessUtilsTest extends ProcessMemoryUtilsTestBase<JobMa
 		validateFail(conf);
 	}
 
+	@Test
+	public void testJvmHeapExceedsTotalFlinkMemoryFailure() {
+		MemorySize totalFlinkMemory = MemorySize.ofMebiBytes(100);
+		MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+
+		Configuration conf = new Configuration();
+		conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+		validateFail(conf);
+	}
+
+	@Test
+	public void testOffHeapMemoryDerivedFromJvmHeapAndTotalFlinkMemory() {
+		MemorySize jvmHeap = MemorySize.ofMebiBytes(150);
+		MemorySize defaultOffHeap = JobManagerOptions.OFF_HEAP_MEMORY.defaultValue();
+		MemorySize expectedOffHeap = MemorySize.ofMebiBytes(100).add(defaultOffHeap);
+		MemorySize totalFlinkMemory = jvmHeap.add(expectedOffHeap);
+
+		Configuration conf = new Configuration();
+		conf.set(JobManagerOptions.TOTAL_FLINK_MEMORY, totalFlinkMemory);
+		conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeap);
+
+		JobManagerProcessSpec jobManagerProcessSpec = JobManagerProcessUtils.processSpecFromConfig(conf);
+		assertThat(jobManagerProcessSpec.getJvmDirectMemorySize(), is(expectedOffHeap));
+		MatcherAssert.assertThat(
+			testLoggerResource.getMessages(),
+			hasItem(containsString(String.format(
+				"The Off-Heap Memory size (%s) is derived the configured Total Flink Memory size (%s) minus " +
+				"the configured JVM Heap Memory size (%s). The default Off-Heap Memory size (%s) is ignored.",
+				expectedOffHeap.toHumanReadableString(),
+				totalFlinkMemory.toHumanReadableString(),
+				jvmHeap.toHumanReadableString(),
+				defaultOffHeap.toHumanReadableString()))));
+	}
+
 	@Override
 	protected JobManagerProcessSpec processSpecFromConfig(Configuration config) {
 		return JobManagerProcessUtils.processSpecFromConfig(config);