tillrohrmann commented on a change in pull request #9760: 
[FLINK-13982][runtime] Implement memory calculation logics
URL: https://github.com/apache/flink/pull/9760#discussion_r334000883
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
 ##########
 @@ -0,0 +1,685 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorResourceUtils}.
+ */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+
+       // Fixed memory sizes for the test configurations (their consumers are outside this excerpt).
+       private static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
+       private static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
+       private static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m");
+       private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g");
+
+       // Resource spec built from eight distinct sizes (1m to 8m), presumably so that a
+       // mixed-up component shows up in the generated strings.
+       // NOTE(review): the positional meaning of the arguments is defined by the
+       // TaskExecutorResourceSpec constructor, which is not visible here -- confirm there.
+       private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC =
+               new TaskExecutorResourceSpec(
+                       MemorySize.parse("1m"),
+                       MemorySize.parse("2m"),
+                       MemorySize.parse("3m"),
+                       MemorySize.parse("4m"),
+                       MemorySize.parse("5m"),
+                       MemorySize.parse("6m"),
+                       MemorySize.parse("7m"),
+                       MemorySize.parse("8m"));
+
+       /**
+        * Checks that the dynamic configs string generated for {@link #TM_RESOURCE_SPEC} is a
+        * space-separated sequence of "-D" / "key=value" pairs and that the parsed values equal
+        * the corresponding sizes of the spec.
+        */
+       @Test
+       public void testGenerateDynamicConfigurations() {
+               final String[] tokens =
+                       TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC).split(" ");
+               assertThat(tokens.length % 2, is(0));
+
+               // tokens come in pairs: the "-D" flag followed by its "key=value" argument
+               final Map<String, String> parsedConfigs = new HashMap<>();
+               for (int flagIdx = 0; flagIdx < tokens.length; flagIdx += 2) {
+                       assertThat(tokens[flagIdx], is("-D"));
+
+                       final String[] keyAndValue = tokens[flagIdx + 1].split("=");
+                       assertThat(keyAndValue.length, is(2));
+                       parsedConfigs.put(keyAndValue[0], keyAndValue[1]);
+               }
+
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())),
+                       is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())),
+                       is(TM_RESOURCE_SPEC.getTaskHeapSize()));
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())),
+                       is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
+               // both shuffle bounds are expected to equal the shuffle size of the spec
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())),
+                       is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())),
+                       is(TM_RESOURCE_SPEC.getShuffleMemSize()));
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())),
+                       is(TM_RESOURCE_SPEC.getManagedMemorySize()));
+               assertThat(
+                       MemorySize.parse(parsedConfigs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())),
+                       is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize()));
+       }
+
+       /**
+        * Checks the JVM parameters string generated for {@link #TM_RESOURCE_SPEC}: -Xmx and -Xms
+        * must both equal framework heap + task heap + on-heap managed memory, the max direct
+        * memory must equal task off-heap + shuffle memory, and the metaspace size must equal the
+        * JVM metaspace size of the spec. Any other parameter fails the test.
+        */
+       @Test
+       public void testGenerateJvmParameters() throws Exception {
+               String jvmParamsStr = TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC);
+               // stay null when the corresponding parameter is missing, which fails the assertions below
+               MemorySize heapSizeMax = null;
+               MemorySize heapSizeMin = null;
+               MemorySize directSize = null;
+               MemorySize metaspaceSize = null;
+               for (String paramStr : jvmParamsStr.split(" ")) {
+                       if (paramStr.startsWith("-Xmx")) {
+                               heapSizeMax = MemorySize.parse(paramStr.substring("-Xmx".length()));
+                       } else if (paramStr.startsWith("-Xms")) {
+                               heapSizeMin = MemorySize.parse(paramStr.substring("-Xms".length()));
+                       } else if (paramStr.startsWith("-XX:MaxDirectMemorySize=")) {
+                               directSize = MemorySize.parse(paramStr.substring("-XX:MaxDirectMemorySize=".length()));
+                       } else if (paramStr.startsWith("-XX:MetaspaceSize=")) {
+                               metaspaceSize = MemorySize.parse(paramStr.substring("-XX:MetaspaceSize=".length()));
+                       } else {
+                               // report an unexpected parameter as a test failure instead of
+                               // throwing a raw checked Exception
+                               throw new AssertionError("Unknown JVM parameter: " + paramStr);
+                       }
+               }
+
+               assertThat(
+                       heapSizeMax,
+                       is(TM_RESOURCE_SPEC.getFrameworkHeapSize()
+                               .add(TM_RESOURCE_SPEC.getTaskHeapSize())
+                               .add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize())));
+               assertThat(heapSizeMin, is(heapSizeMax));
+               assertThat(
+                       directSize,
+                       is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize())));
+               assertThat(metaspaceSize, is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
+       }
+
+       /**
+        * An explicitly configured framework heap size must show up unchanged in the resource
+        * spec, in every configuration the validation helper applies.
+        */
+       @Test
+       public void testConfigFrameworkHeapMemory() {
+               final MemorySize frameworkHeapSize = MemorySize.parse("100m");
+
+               Configuration conf = new Configuration();
+               conf.setString(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, frameworkHeapSize.getMebiBytes() + "m");
+
+               validateInAllConfigurations(
+                       conf,
+                       taskExecutorResourceSpec ->
+                               assertThat(taskExecutorResourceSpec.getFrameworkHeapSize(), is(frameworkHeapSize)));
+       }
+
+       /**
+        * An explicitly configured task heap size must show up unchanged in the resource spec.
+        */
+       @Test
+       public void testConfigTaskHeapMemory() {
+               final MemorySize expectedTaskHeap = MemorySize.parse("50m");
+               final String configuredValue = expectedTaskHeap.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.TASK_HEAP_MEMORY, configuredValue);
+
+               // Only configurations without an explicit task heap size are validated, to avoid
+               // checking against an overwritten task heap size.
+               validateInConfigurationsWithoutExplicitTaskHeapMem(
+                       config,
+                       spec -> assertThat(spec.getTaskHeapSize(), is(expectedTaskHeap)));
+       }
+
+       /**
+        * An explicitly configured task off-heap size must show up unchanged in the resource spec.
+        */
+       @Test
+       public void testConfigTaskOffheapMemory() {
+               final MemorySize expectedTaskOffHeap = MemorySize.parse("50m");
+               final String configuredValue = expectedTaskOffHeap.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, configuredValue);
+
+               validateInAllConfigurations(
+                       config,
+                       spec -> assertThat(spec.getTaskOffHeapSize(), is(expectedTaskOffHeap)));
+       }
+
+       /**
+        * With a valid [min, max] shuffle memory range configured, the derived shuffle memory
+        * size must lie within that range (bounds inclusive).
+        */
+       @Test
+       public void testConfigShuffleMemoryRange() {
+               final MemorySize lowerBound = MemorySize.parse("50m");
+               final MemorySize upperBound = MemorySize.parse("200m");
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, upperBound.getMebiBytes() + "m");
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, lowerBound.getMebiBytes() + "m");
+
+               validateInAllConfigurations(config, spec -> {
+                       assertThat(
+                               spec.getShuffleMemSize().getBytes(),
+                               greaterThanOrEqualTo(lowerBound.getBytes()));
+                       assertThat(
+                               spec.getShuffleMemSize().getBytes(),
+                               lessThanOrEqualTo(upperBound.getBytes()));
+               });
+       }
+
+       /**
+        * A shuffle memory range whose min (200m) exceeds its max (50m) must be rejected.
+        */
+       @Test
+       public void testConfigShuffleMemoryRangeFailure() {
+               // deliberately inverted range: min > max
+               final MemorySize invalidMin = MemorySize.parse("200m");
+               final MemorySize invalidMax = MemorySize.parse("50m");
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, invalidMax.getMebiBytes() + "m");
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, invalidMin.getMebiBytes() + "m");
+
+               validateFailInAllConfigurations(config);
+       }
+
+       /**
+        * With a practically unbounded shuffle range (0m to 1t), the shuffle memory size must be
+        * the configured fraction of the total Flink memory size.
+        */
+       @Test
+       public void testConfigShuffleMemoryFraction() {
+               final MemorySize lowerBound = MemorySize.parse("0m");
+               final MemorySize upperBound = MemorySize.parse("1t");
+               final float shuffleFraction = 0.2f;
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, upperBound.getMebiBytes() + "m");
+               config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, lowerBound.getMebiBytes() + "m");
+               config.setFloat(TaskManagerOptions.SHUFFLE_MEMORY_FRACTION, shuffleFraction);
+
+               // Validate only in configurations without explicit total flink/process memory;
+               // otherwise the explicitly configured shuffle memory fraction might conflict with
+               // the total flink/process memory minus the other memory sizes.
+               validateInConfigWithExplicitTaskHeapAndManagedMem(
+                       config,
+                       spec -> assertThat(
+                               spec.getShuffleMemSize(),
+                               is(spec.getTotalFlinkMemorySize().multiply(shuffleFraction))));
+       }
+
+       /**
+        * Shuffle memory fractions of -0.1 and 1.0 must both be rejected.
+        */
+       @Test
+       public void testConfigShuffleMemoryFractionFailure() {
+               final Configuration config = new Configuration();
+
+               // the same configuration object is reused; each value overwrites the previous one
+               for (float invalidFraction : new float[] {-0.1f, 1.0f}) {
+                       config.setFloat(TaskManagerOptions.SHUFFLE_MEMORY_FRACTION, invalidFraction);
+                       validateFailInAllConfigurations(config);
+               }
+       }
+
+       /**
+        * Same checks as for the shuffle memory range and fraction options, but configured
+        * through the deprecated network buffer memory options.
+        */
+       @Test
+       @SuppressWarnings("deprecation")
+       public void testConfigShuffleMemoryLegacyRangeFraction() {
+               final MemorySize lowerBound = MemorySize.parse("50m");
+               final MemorySize upperBound = MemorySize.parse("200m");
+               final float legacyFraction = 0.2f;
+
+               final Configuration config = new Configuration();
+
+               // Phase 1: a bounded legacy range must bound the shuffle memory size.
+               config.setString(
+                       NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN,
+                       lowerBound.getMebiBytes() + "m");
+               config.setString(
+                       NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX,
+                       upperBound.getMebiBytes() + "m");
+
+               validateInAllConfigurations(config, spec -> {
+                       assertThat(
+                               spec.getShuffleMemSize().getBytes(),
+                               greaterThanOrEqualTo(lowerBound.getBytes()));
+                       assertThat(
+                               spec.getShuffleMemSize().getBytes(),
+                               lessThanOrEqualTo(upperBound.getBytes()));
+               });
+
+               // Phase 2: with the range widened to 0m..1t, the legacy fraction decides the size.
+               config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "0m");
+               config.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1t");
+               config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, legacyFraction);
+
+               validateInConfigWithExplicitTaskHeapAndManagedMem(
+                       config,
+                       spec -> assertThat(
+                               spec.getShuffleMemSize(),
+                               is(spec.getTotalFlinkMemorySize().multiply(legacyFraction))));
+       }
+
+       /**
+        * With the deprecated number-of-network-buffers option set, the shuffle memory size must
+        * equal the memory segment size multiplied by the number of buffers.
+        */
+       @Test
+       @SuppressWarnings("deprecation")
+       public void testConfigShuffleMemoryLegacyNumOfBuffers() {
+               final MemorySize segmentSize = MemorySize.parse("32k");
+               final int bufferCount = 1024;
+               final MemorySize expectedShuffleSize = segmentSize.multiply(bufferCount);
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, segmentSize.getKibiBytes() + "k");
+               config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, bufferCount);
+
+               // Validate only in configurations without explicit total flink/process memory;
+               // otherwise the explicitly configured shuffle memory size might conflict with the
+               // total flink/process memory minus the other memory sizes.
+               validateInConfigWithExplicitTaskHeapAndManagedMem(
+                       config,
+                       spec -> assertThat(spec.getShuffleMemSize(), is(expectedShuffleSize)));
+       }
+
+       /**
+        * An explicitly configured managed memory size must show up unchanged in the resource spec.
+        */
+       @Test
+       public void testConfigManagedMemorySize() {
+               final MemorySize expectedManagedMem = MemorySize.parse("100m");
+               final String configuredValue = expectedManagedMem.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, configuredValue);
+
+               // Only configurations without an explicit managed memory size are validated, to
+               // avoid checking against an overwritten managed memory size.
+               validateInConfigurationsWithoutExplicitManagedMem(
+                       config,
+                       spec -> assertThat(spec.getManagedMemorySize(), is(expectedManagedMem)));
+       }
+
+       /**
+        * A managed memory size configured through the deprecated legacy option must show up
+        * unchanged in the resource spec.
+        */
+       @Test
+       @SuppressWarnings("deprecation")
+       public void testConfigManagedMemoryLegacySize() {
+               final MemorySize expectedManagedMem = MemorySize.parse("100m");
+               final String configuredValue = expectedManagedMem.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, configuredValue);
+
+               // Only configurations without an explicit managed memory size are validated, to
+               // avoid checking against an overwritten managed memory size.
+               validateInConfigurationsWithoutExplicitManagedMem(
+                       config,
+                       spec -> assertThat(spec.getManagedMemorySize(), is(expectedManagedMem)));
+       }
+
+       /**
+        * With only the managed memory fraction configured, the managed memory size must be that
+        * fraction of the total Flink memory size.
+        */
+       @Test
+       public void testConfigManagedMemoryFraction() {
+               final float managedFraction = 0.5f;
+
+               final Configuration config = new Configuration();
+               config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedFraction);
+
+               // The managed memory fraction is only used when the managed memory size is not
+               // explicitly configured.
+               validateInConfigurationsWithoutExplicitManagedMem(
+                       config,
+                       spec -> assertThat(
+                               spec.getManagedMemorySize(),
+                               is(spec.getTotalFlinkMemorySize().multiply(managedFraction))));
+       }
+
+       /**
+        * Managed memory fractions of -0.1 and 1.0 must both be rejected when no managed memory
+        * size is configured explicitly.
+        */
+       @Test
+       public void testConfigManagedMemoryFractionFailure() {
+               final Configuration config = new Configuration();
+
+               // the same configuration object is reused; each value overwrites the previous one
+               for (float invalidFraction : new float[] {-0.1f, 1.0f}) {
+                       config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, invalidFraction);
+                       validateFailInConfigurationsWithoutExplicitManagedMem(config);
+               }
+       }
+
+       /**
+        * With only the deprecated legacy managed memory fraction configured, the managed memory
+        * size must be that fraction of the total Flink memory size.
+        */
+       @Test
+       @SuppressWarnings("deprecation")
+       public void testConfigManagedMemoryLegacyFraction() {
+               final float legacyFraction = 0.5f;
+
+               final Configuration config = new Configuration();
+               config.setFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION, legacyFraction);
+
+               // The managed memory fraction is only used when the managed memory size is not
+               // explicitly configured.
+               validateInConfigurationsWithoutExplicitManagedMem(
+                       config,
+                       spec -> assertThat(
+                               spec.getManagedMemorySize(),
+                               is(spec.getTotalFlinkMemorySize().multiply(legacyFraction))));
+       }
+
+       /**
+        * An explicitly configured off-heap managed memory size must show up unchanged in the
+        * resource spec, and the on-heap managed memory must be the remainder of the managed memory.
+        */
+       @Test
+       public void testConfigOffHeapManagedMemorySize() {
+               final MemorySize expectedOffHeap = MemorySize.parse("20m");
+               final String configuredValue = expectedOffHeap.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE, configuredValue);
+
+               validateInAllConfigurations(config, spec -> {
+                       assertThat(spec.getOffHeapManagedMemorySize(), is(expectedOffHeap));
+                       // on-heap = managed - off-heap
+                       assertThat(
+                               spec.getOnHeapManagedMemorySize(),
+                               is(spec.getManagedMemorySize().subtract(spec.getOffHeapManagedMemorySize())));
+               });
+       }
+
+       /**
+        * An off-heap managed memory size of 1t must be rejected in every configuration.
+        */
+       @Test
+       public void testConfigOffHeapManagedMemorySizeFailure() {
+               final MemorySize oversizedOffHeap = MemorySize.parse("1t");
+               final String configuredValue = oversizedOffHeap.getMebiBytes() + "m";
+
+               final Configuration config = new Configuration();
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE, configuredValue);
+
+               validateFailInAllConfigurations(config);
+       }
+
+       /**
+        * With the off-heap managed memory fraction configured, the off-heap managed memory must
+        * be that fraction of the managed memory and the on-heap managed memory the remainder.
+        */
+       @Test
+       public void testConfigOffHeapManagedMemoryFraction() {
+               final float offHeapFraction = 0.5f;
+
+               final Configuration config = new Configuration();
+               config.setFloat(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_FRACTION, offHeapFraction);
+
+               validateInAllConfigurations(config, spec -> {
+                       assertThat(
+                               spec.getOffHeapManagedMemorySize(),
+                               is(spec.getManagedMemorySize().multiply(offHeapFraction)));
+                       // on-heap = managed - off-heap
+                       assertThat(
+                               spec.getOnHeapManagedMemorySize(),
+                               is(spec.getManagedMemorySize().subtract(spec.getOffHeapManagedMemorySize())));
+               });
+       }
+
+       @Test
+       public void testConfigOffHeapManagedMemoryFractionFailure() {
+               // negative off-heap managed memory fraction means not 
configured, if off-heap managed memory size is also not configured,
+               // legacy 'taskmanager.memory.off-heap' will be used to set 
managed memory to either all on-heap or all off-heap
 
 Review comment:
   The comments don't seem to be aligned with the test case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to