Repository: flink
Updated Branches:
  refs/heads/master 54dd91603 -> 1854a3de1


[FLINK-7316][network] Always use off-heap network buffers.

This is another step at using or own (off-heap) buffers for network
communication that we pass through netty in order to avoid unnecessary buffer
copies.

This closes #4481.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1854a3de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1854a3de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1854a3de

Branch: refs/heads/master
Commit: 1854a3de19a8f73a49e3c1d9438d61b5e4c6a452
Parents: 54dd916
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Aug 1 13:24:00 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Nov 24 10:48:32 2017 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |   4 +
 flink-dist/src/main/flink-bin/bin/config.sh     |   7 +-
 .../ContaineredTaskManagerParameters.java       |   4 +-
 .../io/network/buffer/NetworkBufferPool.java    |  27 ++---
 .../taskexecutor/TaskManagerServices.java       | 106 +++++++++----------
 .../TaskManagerServicesConfiguration.java       |  31 ++++--
 .../NetworkEnvironmentConfiguration.java        |  15 +--
 .../ContaineredTaskManagerParametersTest.java   |  41 ++++++-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../io/network/api/writer/RecordWriterTest.java |   4 +-
 .../network/buffer/BufferPoolFactoryTest.java   |   7 +-
 .../buffer/LocalBufferPoolDestroyTest.java      |   3 +-
 .../io/network/buffer/LocalBufferPoolTest.java  |   9 +-
 .../network/buffer/NetworkBufferPoolTest.java   |  19 ++--
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../BackPressureStatsTrackerITCase.java         |   4 +-
 .../taskexecutor/TaskManagerServicesTest.java   |  21 +++-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../io/BarrierBufferMassiveRandomTest.java      |  17 ++-
 .../YARNSessionCapacitySchedulerITCase.java     |   8 +-
 20 files changed, 195 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index bcf7671..ed65880 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -673,6 +673,10 @@ for each point-to-point exchange of data over the network, 
which typically happe
 repartitioning or broadcasting steps (shuffle phase). In those, each parallel 
task inside the
 TaskManager has to be able to talk to all other parallel tasks.
 
+<div class="alert alert-warning">
+  <strong>Note:</strong> Since Flink 1.5, network buffers will always be 
allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of 
<code>taskmanager.memory.off-heap</code>. This way, we can pass these buffers 
directly to the underlying network stack layers.
+</div>
+
 #### Setting Memory Fractions
 
 Previously, the number of network buffers was set manually which became a 
quite error-prone task

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 3055999..1b03e68 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -570,13 +570,12 @@ calculateTaskManagerHeapSizeMB() {
         exit 1
     fi
 
-    local tm_heap_size_mb=${FLINK_TM_HEAP}
+    local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # 
bytes to megabytes
+    # network buffers are always off-heap and thus need to be deduced from the 
heap memory size
+    local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))
 
     if useOffHeapMemory; then
 
-        local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # 
bytes to megabytes
-        tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb))
-
         if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
             # We split up the total memory in heap and off-heap memory
             if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; 
then

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index c35cf81..c4dd486 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -142,7 +142,7 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
                // (2) split the remaining Java memory between heap and off-heap
                final long heapSizeMB = 
TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
                // use the cut-off memory for off-heap (that was its intention)
-               final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : 
containerMemoryMB - heapSizeMB;
+               final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
 
                // (3) obtain the additional environment variables from the 
configuration
                final HashMap<String, String> envVars = new HashMap<>();
@@ -158,6 +158,6 @@ public class ContaineredTaskManagerParameters implements 
java.io.Serializable {
 
                // done
                return new ContaineredTaskManagerParameters(
-                       containerMemoryMB, heapSizeMB, offHeapSize, numSlots, 
envVars);
+                       containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, 
envVars);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index f899f05..7b817ca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.MathUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +37,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} 
instances
@@ -63,15 +62,14 @@ public class NetworkBufferPool implements BufferPoolFactory 
{
 
        private final Object factoryLock = new Object();
 
-       private final Set<LocalBufferPool> allBufferPools = new 
HashSet<LocalBufferPool>();
+       private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
 
        private int numTotalRequiredBuffers;
 
        /**
         * Allocates all {@link MemorySegment} instances managed by this pool.
         */
-       public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize, MemoryType memoryType) {
-               checkNotNull(memoryType);
+       public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize) {
                
                this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
                this.memorySegmentSize = segmentSize;
@@ -87,20 +85,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
                }
 
                try {
-                       if (memoryType == MemoryType.HEAP) {
-                               for (int i = 0; i < numberOfSegmentsToAllocate; 
i++) {
-                                       byte[] memory = new byte[segmentSize];
-                                       
availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, 
null));
-                               }
-                       }
-                       else if (memoryType == MemoryType.OFF_HEAP) {
-                               for (int i = 0; i < numberOfSegmentsToAllocate; 
i++) {
-                                       ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
-                                       
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory,
 null));
