Repository: flink Updated Branches: refs/heads/master 0ba528c71 -> 0df8e0797
[FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended + fix description of `containerized.heap-cutoff-ratio` [FLINK-7400][yarn] add an integration test for yarn container memory restrictions using off-heap memory [FLINK-7400] address PR comments This closes #4506. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0df8e079 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0df8e079 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0df8e079 Branch: refs/heads/master Commit: 0df8e0797459ef9e8dfa177920def08bc2f11d65 Parents: 0ba528c Author: Nico Kruber <[email protected]> Authored: Wed Aug 9 11:53:03 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Nov 1 13:11:18 2017 +0100 ---------------------------------------------------------------------- docs/ops/config.md | 2 +- .../ContaineredTaskManagerParameters.java | 3 +- .../YARNSessionCapacitySchedulerITCase.java | 55 +++++++++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index b72bc10..9ee7106 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -451,7 +451,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use ### YARN -- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove a 15% of the memory from the requested heap as a safety margin. +- `containerized.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to remove from containers started by YARN. When a user requests a certain amount of memory for each TaskManager container (for example 4 GB), we can not pass this amount as the maximum heap space for the JVM (`-Xmx` argument) because the JVM is also allocating memory outside the heap. YARN is very strict with killing containers which are using more memory than requested. Therefore, we remove this fraction of the memory from the requested heap as a safety margin and add it to the memory used off-heap. - `containerized.heap-cutoff-min`: (Default 600 MB) Minimum amount of memory to cut off the requested heap size. http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index 7e9891f..c35cf81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -141,7 +141,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (2) split the remaining Java memory between heap and off-heap final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config); - final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : javaMemorySizeMB - heapSizeMB; + // use the cut-off memory for off-heap (that was its intention) + final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB; // (3) obtain the additional environment variables from the configuration final HashMap<String, String> envVars = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/0df8e079/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 4441d8a..03c61e8 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -18,9 +18,12 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.TestBaseUtils; @@ -124,9 +127,57 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-ys", "2", //test that the job is executed with a DOP of 2 "-yjm", "768", "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, - /* test succeeded after this string */ + /* test succeeded after this string */ "Job execution complete", - /* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */ + /* prohibited strings: (to verify the parallelism) */ + // (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)" instead) + new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, + RunTypes.CLI_FRONTEND, 0, true); + LOG.info("Finished perJobYarnCluster()"); + } + + /** + * Test per-job yarn cluster and memory calculations for off-heap use (see FLINK-7400) with the + * same job as {@link #perJobYarnCluster()}. + * + * <p>This ensures that with (any) pre-allocated off-heap memory by us, there is some off-heap + * memory remaining for Flink's libraries. Creating task managers will thus fail if no off-heap + * memory remains. + */ + @Test + public void perJobYarnClusterOffHeap() { + LOG.info("Starting perJobYarnCluster()"); + addTestAppender(JobClient.class, Level.INFO); + File exampleJarLocation = new File("target/programs/BatchWordCount.jar"); + Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation); + + // set memory constraints (otherwise this is the same test as perJobYarnCluster() above) + final long taskManagerMemoryMB = 1024; + //noinspection NumericOverflow if the calculation of the total Java memory size overflows, default configuration parameters are wrong in the first place, so we can ignore this inspection + final long networkBuffersMB = TaskManagerServices + .calculateNetworkBufferMemory( + (taskManagerMemoryMB - + ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue()) << 20, + new Configuration()) >> 20; + final long offHeapMemory = taskManagerMemoryMB + - ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN.defaultValue() + // cutoff memory (will be added automatically) + - networkBuffersMB // amount of memory used for network buffers + - 100; // reserve something for the Java heap space + + runWithArgs(new String[]{"run", "-m", "yarn-cluster", + "-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(), + "-yn", "1", + "-ys", "2", //test that the job is executed with a DOP of 2 + "-yjm", "768", + "-ytm", String.valueOf(taskManagerMemoryMB), + "-yD", "taskmanager.memory.off-heap=true", + "-yD", "taskmanager.memory.size=" + offHeapMemory, + "-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()}, + /* test succeeded after this string */ + "Job execution complete", + /* prohibited strings: (to verify the parallelism) */ + // (we should see "DataSink (...) (1/2)" and "DataSink (...) (2/2)" instead) new String[]{"DataSink \\(.*\\) \\(1/1\\) switched to FINISHED"}, RunTypes.CLI_FRONTEND, 0, true); LOG.info("Finished perJobYarnCluster()");
