tillrohrmann commented on a change in pull request #9760: [FLINK-13982][runtime] Implement memory calculation logics URL: https://github.com/apache/flink/pull/9760#discussion_r334000883
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java ########## @@ -0,0 +1,685 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link TaskExecutorResourceUtils}. 
+ */ +public class TaskExecutorResourceUtilsTest extends TestLogger { + + private static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m"); + private static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m"); + private static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("800m"); + private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1g"); + + private static final TaskExecutorResourceSpec TM_RESOURCE_SPEC = new TaskExecutorResourceSpec( + MemorySize.parse("1m"), + MemorySize.parse("2m"), + MemorySize.parse("3m"), + MemorySize.parse("4m"), + MemorySize.parse("5m"), + MemorySize.parse("6m"), + MemorySize.parse("7m"), + MemorySize.parse("8m")); + + @Test + public void testGenerateDynamicConfigurations() { + String dynamicConfigsStr = TaskExecutorResourceUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC); + Map<String, String> configs = new HashMap<>(); + String[] configStrs = dynamicConfigsStr.split(" "); + assertThat(configStrs.length % 2, is(0)); + for (int i = 0; i < configStrs.length; ++i) { + String configStr = configStrs[i]; + if (i % 2 == 0) { + assertThat(configStr, is("-D")); + } else { + String[] configKV = configStr.split("="); + assertThat(configKV.length, is(2)); + configs.put(configKV[0], configKV[1]); + } + } + + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getFrameworkHeapSize())); + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getTaskHeapSize())); + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())), is(TM_RESOURCE_SPEC.getTaskOffHeapSize())); + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MAX.key())), is(TM_RESOURCE_SPEC.getShuffleMemSize())); + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.SHUFFLE_MEMORY_MIN.key())), is(TM_RESOURCE_SPEC.getShuffleMemSize())); + 
assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())), is(TM_RESOURCE_SPEC.getManagedMemorySize())); + assertThat(MemorySize.parse(configs.get(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE.key())), is(TM_RESOURCE_SPEC.getOffHeapManagedMemorySize())); + } + + @Test + public void testGenerateJvmParameters() throws Exception { + String jvmParamsStr = TaskExecutorResourceUtils.generateJvmParametersStr(TM_RESOURCE_SPEC); + MemorySize heapSizeMax = null; + MemorySize heapSizeMin = null; + MemorySize directSize = null; + MemorySize metaspaceSize = null; + for (String paramStr : jvmParamsStr.split(" ")) { + if (paramStr.startsWith("-Xmx")) { + heapSizeMax = MemorySize.parse(paramStr.substring("-Xmx".length())); + } else if (paramStr.startsWith("-Xms")) { + heapSizeMin = MemorySize.parse(paramStr.substring("-Xms".length())); + } else if (paramStr.startsWith("-XX:MaxDirectMemorySize=")) { + directSize = MemorySize.parse(paramStr.substring("-XX:MaxDirectMemorySize=".length())); + } else if (paramStr.startsWith("-XX:MetaspaceSize=")) { + metaspaceSize = MemorySize.parse(paramStr.substring("-XX:MetaspaceSize=".length())); + } else { + throw new Exception("Unknown JVM parameter: " + paramStr); + } + } + + assertThat(heapSizeMax, is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize()).add(TM_RESOURCE_SPEC.getOnHeapManagedMemorySize()))); + assertThat(heapSizeMin, is(heapSizeMax)); + assertThat(directSize, is(TM_RESOURCE_SPEC.getTaskOffHeapSize().add(TM_RESOURCE_SPEC.getShuffleMemSize()))); + assertThat(metaspaceSize, is(TM_RESOURCE_SPEC.getJvmMetaspaceSize())); + } + + @Test public void testConfigFrameworkHeapMemory() { + final MemorySize frameworkHeapSize = MemorySize.parse("100m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, frameworkHeapSize.getMebiBytes() + "m"); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> 
assertThat(taskExecutorResourceSpec.getFrameworkHeapSize(), is(frameworkHeapSize))); + } + + @Test + public void testConfigTaskHeapMemory() { + final MemorySize taskHeapSize = MemorySize.parse("50m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.TASK_HEAP_MEMORY, taskHeapSize.getMebiBytes() + "m"); + + // validate in configurations without explicit task heap memory size, + // to avoid checking against overwritten task heap memory size + validateInConfigurationsWithoutExplicitTaskHeapMem(conf, taskExecutorResourceSpec -> assertThat(taskExecutorResourceSpec.getTaskHeapSize(), is(taskHeapSize))); + } + + @Test + public void testConfigTaskOffheapMemory() { + final MemorySize taskOffHeapSize = MemorySize.parse("50m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, taskOffHeapSize.getMebiBytes() + "m"); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> assertThat(taskExecutorResourceSpec.getTaskOffHeapSize(), is(taskOffHeapSize))); + } + + @Test + public void testConfigShuffleMemoryRange() { + final MemorySize shuffleMin = MemorySize.parse("50m"); + final MemorySize shuffleMax = MemorySize.parse("200m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, shuffleMax.getMebiBytes() + "m"); + conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, shuffleMin.getMebiBytes() + "m"); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> { + assertThat(taskExecutorResourceSpec.getShuffleMemSize().getBytes(), greaterThanOrEqualTo(shuffleMin.getBytes())); + assertThat(taskExecutorResourceSpec.getShuffleMemSize().getBytes(), lessThanOrEqualTo(shuffleMax.getBytes())); + }); + } + + @Test + public void testConfigShuffleMemoryRangeFailure() { + final MemorySize shuffleMin = MemorySize.parse("200m"); + final MemorySize shuffleMax = MemorySize.parse("50m"); + + Configuration conf = new Configuration(); + 
conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, shuffleMax.getMebiBytes() + "m"); + conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, shuffleMin.getMebiBytes() + "m"); + + validateFailInAllConfigurations(conf); + } + + @Test + public void testConfigShuffleMemoryFraction() { + final MemorySize shuffleMin = MemorySize.parse("0m"); + final MemorySize shuffleMax = MemorySize.parse("1t"); + final float fraction = 0.2f; + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MAX, shuffleMax.getMebiBytes() + "m"); + conf.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, shuffleMin.getMebiBytes() + "m"); + conf.setFloat(TaskManagerOptions.SHUFFLE_MEMORY_FRACTION, fraction); + + // validate in configurations without explicit total flink/process memory, otherwise explicit configured + // shuffle memory fraction might conflict with total flink/process memory minus other memory sizes + validateInConfigWithExplicitTaskHeapAndManagedMem(conf, taskExecutorResourceSpec -> + assertThat(taskExecutorResourceSpec.getShuffleMemSize(), is(taskExecutorResourceSpec.getTotalFlinkMemorySize().multiply(fraction)))); + } + + @Test + public void testConfigShuffleMemoryFractionFailure() { + Configuration conf = new Configuration(); + conf.setFloat(TaskManagerOptions.SHUFFLE_MEMORY_FRACTION, -0.1f); + validateFailInAllConfigurations(conf); + + conf.setFloat(TaskManagerOptions.SHUFFLE_MEMORY_FRACTION, 1.0f); + validateFailInAllConfigurations(conf); + } + + @Test + @SuppressWarnings("deprecation") + public void testConfigShuffleMemoryLegacyRangeFraction() { + final MemorySize shuffleMin = MemorySize.parse("50m"); + final MemorySize shuffleMax = MemorySize.parse("200m"); + final float fraction = 0.2f; + + Configuration conf = new Configuration(); + conf.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, shuffleMin.getMebiBytes() + "m"); + conf.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, 
shuffleMax.getMebiBytes() + "m"); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> { + assertThat(taskExecutorResourceSpec.getShuffleMemSize().getBytes(), greaterThanOrEqualTo(shuffleMin.getBytes())); + assertThat(taskExecutorResourceSpec.getShuffleMemSize().getBytes(), lessThanOrEqualTo(shuffleMax.getBytes())); + }); + + conf.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, "0m"); + conf.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, "1t"); + conf.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, fraction); + + validateInConfigWithExplicitTaskHeapAndManagedMem(conf, taskExecutorResourceSpec -> + assertThat(taskExecutorResourceSpec.getShuffleMemSize(), is(taskExecutorResourceSpec.getTotalFlinkMemorySize().multiply(fraction)))); + } + + @Test + @SuppressWarnings("deprecation") + public void testConfigShuffleMemoryLegacyNumOfBuffers() { + final MemorySize pageSize = MemorySize.parse("32k"); + final int numOfBuffers = 1024; + final MemorySize shuffleSize = pageSize.multiply(numOfBuffers); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, pageSize.getKibiBytes() + "k"); + conf.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, numOfBuffers); + + // validate in configurations without explicit total flink/process memory, otherwise explicit configured + // shuffle memory size might conflict with total flink/process memory minus other memory sizes + validateInConfigWithExplicitTaskHeapAndManagedMem(conf, taskExecutorResourceSpec -> + assertThat(taskExecutorResourceSpec.getShuffleMemSize(), is(shuffleSize))); + } + + @Test + public void testConfigManagedMemorySize() { + final MemorySize managedMemSize = MemorySize.parse("100m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSize.getMebiBytes() + "m"); + + // validate in configurations without explicit managed 
memory size, + // to avoid checking against overwritten managed memory size + validateInConfigurationsWithoutExplicitManagedMem(conf, taskExecutorResourceSpec -> assertThat(taskExecutorResourceSpec.getManagedMemorySize(), is(managedMemSize))); + } + + @Test + @SuppressWarnings("deprecation") + public void testConfigManagedMemoryLegacySize() { + final MemorySize managedMemSize = MemorySize.parse("100m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemSize.getMebiBytes() + "m"); + + // validate in configurations without explicit managed memory size, + // to avoid checking against overwritten managed memory size + validateInConfigurationsWithoutExplicitManagedMem(conf, taskExecutorResourceSpec -> assertThat(taskExecutorResourceSpec.getManagedMemorySize(), is(managedMemSize))); + } + + @Test + public void testConfigManagedMemoryFraction() { + final float fraction = 0.5f; + + Configuration conf = new Configuration(); + conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, fraction); + + // managed memory fraction is only used when managed memory size is not explicitly configured + validateInConfigurationsWithoutExplicitManagedMem(conf, taskExecutorResourceSpec -> + assertThat(taskExecutorResourceSpec.getManagedMemorySize(), is(taskExecutorResourceSpec.getTotalFlinkMemorySize().multiply(fraction)))); + } + + @Test + public void testConfigManagedMemoryFractionFailure() { + final Configuration conf = new Configuration(); + conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, -0.1f); + validateFailInConfigurationsWithoutExplicitManagedMem(conf); + + conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 1.0f); + validateFailInConfigurationsWithoutExplicitManagedMem(conf); + } + + @Test + @SuppressWarnings("deprecation") + public void testConfigManagedMemoryLegacyFraction() { + final float fraction = 0.5f; + + Configuration conf = new Configuration(); + 
conf.setFloat(TaskManagerOptions.LEGACY_MANAGED_MEMORY_FRACTION, fraction); + + // managed memory fraction is only used when managed memory size is not explicitly configured + validateInConfigurationsWithoutExplicitManagedMem(conf, taskExecutorResourceSpec -> + assertThat(taskExecutorResourceSpec.getManagedMemorySize(), is(taskExecutorResourceSpec.getTotalFlinkMemorySize().multiply(fraction)))); + } + + @Test + public void testConfigOffHeapManagedMemorySize() { + final MemorySize offHeapSize = MemorySize.parse("20m"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE, offHeapSize.getMebiBytes() + "m"); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> { + assertThat(taskExecutorResourceSpec.getOffHeapManagedMemorySize(), is(offHeapSize)); + assertThat(taskExecutorResourceSpec.getOnHeapManagedMemorySize(), is(taskExecutorResourceSpec.getManagedMemorySize().subtract(taskExecutorResourceSpec.getOffHeapManagedMemorySize()))); + }); + } + + @Test + public void testConfigOffHeapManagedMemorySizeFailure() { + final MemorySize offHeapSize = MemorySize.parse("1t"); + + Configuration conf = new Configuration(); + conf.setString(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_SIZE, offHeapSize.getMebiBytes() + "m"); + + validateFailInAllConfigurations(conf); + } + + @Test + public void testConfigOffHeapManagedMemoryFraction() { + final float fraction = 0.5f; + + Configuration conf = new Configuration(); + conf.setFloat(TaskManagerOptions.MANAGED_MEMORY_OFFHEAP_FRACTION, fraction); + + validateInAllConfigurations(conf, taskExecutorResourceSpec -> { + assertThat(taskExecutorResourceSpec.getOffHeapManagedMemorySize(), is(taskExecutorResourceSpec.getManagedMemorySize().multiply(fraction))); + assertThat(taskExecutorResourceSpec.getOnHeapManagedMemorySize(), is(taskExecutorResourceSpec.getManagedMemorySize().subtract(taskExecutorResourceSpec.getOffHeapManagedMemorySize()))); + }); + } + + @Test + public void 
testConfigOffHeapManagedMemoryFractionFailure() { + // negative off-heap managed memory fraction means not configured, if off-heap managed memory size is also not configured, + // legacy 'taskmanager.memory.off-heap' will be used to set managed memory to either all on-heap or all off-heap Review comment: the comments don't seem to be aligned with the test case ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services