-                               }
-                       }
-                       else {
-                               throw new IllegalArgumentException("Unknown 
memory type " + memoryType);
+                       for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
+                               ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
+                               
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory,
 null));
                        }
                }
                catch (OutOfMemoryError err) {
@@ -336,7 +323,7 @@ public class NetworkBufferPool implements BufferPoolFactory 
{
                        return;
                }
 
-               /**
+               /*
                 * With buffer pools being potentially limited, let's 
distribute the available memory
                 * segments based on the capacity of each buffer pool, i.e. the 
maximum number of segments
                 * an unlimited buffer pool can take is 
numAvailableMemorySegment, for limited buffer pools

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 4daff05..0756529 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -213,11 +213,11 @@ public class TaskManagerServices {
                // computing the amount of memory to use depends on how much 
memory is available
                // it strictly needs to happen AFTER the network stack has been 
initialized
 
-               MemoryType memType = 
taskManagerServicesConfiguration.getNetworkConfig().memoryType();
-
                // check if a value has been configured
                long configuredMemory = 
taskManagerServicesConfiguration.getConfiguredMemory();
 
+               MemoryType memType = 
taskManagerServicesConfiguration.getMemoryType();
+
                final long memorySize;
 
                boolean preAllocateMemory = 
taskManagerServicesConfiguration.isPreAllocateMemory();
@@ -234,7 +234,7 @@ public class TaskManagerServices {
                        float memoryFraction = 
taskManagerServicesConfiguration.getMemoryFraction();
 
                        if (memType == MemoryType.HEAP) {
-                               // network buffers already allocated -> use 
memoryFraction of the remaining heap:
+                               // network buffers allocated off-heap -> use 
memoryFraction of the available heap:
                                long relativeMemSize = (long) 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
                                if (preAllocateMemory) {
                                        LOG.info("Using {} of the currently 
free heap space for managed heap memory ({} MB)." ,
@@ -247,10 +247,10 @@ public class TaskManagerServices {
                        } else if (memType == MemoryType.OFF_HEAP) {
                                // The maximum heap memory has been adjusted 
according to the fraction (see
                                // calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration config)), i.e.
-                               // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * 
memoryFraction = jvmHeapNoNet * (1 - memoryFraction)
-                               // directMemorySize = jvmHeapNoNet * 
memoryFraction
-                               long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
-                               long directMemorySize = (long) (maxMemory / 
(1.0 - memoryFraction) * memoryFraction);
+                               // maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * 
memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
+                               // directMemorySize = jvmTotalNoNet * 
memoryFraction
+                               long maxJvmHeap = 
EnvironmentInformation.getMaxJvmHeapMemory();
+                               long directMemorySize = (long) (maxJvmHeap / 
(1.0 - memoryFraction) * memoryFraction);
                                if (preAllocateMemory) {
                                        LOG.info("Using {} of the maximum 
memory size for managed off-heap memory ({} MB)." ,
                                                memoryFraction, 
directMemorySize >> 20);
@@ -312,8 +312,7 @@ public class TaskManagerServices {
 
                NetworkBufferPool networkBufferPool = new NetworkBufferPool(
                        (int) numNetBuffersLong,
-                       segmentSize,
-                       networkEnvironmentConfiguration.memoryType());
+                       segmentSize);
 
                ConnectionManager connectionManager;
 
@@ -390,7 +389,7 @@ public class TaskManagerServices {
         * @param config
         *              configuration object
         *
-        * @return memory to use for network buffers (in bytes)
+        * @return memory to use for network buffers (in bytes); at least one 
memory segment
         */
        @SuppressWarnings("deprecation")
        public static long calculateNetworkBufferMemory(long 
totalJavaMemorySize, Configuration config) {
@@ -419,6 +418,14 @@ public class TaskManagerServices {
                                                
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
                                        "Network buffer memory size too large: 
" + networkBufBytes + " >= " +
                                                totalJavaMemorySize + " (total 
JVM memory size)");
+                       TaskManagerServicesConfiguration
+                               .checkConfigParameter(networkBufBytes >= 
segmentSize,
+                                       "(" + networkBufFraction + ", " + 
networkBufMin + ", " + networkBufMax + ")",
+                                       "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+                                       "Network buffer memory size too small: 
" + networkBufBytes + " < " +
+                                               segmentSize + " (" + 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
                } else {
                        // use old (deprecated) network buffers parameter
                        int numNetworkBuffers = 
config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
@@ -431,6 +438,11 @@ public class TaskManagerServices {
                                        networkBufBytes, 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
                                        "Network buffer memory size too large: 
" + networkBufBytes + " >= " +
                                                totalJavaMemorySize + " (total 
JVM memory size)");
+                       TaskManagerServicesConfiguration
+                               .checkConfigParameter(networkBufBytes >= 
segmentSize,
+                                       networkBufBytes, 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+                                       "Network buffer memory size too small: 
" + networkBufBytes + " < " +
+                                               segmentSize + " (" + 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
                }
 
                return networkBufBytes;
@@ -469,37 +481,24 @@ public class TaskManagerServices {
                        return networkBufMin;
                }
 
-               // relative network buffer pool size using the fraction
+               // relative network buffer pool size using the fraction...
 
-               final MemoryType memType = networkConfig.memoryType();
+               // The maximum heap memory has been adjusted as in
+               // calculateHeapSizeMB(long totalJavaMemorySizeMB, 
Configuration config))
+               // and we need to invert these calculations.
 
-               final long networkBufBytes;
-               if (memType == MemoryType.HEAP) {
-                       // use fraction parts of the available heap memory
+               final MemoryType memType = tmConfig.getMemoryType();
 
-                       final long relativeMemSize = 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
-                       networkBufBytes = Math.min(networkBufMax, 
Math.max(networkBufMin,
-                               (long) (networkBufFraction * relativeMemSize)));
+               final long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
 
-                       TaskManagerServicesConfiguration
-                               .checkConfigParameter(networkBufBytes < 
relativeMemSize,
-                                       "(" + networkBufFraction + ", " + 
networkBufMin + ", " + networkBufMax + ")",
-                                       "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
-                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
-                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
-                                       "Network buffer memory size too large: 
" + networkBufBytes + " >= " +
-                                               relativeMemSize + "(free JVM 
heap size)");
+               final long jvmHeapNoNet;
+               if (memType == MemoryType.HEAP) {
+                       jvmHeapNoNet = maxMemory;
                } else if (memType == MemoryType.OFF_HEAP) {
-                       // The maximum heap memory has been adjusted 
accordingly as in
-                       // calculateHeapSizeMB(long totalJavaMemorySizeMB, 
Configuration config))
-                       // and we need to invert these calculations.
-
-                       final long maxMemory = 
EnvironmentInformation.getMaxJvmHeapMemory();
 
                        // check if a value has been configured
                        long configuredMemory = tmConfig.getConfiguredMemory() 
<< 20; // megabytes to bytes
 
-                       final long jvmHeapNoNet;
                        if (configuredMemory > 0) {
                                // The maximum heap memory has been adjusted 
according to configuredMemory, i.e.
                                // maxJvmHeap = jvmHeapNoNet - configuredMemory
@@ -512,25 +511,25 @@ public class TaskManagerServices {
                                final float managedFraction = 
tmConfig.getMemoryFraction();
                                jvmHeapNoNet = (long) (maxMemory / (1.0 - 
managedFraction));
                        }
-
-                       // finally extract the network buffer memory size again 
from:
-                       // jvmHeapNoNet = jvmHeap - networkBufBytes
-                       //              = jvmHeap - Math.min(networkBufMax, 
Math.max(networkBufMin, jvmHeap * netFraction)
-                       networkBufBytes = Math.min(networkBufMax, 
Math.max(networkBufMin,
-                               (long) (jvmHeapNoNet / (1.0 - 
networkBufFraction) * networkBufFraction)));
-
-                       TaskManagerServicesConfiguration
-                               .checkConfigParameter(networkBufBytes < 
maxMemory,
-                                       "(" + networkBufFraction + ", " + 
networkBufMin + ", " + networkBufMax + ")",
-                                       "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
-                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
-                                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
-                                       "Network buffer memory size too large: 
" + networkBufBytes + " >= " +
-                                               maxMemory + "(maximum JVM heap 
size)");
                } else {
                        throw new RuntimeException("No supported memory type 
detected.");
                }
 
+               // finally extract the network buffer memory size again from:
+               // jvmHeapNoNet = jvmHeap - networkBufBytes
+               //              = jvmHeap - Math.min(networkBufMax, 
Math.max(networkBufMin, jvmHeap * netFraction)
+               final long networkBufBytes = Math.min(networkBufMax, 
Math.max(networkBufMin,
+                       (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * 
networkBufFraction)));
+
+               TaskManagerServicesConfiguration
+                       .checkConfigParameter(networkBufBytes < maxMemory,
+                               "(" + networkBufFraction + ", " + networkBufMin 
+ ", " + networkBufMax + ")",
+                               "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+                                       
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+                                       
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+                               "Network buffer memory size too large: " + 
networkBufBytes + " >= " +
+                                       maxMemory + "(maximum JVM heap size)");
+
                return networkBufBytes;
        }
 
@@ -548,7 +547,12 @@ public class TaskManagerServices {
        public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, 
Configuration config) {
                Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
-               final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; 
// megabytes to bytes
+               // subtract the Java memory used for network buffers (always 
off-heap)
+               final long networkBufMB =
+                       calculateNetworkBufferMemory(
+                               totalJavaMemorySizeMB << 20, // megabytes to 
bytes
+                               config) >> 20; // bytes to megabytes
+               final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - 
networkBufMB;
 
                // split the available Java memory between heap and off-heap
 
@@ -557,10 +561,6 @@ public class TaskManagerServices {
                final long heapSizeMB;
                if (useOffHeap) {
 
-                       // subtract the Java memory used for network buffers
-                       final long networkBufMB = 
calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to 
megabytes
-                       final long remainingJavaMemorySizeMB = 
totalJavaMemorySizeMB - networkBufMB;
-
                        long offHeapSize = 
config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
                        if (offHeapSize <= 0) {
@@ -578,7 +578,7 @@ public class TaskManagerServices {
 
                        heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
                } else {
-                       heapSizeMB = totalJavaMemorySizeMB;
+                       heapSizeMB = remainingJavaMemorySizeMB;
                }
 
                return heapSizeMB;

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bae683b..4f8641e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -67,6 +67,8 @@ public class TaskManagerServicesConfiguration {
         */
        private final long configuredMemory;
 
+       private final MemoryType memoryType;
+
        private final boolean preAllocateMemory;
 
        private final float memoryFraction;
@@ -80,6 +82,7 @@ public class TaskManagerServicesConfiguration {
                        QueryableStateConfiguration queryableStateConfig,
                        int numberOfSlots,
                        long configuredMemory,
+                       MemoryType memoryType,
                        boolean preAllocateMemory,
                        float memoryFraction,
                        long timerServiceShutdownTimeout) {
@@ -91,6 +94,7 @@ public class TaskManagerServicesConfiguration {
                this.numberOfSlots = checkNotNull(numberOfSlots);
 
                this.configuredMemory = configuredMemory;
+               this.memoryType = checkNotNull(memoryType);
                this.preAllocateMemory = preAllocateMemory;
                this.memoryFraction = memoryFraction;
 
@@ -128,6 +132,15 @@ public class TaskManagerServicesConfiguration {
        }
 
        /**
+        * Returns the memory type to use.
+        *
+        * @return on-heap or off-heap memory
+        */
+       public MemoryType getMemoryType() {
+               return memoryType;
+       }
+
+       /**
         * Returns the size of the managed memory (in megabytes), if configured.
         *
         * @return managed memory or a default value (currently <tt>-1</tt>) if 
not configured
@@ -194,6 +207,14 @@ public class TaskManagerServicesConfiguration {
                                "If you leave this config parameter empty, the 
system automatically " +
                                "pick a fraction of the available memory.");
 
+               // check whether we use heap or off-heap memory
+               final MemoryType memType;
+               if 
(configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+                       memType = MemoryType.OFF_HEAP;
+               } else {
+                       memType = MemoryType.HEAP;
+               }
+
                boolean preAllocateMemory = 
configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
 
                float memoryFraction = 
configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
@@ -210,6 +231,7 @@ public class TaskManagerServicesConfiguration {
                        queryableStateConfig,
                        slots,
                        configuredMemory,
+                       memType,
                        preAllocateMemory,
                        memoryFraction,
                        timerServiceShutdownTimeout);
@@ -258,14 +280,6 @@ public class TaskManagerServicesConfiguration {
                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
                        "Memory segment size must be a power of 2.");
 
-               // check whether we use heap or off-heap memory
-               final MemoryType memType;
-               if 
(configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
-                       memType = MemoryType.OFF_HEAP;
-               } else {
-                       memType = MemoryType.HEAP;
-               }
-
                // network buffer memory fraction
 
                float networkBufFraction = 
configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
@@ -324,7 +338,6 @@ public class TaskManagerServicesConfiguration {
                        networkBufMin,
                        networkBufMax,
                        pageSize,
-                       memType,
                        ioMode,
                        initialRequestBackoff,
                        maxRequestBackoff,

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
index 193fd90..6c66c77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 
@@ -37,8 +36,6 @@ public class NetworkEnvironmentConfiguration {
 
        private final int networkBufferSize;
 
-       private final MemoryType memoryType;
-
        private final IOMode ioMode;
 
        private final int partitionRequestInitialBackoff;
@@ -59,7 +56,6 @@ public class NetworkEnvironmentConfiguration {
                        long networkBufMin,
                        long networkBufMax,
                        int networkBufferSize,
-                       MemoryType memoryType,
                        IOMode ioMode,
                        int partitionRequestInitialBackoff,
                        int partitionRequestMaxBackoff,
@@ -67,7 +63,7 @@ public class NetworkEnvironmentConfiguration {
                        int floatingNetworkBuffersPerGate) {
 
                this(networkBufFraction, networkBufMin, networkBufMax, 
networkBufferSize,
-                               memoryType, ioMode,
+                               ioMode,
                                partitionRequestInitialBackoff, 
partitionRequestMaxBackoff,
                                networkBuffersPerChannel, 
floatingNetworkBuffersPerGate,
                                null);
@@ -79,7 +75,6 @@ public class NetworkEnvironmentConfiguration {
                        long networkBufMin,
                        long networkBufMax,
                        int networkBufferSize,
-                       MemoryType memoryType,
                        IOMode ioMode,
                        int partitionRequestInitialBackoff,
                        int partitionRequestMaxBackoff,
@@ -91,7 +86,6 @@ public class NetworkEnvironmentConfiguration {
                this.networkBufMin = networkBufMin;
                this.networkBufMax = networkBufMax;
                this.networkBufferSize = networkBufferSize;
-               this.memoryType = memoryType;
                this.ioMode = ioMode;
                this.partitionRequestInitialBackoff = 
partitionRequestInitialBackoff;
                this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
@@ -118,10 +112,6 @@ public class NetworkEnvironmentConfiguration {
                return networkBufferSize;
        }
 
-       public MemoryType memoryType() {
-               return memoryType;
-       }
-
        public IOMode ioMode() {
                return ioMode;
        }
@@ -152,7 +142,6 @@ public class NetworkEnvironmentConfiguration {
        public int hashCode() {
                int result = 1;
                result = 31 * result + networkBufferSize;
-               result = 31 * result + memoryType.hashCode();
                result = 31 * result + ioMode.hashCode();
                result = 31 * result + partitionRequestInitialBackoff;
                result = 31 * result + partitionRequestMaxBackoff;
@@ -181,7 +170,6 @@ public class NetworkEnvironmentConfiguration {
                                        this.partitionRequestMaxBackoff == 
that.partitionRequestMaxBackoff &&
                                        this.networkBuffersPerChannel == 
that.networkBuffersPerChannel &&
                                        this.floatingNetworkBuffersPerGate == 
that.floatingNetworkBuffersPerGate &&
-                                       this.memoryType == that.memoryType &&
                                        this.ioMode == that.ioMode && 
                                        (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
                }
@@ -194,7 +182,6 @@ public class NetworkEnvironmentConfiguration {
                                ", networkBufMin=" + networkBufMin +
                                ", networkBufMax=" + networkBufMax +
                                ", networkBufferSize=" + networkBufferSize +
-                               ", memoryType=" + memoryType +
                                ", ioMode=" + ioMode +
                                ", partitionRequestInitialBackoff=" + 
partitionRequestInitialBackoff +
                                ", partitionRequestMaxBackoff=" + 
partitionRequestMaxBackoff +

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index ad11f70..8d9ea88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static 
org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
+import static 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -30,7 +32,7 @@ public class ContaineredTaskManagerParametersTest extends 
TestLogger {
        private static final long CONTAINER_MEMORY = 8192L;
 
        /**
-        * This tests that per default the off heap memory is set to -1.
+        * This tests that per default the off heap memory is set to what the 
network buffers require.
         */
        @Test
        public void testOffHeapMemoryWithDefaultConfiguration() {
@@ -38,15 +40,46 @@ public class ContaineredTaskManagerParametersTest extends 
TestLogger {
 
                ContaineredTaskManagerParameters params =
                        ContaineredTaskManagerParameters.create(conf, 
CONTAINER_MEMORY, 1);
-               assertEquals(-1L, params.taskManagerDirectMemoryLimitMB());
+
+               final float memoryCutoffRatio = conf.getFloat(
+                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
+                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+               final int minCutoff = conf.getInteger(
+                       ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
+                       ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+
+               long cutoff = Math.max((long) (CONTAINER_MEMORY * 
memoryCutoffRatio), minCutoff);
+               final long networkBufMB =
+                       calculateNetworkBufferMemory(
+                               (CONTAINER_MEMORY - cutoff) << 20, // megabytes 
to bytes
+                               conf) >> 20; // bytes to megabytes
+               assertEquals(networkBufMB + cutoff, 
params.taskManagerDirectMemoryLimitMB());
+       }
+
+       /**
+        * This tests that when using off-heap memory the sum of on and off 
heap memory does not exceed the container
+        * maximum.
+        */
+       @Test
+       public void testTotalMemoryDoesNotExceedContainerMemoryOnHeap() {
+               Configuration conf = new Configuration();
+               conf.setBoolean(MEMORY_OFF_HEAP, false);
+
+               ContaineredTaskManagerParameters params =
+                       ContaineredTaskManagerParameters.create(conf, 
CONTAINER_MEMORY, 1);
+
+               assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
+
+               assertTrue(params.taskManagerHeapSizeMB() +
+                       params.taskManagerDirectMemoryLimitMB() <= 
CONTAINER_MEMORY);
        }
 
        /**
-        * This tests that when using off heap memory the sum of on and off 
heap memory does not exceeds the container
+        * This tests that when using on-heap memory the sum of on and off heap 
memory does not exceed the container
         * maximum.
         */
        @Test
-       public void testTotalMemoryDoesNotExceedContainerMemory() {
+       public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() {
                Configuration conf = new Configuration();
                conf.setBoolean(MEMORY_OFF_HEAP, true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index ef2d5c2..123082f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,6 +32,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -63,7 +63,7 @@ public class NetworkEnvironmentTest {
        public void testRegisterTaskUsesBoundedBuffers() throws Exception {
 
                final NetworkEnvironment network = new NetworkEnvironment(
-                       new NetworkBufferPool(numBuffers, memorySegmentSize, 
MemoryType.HEAP),
+                       new NetworkBufferPool(numBuffers, memorySegmentSize),
                        new LocalConnectionManager(),
                        new ResultPartitionManager(),
                        new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index df5c616..9f699da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -176,7 +176,7 @@ public class RecordWriterTest {
                BufferPool bufferPool = null;
 
                try {
-                       buffers = new NetworkBufferPool(1, 1024, 
MemoryType.HEAP);
+                       buffers = new NetworkBufferPool(1, 1024);
                        bufferPool = spy(buffers.createBufferPool(1, 
Integer.MAX_VALUE));
 
                        ResultPartitionWriter partitionWriter = 
mock(ResultPartitionWriter.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 941aeae..d15aba6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 
 import org.junit.After;
 import org.junit.Before;
@@ -55,7 +54,7 @@ public class BufferPoolFactoryTest {
 
        @Before
        public void setupNetworkBufferPool() {
-               networkBufferPool = new NetworkBufferPool(numBuffers, 
memorySegmentSize, MemoryType.HEAP);
+               networkBufferPool = new NetworkBufferPool(numBuffers, 
memorySegmentSize);
        }
 
        @After
@@ -245,7 +244,7 @@ public class BufferPoolFactoryTest {
 
        @Test
        public void testUniformDistributionBounded3() throws IOException {
-               NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, 
MemoryType.HEAP);
+               NetworkBufferPool globalPool = new NetworkBufferPool(3, 128);
                try {
                        BufferPool first = globalPool.createBufferPool(0, 10);
                        assertEquals(3, first.getNumBuffers());
@@ -278,7 +277,7 @@ public class BufferPoolFactoryTest {
         */
        @Test
        public void testUniformDistributionBounded4() throws IOException {
-               NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, 
MemoryType.HEAP);
+               NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
                try {
                        BufferPool first = globalPool.createBufferPool(0, 10);
                        assertEquals(10, first.getNumBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index 18e2136..6e02542 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicReference;
@@ -46,7 +45,7 @@ public class LocalBufferPoolDestroyTest {
                LocalBufferPool localBufferPool = null;
 
                try {
-                       networkBufferPool = new NetworkBufferPool(1, 4096, 
MemoryType.HEAP);
+                       networkBufferPool = new NetworkBufferPool(1, 4096);
                        localBufferPool = new 
LocalBufferPool(networkBufferPool, 1);
 
                        // Drain buffer pool

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 7a309d7..4eb568a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.core.memory.MemoryType;
-
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.junit.After;
@@ -63,7 +61,7 @@ public class LocalBufferPoolTest {
 
        @Before
        public void setupLocalBufferPool() {
-               networkBufferPool = new NetworkBufferPool(numBuffers, 
memorySegmentSize, MemoryType.HEAP);
+               networkBufferPool = new NetworkBufferPool(numBuffers, 
memorySegmentSize);
                localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
                assertEquals(0, 
localBufferPool.getNumberOfAvailableMemorySegments());
@@ -77,6 +75,9 @@ public class LocalBufferPoolTest {
 
                String msg = "Did not return all buffers to memory segment pool 
after test.";
                assertEquals(msg, numBuffers, 
networkBufferPool.getNumberOfAvailableMemorySegments());
+               // no other local buffer pools used than the one above, but 
call just in case
+               networkBufferPool.destroyAllBufferPools();
+               networkBufferPool.destroy();
        }
 
        @AfterClass
@@ -227,7 +228,7 @@ public class LocalBufferPoolTest {
                // and the twoTimesListener will be added into the 
registeredListeners
                // queue of buffer pool again
                available1.recycle();
-               
+
                verify(oneTimeListener, 
times(1)).notifyBufferAvailable(any(Buffer.class));
                verify(twoTimesListener, 
times(1)).notifyBufferAvailable(any(Buffer.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index e30e955..4d7648a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -31,10 +31,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -54,7 +53,7 @@ public class NetworkBufferPoolTest {
                        final int bufferSize = 128;
                        final int numBuffers = 10;
 
-                       NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP);
+                       NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, bufferSize);
                        assertEquals(bufferSize, 
globalPool.getMemorySegmentSize());
                        assertEquals(numBuffers, 
globalPool.getTotalNumberOfMemorySegments());
                        assertEquals(numBuffers, 
globalPool.getNumberOfAvailableMemorySegments());
@@ -98,7 +97,7 @@ public class NetworkBufferPoolTest {
        @Test
        public void testDestroyAll() {
                try {
-                       NetworkBufferPool globalPool = new 
NetworkBufferPool(10, 128, MemoryType.HEAP);
+                       NetworkBufferPool globalPool = new 
NetworkBufferPool(10, 128);
 
                        BufferPool fixedPool = globalPool.createBufferPool(2, 
2);
                        BufferPool boundedPool = globalPool.createBufferPool(0, 
1);
@@ -193,7 +192,7 @@ public class NetworkBufferPoolTest {
        public void testRequestMemorySegmentsLessThanTotalBuffers() throws 
Exception {
                final int numBuffers = 10;
 
-               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128);
 
                List<MemorySegment> memorySegments = Collections.emptyList();
                try {
@@ -217,7 +216,7 @@ public class NetworkBufferPoolTest {
        public void testRequestMemorySegmentsMoreThanTotalBuffers() throws 
Exception {
                final int numBuffers = 10;
 
-               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128);
 
                try {
                        globalPool.requestMemorySegments(numBuffers + 1);
@@ -237,7 +236,7 @@ public class NetworkBufferPoolTest {
        public void testRequestMemorySegmentsWithInvalidArgument() throws 
Exception {
                final int numBuffers = 10;
 
-               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128);
 
                try {
                        // the number of requested buffers should be larger 
than zero
@@ -258,7 +257,7 @@ public class NetworkBufferPoolTest {
        public void testRequestMemorySegmentsWithBuffersTaken() throws 
IOException, InterruptedException {
                final int numBuffers = 10;
 
-               NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+               NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(numBuffers, 128);
 
                final List<Buffer> buffers = new ArrayList<>(numBuffers);
                List<MemorySegment> memorySegments = Collections.emptyList();
@@ -314,7 +313,7 @@ public class NetworkBufferPoolTest {
        public void testRequestMemorySegmentsInterruptable() throws Exception {
                final int numBuffers = 10;
 
-               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+               NetworkBufferPool globalPool = new 
NetworkBufferPool(numBuffers, 128);
                MemorySegment segment = globalPool.requestMemorySegment();
                assertNotNull(segment);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index e685f17..5f7fd82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -47,7 +46,6 @@ import 
org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -59,6 +57,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import scala.Tuple2;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -95,7 +95,7 @@ public class LocalInputChannelTest {
 
                final NetworkBufferPool networkBuffers = new NetworkBufferPool(
                        (parallelism * producerBufferPoolSize) + (parallelism * 
parallelism),
-                       TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
+                       TestBufferFactory.BUFFER_SIZE);
 
                final ResultPartitionConsumableNotifier 
partitionConsumableNotifier =
                        mock(ResultPartitionConsumableNotifier.class);
@@ -176,6 +176,7 @@ public class LocalInputChannelTest {
                        }
                }
                finally {
+                       networkBuffers.destroyAllBufferPools();
                        networkBuffers.destroy();
                        executor.shutdown();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index dc22752..6b0f251 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -21,7 +21,6 @@ package 
org.apache.flink.runtime.rest.handler.legacy.backpressure;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -76,12 +75,13 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
        @BeforeClass
        public static void setup() {
                testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-               networkBufferPool = new NetworkBufferPool(100, 8192, 
MemoryType.HEAP);
+               networkBufferPool = new NetworkBufferPool(100, 8192);
        }
 
        @AfterClass
        public static void teardown() {
                JavaTestKit.shutdownActorSystem(testActorSystem);
+               networkBufferPool.destroyAllBufferPools();
                networkBufferPool.destroy();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index a8358a1..1ec280d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -204,8 +204,16 @@ public class TaskManagerServicesTest {
                tmConfig = 
getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
                        
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
                        0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
-               
when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L
 << 20); // 1000MB
-               assertEquals(100L << 20, 
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+               
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 
900MB
+               assertEquals((100L << 20) + 1 /* one too many due to floating 
point imprecision */,
+                       
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+               tmConfig = 
getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+                       
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+                       0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
+               
when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 
800MB
+               assertEquals((200L << 20) + 3 /* slightly too many due to 
floating point imprecision */,
+                       
TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
 
                tmConfig = getTmConfig(10, 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
                        0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
@@ -242,7 +250,6 @@ public class TaskManagerServicesTest {
                        networkBufMin,
                        networkBufMax,
                        TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
-                       memType,
                        null,
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
@@ -257,6 +264,7 @@ public class TaskManagerServicesTest {
                        QueryableStateConfiguration.disabled(),
                        1,
                        managedMemory,
+                       memType,
                        false,
                        managedMemoryFraction,
                        0);
@@ -274,9 +282,14 @@ public class TaskManagerServicesTest {
                config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
1L << 30); // 1GB
 
                config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
-               assertEquals(1000, 
TaskManagerServices.calculateHeapSizeMB(1000, config));
+               assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
+
+               config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
+               
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
+               assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
 
                config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+               
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
                config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 
10MB
                assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, 
config));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 4b62770..98b5b8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -130,7 +130,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                        // note: the network buffer memory configured here is 
not actually used below but set
                        // accordingly to be consistent
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
-                                       0.1f, networkBufNum * BUFFER_SIZE, 
networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC,
+                                       0.1f, networkBufNum * BUFFER_SIZE, 
networkBufNum * BUFFER_SIZE, BUFFER_SIZE, IOManager.IOMode.SYNC,
                                        0, 0, 2, 8, null);
 
                        ResourceID taskManagerId = ResourceID.generate();
@@ -140,7 +140,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                        final MemoryManager memManager = new 
MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, 
false);
                        final IOManager ioManager = new IOManagerAsync(TMP_DIR);
                        final NetworkEnvironment network = new 
NetworkEnvironment(
-                               new NetworkBufferPool(32, 
netConf.networkBufferSize(), netConf.memoryType()),
+                               new NetworkBufferPool(32, 
netConf.networkBufferSize()),
                                new LocalConnectionManager(),
                                new ResultPartitionManager(),
                                new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index c801d09..090e44a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -49,11 +48,15 @@ public class BarrierBufferMassiveRandomTest {
        @Test
        public void testWithTwoChannelsAndRandomBarriers() {
                IOManager ioMan = null;
+               NetworkBufferPool networkBufferPool1 = null;
+               NetworkBufferPool networkBufferPool2 = null;
                try {
                        ioMan = new IOManagerAsync();
 
-                       BufferPool pool1 = new NetworkBufferPool(100, 
PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
-                       BufferPool pool2 = new NetworkBufferPool(100, 
PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+                       networkBufferPool1 = new NetworkBufferPool(100, 
PAGE_SIZE);
+                       networkBufferPool2 = new NetworkBufferPool(100, 
PAGE_SIZE);
+                       BufferPool pool1 = 
networkBufferPool1.createBufferPool(100, 100);
+                       BufferPool pool2 = 
networkBufferPool2.createBufferPool(100, 100);
 
                        RandomGeneratingInputGate myIG = new 
RandomGeneratingInputGate(
                                        new BufferPool[] { pool1, pool2 },
@@ -76,6 +79,14 @@ public class BarrierBufferMassiveRandomTest {
                        if (ioMan != null) {
                                ioMan.shutdown();
                        }
+                       if (networkBufferPool1 != null) {
+                               networkBufferPool1.destroyAllBufferPools();
+                               networkBufferPool1.destroy();
+                       }
+                       if (networkBufferPool2 != null) {
+                               networkBufferPool2.destroyAllBufferPools();
+                               networkBufferPool2.destroy();
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 03c61e8..8716f8a 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -465,7 +465,9 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                                "-yt", flinkLibFolder.getAbsolutePath(),
                                "-yn", "1",
                                "-yjm", "768",
-                               "-yD", "yarn.heap-cutoff-ratio=0.5", // test if 
the cutoff is passed correctly
+                               // test if the cutoff is passed correctly (only 
useful when larger than the value
+                               // of containerized.heap-cutoff-min (default: 
600MB)
+                               "-yD", "yarn.heap-cutoff-ratio=0.7",
                                "-yD", "yarn.tags=test-tag",
                                "-ytm", "1024",
                                "-ys", "2", // test requesting slots from YARN.
@@ -544,8 +546,8 @@ public class YARNSessionCapacitySchedulerITCase extends 
YarnTestBase {
                        });
                        Assert.assertNotNull("Unable to locate JobManager log", 
jobmanagerLog);
                        content = FileUtils.readFileToString(jobmanagerLog);
-                       // TM was started with 1024 but we cut off 50% (NOT THE 
DEFAULT VALUE)
-                       String expected = "Starting TaskManagers with command: 
$JAVA_HOME/bin/java -Xms424m -Xmx424m";
+                       // TM was started with 1024 but we cut off 70% (NOT THE 
DEFAULT VALUE)
+                       String expected = "Starting TaskManagers with command: 
$JAVA_HOME/bin/java -Xms244m -Xmx244m -XX:MaxDirectMemorySize=780m";
                        Assert.assertTrue("Expected string '" + expected + "' 
not found in JobManager log: '" + jobmanagerLog + "'",
                                content.contains(expected));
                        expected = " (2/2) (attempt #0) to ";

Reply via email to