Repository: flink Updated Branches: refs/heads/master 54dd91603 -> 1854a3de1
[FLINK-7316][network] Always use off-heap network buffers. This is another step at using our own (off-heap) buffers for network communication that we pass through netty in order to avoid unnecessary buffer copies. This closes #4481. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1854a3de Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1854a3de Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1854a3de Branch: refs/heads/master Commit: 1854a3de19a8f73a49e3c1d9438d61b5e4c6a452 Parents: 54dd916 Author: Nico Kruber <n...@data-artisans.com> Authored: Tue Aug 1 13:24:00 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Nov 24 10:48:32 2017 +0100 ---------------------------------------------------------------------- docs/ops/config.md | 4 + flink-dist/src/main/flink-bin/bin/config.sh | 7 +- .../ContaineredTaskManagerParameters.java | 4 +- .../io/network/buffer/NetworkBufferPool.java | 27 ++--- .../taskexecutor/TaskManagerServices.java | 106 +++++++++---------- .../TaskManagerServicesConfiguration.java | 31 ++++-- .../NetworkEnvironmentConfiguration.java | 15 +-- .../ContaineredTaskManagerParametersTest.java | 41 ++++++- .../io/network/NetworkEnvironmentTest.java | 4 +- .../io/network/api/writer/RecordWriterTest.java | 4 +- .../network/buffer/BufferPoolFactoryTest.java | 7 +- .../buffer/LocalBufferPoolDestroyTest.java | 3 +- .../io/network/buffer/LocalBufferPoolTest.java | 9 +- .../network/buffer/NetworkBufferPoolTest.java | 19 ++-- .../consumer/LocalInputChannelTest.java | 7 +- .../BackPressureStatsTrackerITCase.java | 4 +- .../taskexecutor/TaskManagerServicesTest.java | 21 +++- ...askManagerComponentsStartupShutdownTest.java | 4 +- .../io/BarrierBufferMassiveRandomTest.java | 17 ++- .../YARNSessionCapacitySchedulerITCase.java | 8 +- 20 files changed, 195 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index bcf7671..ed65880 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -673,6 +673,10 @@ for each point-to-point exchange of data over the network, which typically happe repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. +<div class="alert alert-warning"> + <strong>Note:</strong> Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of <code>taskmanager.memory.off-heap</code>. This way, we can pass these buffers directly to the underlying network stack layers. +</div> + #### Setting Memory Fractions Previously, the number of network buffers was set manually which became a quite error-prone task http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 3055999..1b03e68 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -570,13 +570,12 @@ calculateTaskManagerHeapSizeMB() { exit 1 fi - local tm_heap_size_mb=${FLINK_TM_HEAP} + local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes + # network buffers are always off-heap and thus need to be deduced from the heap memory size + local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb)) if useOffHeapMemory; then - local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes - tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb)) - if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then # We 
split up the total memory in heap and off-heap memory if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index c35cf81..c4dd486 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -142,7 +142,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // (2) split the remaining Java memory between heap and off-heap final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config); // use the cut-off memory for off-heap (that was its intention) - final long offHeapSize = javaMemorySizeMB == heapSizeMB ? 
-1L : containerMemoryMB - heapSizeMB; + final long offHeapSizeMB = containerMemoryMB - heapSizeMB; // (3) obtain the additional environment variables from the configuration final HashMap<String, String> envVars = new HashMap<>(); @@ -158,6 +158,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable { // done return new ContaineredTaskManagerParameters( - containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars); + containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index f899f05..7b817ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -21,9 +21,9 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.MathUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +37,6 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances @@ -63,15 +62,14 @@ public class NetworkBufferPool implements 
BufferPoolFactory { private final Object factoryLock = new Object(); - private final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>(); + private final Set<LocalBufferPool> allBufferPools = new HashSet<>(); private int numTotalRequiredBuffers; /** * Allocates all {@link MemorySegment} instances managed by this pool. */ - public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, MemoryType memoryType) { - checkNotNull(memoryType); + public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) { this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate; this.memorySegmentSize = segmentSize; @@ -87,20 +85,9 @@ public class NetworkBufferPool implements BufferPoolFactory { } try { - if (memoryType == MemoryType.HEAP) { - for (int i = 0; i < numberOfSegmentsToAllocate; i++) { - byte[] memory = new byte[segmentSize]; - availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null)); - } - } - else if (memoryType == MemoryType.OFF_HEAP) { - for (int i = 0; i < numberOfSegmentsToAllocate; i++) { - ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize); - availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null)); - } - } - else { - throw new IllegalArgumentException("Unknown memory type " + memoryType); + for (int i = 0; i < numberOfSegmentsToAllocate; i++) { + ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize); + availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null)); } } catch (OutOfMemoryError err) { @@ -336,7 +323,7 @@ public class NetworkBufferPool implements BufferPoolFactory { return; } - /** + /* * With buffer pools being potentially limited, let's distribute the available memory * segments based on the capacity of each buffer pool, i.e. 
the maximum number of segments * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 4daff05..0756529 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -213,11 +213,11 @@ public class TaskManagerServices { // computing the amount of memory to use depends on how much memory is available // it strictly needs to happen AFTER the network stack has been initialized - MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType(); - // check if a value has been configured long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory(); + MemoryType memType = taskManagerServicesConfiguration.getMemoryType(); + final long memorySize; boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory(); @@ -234,7 +234,7 @@ public class TaskManagerServices { float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction(); if (memType == MemoryType.HEAP) { - // network buffers already allocated -> use memoryFraction of the remaining heap: + // network buffers allocated off-heap -> use memoryFraction of the available heap: long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction); if (preAllocateMemory) { LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." 
, @@ -247,10 +247,10 @@ public class TaskManagerServices { } else if (memType == MemoryType.OFF_HEAP) { // The maximum heap memory has been adjusted according to the fraction (see // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e. - // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * memoryFraction = jvmHeapNoNet * (1 - memoryFraction) - // directMemorySize = jvmHeapNoNet * memoryFraction - long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); - long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction); + // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction) + // directMemorySize = jvmTotalNoNet * memoryFraction + long maxJvmHeap = EnvironmentInformation.getMaxJvmHeapMemory(); + long directMemorySize = (long) (maxJvmHeap / (1.0 - memoryFraction) * memoryFraction); if (preAllocateMemory) { LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , memoryFraction, directMemorySize >> 20); @@ -312,8 +312,7 @@ public class TaskManagerServices { NetworkBufferPool networkBufferPool = new NetworkBufferPool( (int) numNetBuffersLong, - segmentSize, - networkEnvironmentConfiguration.memoryType()); + segmentSize); ConnectionManager connectionManager; @@ -390,7 +389,7 @@ public class TaskManagerServices { * @param config * configuration object * - * @return memory to use for network buffers (in bytes) + * @return memory to use for network buffers (in bytes); at least one memory segment */ @SuppressWarnings("deprecation") public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) { @@ -419,6 +418,14 @@ public class TaskManagerServices { TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", "Network buffer memory size too large: " + networkBufBytes + " >= " + totalJavaMemorySize + " (total JVM memory size)"); + TaskManagerServicesConfiguration + .checkConfigParameter(networkBufBytes >= segmentSize, + "(" + 
networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", + "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", + "Network buffer memory size too small: " + networkBufBytes + " < " + + segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); } else { // use old (deprecated) network buffers parameter int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); @@ -431,6 +438,11 @@ public class TaskManagerServices { networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "Network buffer memory size too large: " + networkBufBytes + " >= " + totalJavaMemorySize + " (total JVM memory size)"); + TaskManagerServicesConfiguration + .checkConfigParameter(networkBufBytes >= segmentSize, + networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), + "Network buffer memory size too small: " + networkBufBytes + " < " + + segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")"); } return networkBufBytes; @@ -469,37 +481,24 @@ public class TaskManagerServices { return networkBufMin; } - // relative network buffer pool size using the fraction + // relative network buffer pool size using the fraction... - final MemoryType memType = networkConfig.memoryType(); + // The maximum heap memory has been adjusted as in + // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)) + // and we need to invert these calculations. 
- final long networkBufBytes; - if (memType == MemoryType.HEAP) { - // use fraction parts of the available heap memory + final MemoryType memType = tmConfig.getMemoryType(); - final long relativeMemSize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(); - networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, - (long) (networkBufFraction * relativeMemSize))); + final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes < relativeMemSize, - "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", - "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", - "Network buffer memory size too large: " + networkBufBytes + " >= " + - relativeMemSize + "(free JVM heap size)"); + final long jvmHeapNoNet; + if (memType == MemoryType.HEAP) { + jvmHeapNoNet = maxMemory; } else if (memType == MemoryType.OFF_HEAP) { - // The maximum heap memory has been adjusted accordingly as in - // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)) - // and we need to invert these calculations. - - final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); // check if a value has been configured long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes - final long jvmHeapNoNet; if (configuredMemory > 0) { // The maximum heap memory has been adjusted according to configuredMemory, i.e. 
// maxJvmHeap = jvmHeapNoNet - configuredMemory @@ -512,25 +511,25 @@ public class TaskManagerServices { final float managedFraction = tmConfig.getMemoryFraction(); jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction)); } - - // finally extract the network buffer memory size again from: - // jvmHeapNoNet = jvmHeap - networkBufBytes - // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) - networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, - (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction))); - - TaskManagerServicesConfiguration - .checkConfigParameter(networkBufBytes < maxMemory, - "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", - "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + - TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", - "Network buffer memory size too large: " + networkBufBytes + " >= " + - maxMemory + "(maximum JVM heap size)"); } else { throw new RuntimeException("No supported memory type detected."); } + // finally extract the network buffer memory size again from: + // jvmHeapNoNet = jvmHeap - networkBufBytes + // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) + final long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, + (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction))); + + TaskManagerServicesConfiguration + .checkConfigParameter(networkBufBytes < maxMemory, + "(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")", + "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " + + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")", + "Network buffer memory size too large: " + networkBufBytes + " >= " + + maxMemory + "(maximum JVM heap size)"); + return networkBufBytes; } @@ 
-548,7 +547,12 @@ public class TaskManagerServices { public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) { Preconditions.checkArgument(totalJavaMemorySizeMB > 0); - final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; // megabytes to bytes + // subtract the Java memory used for network buffers (always off-heap) + final long networkBufMB = + calculateNetworkBufferMemory( + totalJavaMemorySizeMB << 20, // megabytes to bytes + config) >> 20; // bytes to megabytes + final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB; // split the available Java memory between heap and off-heap @@ -557,10 +561,6 @@ public class TaskManagerServices { final long heapSizeMB; if (useOffHeap) { - // subtract the Java memory used for network buffers - final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes - final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB; - long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE); if (offHeapSize <= 0) { @@ -578,7 +578,7 @@ public class TaskManagerServices { heapSizeMB = remainingJavaMemorySizeMB - offHeapSize; } else { - heapSizeMB = totalJavaMemorySizeMB; + heapSizeMB = remainingJavaMemorySizeMB; } return heapSizeMB; http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index bae683b..4f8641e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -67,6 +67,8 @@ public class TaskManagerServicesConfiguration { */ private final long configuredMemory; + private final MemoryType memoryType; + private final boolean preAllocateMemory; private final float memoryFraction; @@ -80,6 +82,7 @@ public class TaskManagerServicesConfiguration { QueryableStateConfiguration queryableStateConfig, int numberOfSlots, long configuredMemory, + MemoryType memoryType, boolean preAllocateMemory, float memoryFraction, long timerServiceShutdownTimeout) { @@ -91,6 +94,7 @@ public class TaskManagerServicesConfiguration { this.numberOfSlots = checkNotNull(numberOfSlots); this.configuredMemory = configuredMemory; + this.memoryType = checkNotNull(memoryType); this.preAllocateMemory = preAllocateMemory; this.memoryFraction = memoryFraction; @@ -128,6 +132,15 @@ public class TaskManagerServicesConfiguration { } /** + * Returns the memory type to use. + * + * @return on-heap or off-heap memory + */ + public MemoryType getMemoryType() { + return memoryType; + } + + /** * Returns the size of the managed memory (in megabytes), if configured. 
* * @return managed memory or a default value (currently <tt>-1</tt>) if not configured @@ -194,6 +207,14 @@ public class TaskManagerServicesConfiguration { "If you leave this config parameter empty, the system automatically " + "pick a fraction of the available memory."); + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { + memType = MemoryType.OFF_HEAP; + } else { + memType = MemoryType.HEAP; + } + boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE); float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); @@ -210,6 +231,7 @@ public class TaskManagerServicesConfiguration { queryableStateConfig, slots, configuredMemory, + memType, preAllocateMemory, memoryFraction, timerServiceShutdownTimeout); @@ -258,14 +280,6 @@ public class TaskManagerServicesConfiguration { TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Memory segment size must be a power of 2."); - // check whether we use heap or off-heap memory - final MemoryType memType; - if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { - memType = MemoryType.OFF_HEAP; - } else { - memType = MemoryType.HEAP; - } - // network buffer memory fraction float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION); @@ -324,7 +338,6 @@ public class TaskManagerServicesConfiguration { networkBufMin, networkBufMax, pageSize, - memType, ioMode, initialRequestBackoff, maxRequestBackoff, http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java index 193fd90..6c66c77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.netty.NettyConfig; @@ -37,8 +36,6 @@ public class NetworkEnvironmentConfiguration { private final int networkBufferSize; - private final MemoryType memoryType; - private final IOMode ioMode; private final int partitionRequestInitialBackoff; @@ -59,7 +56,6 @@ public class NetworkEnvironmentConfiguration { long networkBufMin, long networkBufMax, int networkBufferSize, - MemoryType memoryType, IOMode ioMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, @@ -67,7 +63,7 @@ public class NetworkEnvironmentConfiguration { int floatingNetworkBuffersPerGate) { this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, - memoryType, ioMode, + ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate, null); @@ -79,7 +75,6 @@ public class NetworkEnvironmentConfiguration { long networkBufMin, long networkBufMax, int networkBufferSize, - MemoryType memoryType, IOMode ioMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, @@ -91,7 +86,6 @@ public class NetworkEnvironmentConfiguration { this.networkBufMin = networkBufMin; this.networkBufMax = networkBufMax; this.networkBufferSize = networkBufferSize; - this.memoryType = memoryType; this.ioMode = ioMode; this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; @@ -118,10 
+112,6 @@ public class NetworkEnvironmentConfiguration { return networkBufferSize; } - public MemoryType memoryType() { - return memoryType; - } - public IOMode ioMode() { return ioMode; } @@ -152,7 +142,6 @@ public class NetworkEnvironmentConfiguration { public int hashCode() { int result = 1; result = 31 * result + networkBufferSize; - result = 31 * result + memoryType.hashCode(); result = 31 * result + ioMode.hashCode(); result = 31 * result + partitionRequestInitialBackoff; result = 31 * result + partitionRequestMaxBackoff; @@ -181,7 +170,6 @@ public class NetworkEnvironmentConfiguration { this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff && this.networkBuffersPerChannel == that.networkBuffersPerChannel && this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate && - this.memoryType == that.memoryType && this.ioMode == that.ioMode && (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null); } @@ -194,7 +182,6 @@ public class NetworkEnvironmentConfiguration { ", networkBufMin=" + networkBufMin + ", networkBufMax=" + networkBufMax + ", networkBufferSize=" + networkBufferSize + - ", memoryType=" + memoryType + ", ioMode=" + ioMode + ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff + ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff + http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java index ad11f70..8d9ea88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java @@ -18,11 +18,13 @@ package org.apache.flink.runtime.clusterframework; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP; +import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -30,7 +32,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger { private static final long CONTAINER_MEMORY = 8192L; /** - * This tests that per default the off heap memory is set to -1. + * This tests that per default the off heap memory is set to what the network buffers require. */ @Test public void testOffHeapMemoryWithDefaultConfiguration() { @@ -38,15 +40,46 @@ public class ContaineredTaskManagerParametersTest extends TestLogger { ContaineredTaskManagerParameters params = ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1); - assertEquals(-1L, params.taskManagerDirectMemoryLimitMB()); + + final float memoryCutoffRatio = conf.getFloat( + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO, + ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO); + final int minCutoff = conf.getInteger( + ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN, + ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF); + + long cutoff = Math.max((long) (CONTAINER_MEMORY * memoryCutoffRatio), minCutoff); + final long networkBufMB = + calculateNetworkBufferMemory( + (CONTAINER_MEMORY - cutoff) << 20, // megabytes to bytes + conf) >> 20; // bytes to megabytes + assertEquals(networkBufMB + cutoff, params.taskManagerDirectMemoryLimitMB()); + } + + /** + * This tests that when using off-heap memory the sum of on and off heap memory does not exceed the container 
+ * maximum. + */ + @Test + public void testTotalMemoryDoesNotExceedContainerMemoryOnHeap() { + Configuration conf = new Configuration(); + conf.setBoolean(MEMORY_OFF_HEAP, false); + + ContaineredTaskManagerParameters params = + ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1); + + assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L); + + assertTrue(params.taskManagerHeapSizeMB() + + params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY); } /** - * This tests that when using off heap memory the sum of on and off heap memory does not exceeds the container + * This tests that when using on-heap memory the sum of on and off heap memory does not exceed the container * maximum. */ @Test - public void testTotalMemoryDoesNotExceedContainerMemory() { + public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() { Configuration conf = new Configuration(); conf.setBoolean(MEMORY_OFF_HEAP, true); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index ef2d5c2..123082f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskActions; + import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -63,7 +63,7 @@ public class NetworkEnvironmentTest { public void testRegisterTaskUsesBoundedBuffers() throws Exception { final NetworkEnvironment network = new NetworkEnvironment( - new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP), + new NetworkBufferPool(numBuffers, memorySegmentSize), new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index df5c616..9f699da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent; import 
org.apache.flink.runtime.testutils.DiscardingRecycler; import org.apache.flink.types.IntValue; import org.apache.flink.util.XORShiftRandom; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -176,7 +176,7 @@ public class RecordWriterTest { BufferPool bufferPool = null; try { - buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP); + buffers = new NetworkBufferPool(1, 1024); bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE)); ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java index 941aeae..d15aba6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; import org.junit.After; import org.junit.Before; @@ -55,7 +54,7 @@ public class BufferPoolFactoryTest { @Before public void setupNetworkBufferPool() { - networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP); + networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize); } @After @@ -245,7 +244,7 @@ public class BufferPoolFactoryTest { @Test public void testUniformDistributionBounded3() throws IOException { - NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new 
NetworkBufferPool(3, 128); try { BufferPool first = globalPool.createBufferPool(0, 10); assertEquals(3, first.getNumBuffers()); @@ -278,7 +277,7 @@ public class BufferPoolFactoryTest { */ @Test public void testUniformDistributionBounded4() throws IOException { - NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(10, 128); try { BufferPool first = globalPool.createBufferPool(0, 10); assertEquals(10, first.getNumBuffers()); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java index 18e2136..6e02542 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.buffer; -import org.apache.flink.core.memory.MemoryType; import org.junit.Test; import java.util.concurrent.atomic.AtomicReference; @@ -46,7 +45,7 @@ public class LocalBufferPoolDestroyTest { LocalBufferPool localBufferPool = null; try { - networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP); + networkBufferPool = new NetworkBufferPool(1, 4096); localBufferPool = new LocalBufferPool(networkBufferPool, 1); // Drain buffer pool http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index 7a309d7..4eb568a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.io.network.buffer; -import org.apache.flink.core.memory.MemoryType; - import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.junit.After; @@ -63,7 +61,7 @@ public class LocalBufferPoolTest { @Before public void setupLocalBufferPool() { - networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP); + networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize); localBufferPool = new LocalBufferPool(networkBufferPool, 1); assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments()); @@ -77,6 +75,9 @@ public class LocalBufferPoolTest { String msg = "Did not return all buffers to memory segment pool after test."; assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments()); + // no other local buffer pools used than the one above, but call just in case + networkBufferPool.destroyAllBufferPools(); + networkBufferPool.destroy(); } @AfterClass @@ -227,7 +228,7 @@ public class LocalBufferPoolTest { // and the twoTimesListener will be added into the registeredListeners // queue of buffer pool again available1.recycle(); - + verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class)); verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java index e30e955..4d7648a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -31,10 +31,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.hamcrest.core.IsNot.not; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -54,7 +53,7 @@ public class NetworkBufferPoolTest { final int bufferSize = 128; final int numBuffers = 10; - NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize); assertEquals(bufferSize, globalPool.getMemorySegmentSize()); assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments()); assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments()); @@ -98,7 +97,7 @@ public class NetworkBufferPoolTest { @Test public void testDestroyAll() { try { - NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new 
NetworkBufferPool(10, 128); BufferPool fixedPool = globalPool.createBufferPool(2, 2); BufferPool boundedPool = globalPool.createBufferPool(0, 1); @@ -193,7 +192,7 @@ public class NetworkBufferPoolTest { public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception { final int numBuffers = 10; - NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); List<MemorySegment> memorySegments = Collections.emptyList(); try { @@ -217,7 +216,7 @@ public class NetworkBufferPoolTest { public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception { final int numBuffers = 10; - NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); try { globalPool.requestMemorySegments(numBuffers + 1); @@ -237,7 +236,7 @@ public class NetworkBufferPoolTest { public void testRequestMemorySegmentsWithInvalidArgument() throws Exception { final int numBuffers = 10; - NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); try { // the number of requested buffers should be larger than zero @@ -258,7 +257,7 @@ public class NetworkBufferPoolTest { public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException { final int numBuffers = 10; - NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128); final List<Buffer> buffers = new ArrayList<>(numBuffers); List<MemorySegment> memorySegments = Collections.emptyList(); @@ -314,7 +313,7 @@ public class NetworkBufferPoolTest { public void testRequestMemorySegmentsInterruptable() throws Exception { final int numBuffers = 10; - NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP); + NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128); MemorySegment segment = globalPool.requestMemorySegment(); assertNotNull(segment); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index e685f17..5f7fd82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.JobID; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -47,7 +46,6 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import scala.Tuple2; import java.io.IOException; import java.util.Collections; @@ -59,6 +57,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import scala.Tuple2; + import static org.apache.flink.util.Preconditions.checkArgument; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -95,7 +95,7 @@ public class LocalInputChannelTest { final NetworkBufferPool networkBuffers = new 
NetworkBufferPool( (parallelism * producerBufferPoolSize) + (parallelism * parallelism), - TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP); + TestBufferFactory.BUFFER_SIZE); final ResultPartitionConsumableNotifier partitionConsumableNotifier = mock(ResultPartitionConsumableNotifier.class); @@ -176,6 +176,7 @@ public class LocalInputChannelTest { } } finally { + networkBuffers.destroyAllBufferPools(); networkBuffers.destroy(); executor.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java index dc22752..6b0f251 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; @@ -76,12 +75,13 @@ public class BackPressureStatsTrackerITCase extends TestLogger { @BeforeClass public static void setup() { testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP); + networkBufferPool = new 
NetworkBufferPool(100, 8192); } @AfterClass public static void teardown() { JavaTestKit.shutdownActorSystem(testActorSystem); + networkBufferPool.destroyAllBufferPools(); networkBufferPool.destroy(); } http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java index a8358a1..1ec280d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java @@ -204,8 +204,16 @@ public class TaskManagerServicesTest { tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(), TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), 0.1f, 60L << 20, 1L << 30, MemoryType.HEAP); - when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB - assertEquals(100L << 20, TaskManagerServices.calculateNetworkBufferMemory(tmConfig)); + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB + assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */, + TaskManagerServices.calculateNetworkBufferMemory(tmConfig)); + + tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(), + TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), + 0.2f, 60L << 20, 1L << 30, MemoryType.HEAP); + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB + assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */, + TaskManagerServices.calculateNetworkBufferMemory(tmConfig)); tmConfig = getTmConfig(10, 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(), 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP); @@ -242,7 +250,6 @@ public class TaskManagerServicesTest { networkBufMin, networkBufMax, TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), - memType, null, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), @@ -257,6 +264,7 @@ public class TaskManagerServicesTest { QueryableStateConfiguration.disabled(), 1, managedMemory, + memType, false, managedMemoryFraction, 0); @@ -274,9 +282,14 @@ public class TaskManagerServicesTest { config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false); - assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config)); + assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config)); + + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false); + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f); + assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config)); config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true); + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f); config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config)); http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 4b62770..98b5b8b 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -130,7 +130,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { // note: the network buffer memory configured here is not actually used below but set // accordingly to be consistent final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration( - 0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, + 0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, IOManager.IOMode.SYNC, 0, 0, 2, 8, null); ResourceID taskManagerId = ResourceID.generate(); @@ -140,7 +140,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false); final IOManager ioManager = new IOManagerAsync(TMP_DIR); final NetworkEnvironment network = new NetworkEnvironment( - new NetworkBufferPool(32, netConf.networkBufferSize(), netConf.memoryType()), + new NetworkBufferPool(32, netConf.networkBufferSize()), new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index c801d09..090e44a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -49,11 +48,15 @@ public class BarrierBufferMassiveRandomTest { @Test public void testWithTwoChannelsAndRandomBarriers() { IOManager ioMan = null; + NetworkBufferPool networkBufferPool1 = null; + NetworkBufferPool networkBufferPool2 = null; try { ioMan = new IOManagerAsync(); - BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100); - BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100); + networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE); + networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE); + BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100); + BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100); RandomGeneratingInputGate myIG = new RandomGeneratingInputGate( new BufferPool[] { pool1, pool2 }, @@ -76,6 +79,14 @@ public class BarrierBufferMassiveRandomTest { if (ioMan != null) { ioMan.shutdown(); } + if (networkBufferPool1 != null) { + networkBufferPool1.destroyAllBufferPools(); + networkBufferPool1.destroy(); + } + if (networkBufferPool2 != null) { + networkBufferPool2.destroyAllBufferPools(); + networkBufferPool2.destroy(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 03c61e8..8716f8a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -465,7 +465,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yt", flinkLibFolder.getAbsolutePath(), "-yn", "1", "-yjm", "768", - "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly + // test if the cutoff is passed correctly (only useful when larger than the value + // of containerized.heap-cutoff-min (default: 600MB)) + "-yD", "yarn.heap-cutoff-ratio=0.7", "-yD", "yarn.tags=test-tag", "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. @@ -544,8 +546,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { }); Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog); content = FileUtils.readFileToString(jobmanagerLog); - // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) - String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m"; + // TM was started with 1024 but we cut off 70% (NOT THE DEFAULT VALUE) + String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms244m -Xmx244m -XX:MaxDirectMemorySize=780m"; Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'", content.contains(expected)); expected = " (2/2) (attempt #0) to ";