This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4869d3593d5b7fcff36abbe71a3e1a46b7a64998
Author: Zhijiang <[email protected]>
AuthorDate: Wed Oct 28 18:05:57 2020 +0800

    [FLINK-15981][network] Implement zero-copy file transfer in network based 
on FileRegion for bounded blocking partitions
    
    For file-type bounded blocking partition, the previous way was to allocate 
two unpooled segments for every subpartition
    to read data during network transfer. The memory overhead is serious and 
could cause OOM errors (direct memory).
    
    This change uses FileRegion to transfer file data to the network without 
copying it to memory first (zero-copy transfer).
    
    The exception is SSL-enabled setups: Direct transfer from file cache to 
network cannot work with SSL, because with SSL the
    data needs to be encrypted before being pushed into the network socket. 
SSL-enabled setups still behave as before.
---
 .../io/network/NettyShuffleServiceFactory.java     |   3 +-
 .../runtime/io/network/buffer/BufferRecycler.java  |  14 ++
 .../io/network/buffer/FileRegionBuffer.java        | 240 +++++++++++++++++++++
 .../partition/BoundedBlockingSubpartition.java     |  27 ++-
 ...dBlockingSubpartitionDirectTransferReader.java} | 159 +++++++-------
 .../BoundedBlockingSubpartitionReader.java         |   3 +-
 .../partition/BoundedBlockingSubpartitionType.java |  43 +++-
 .../runtime/io/network/partition/BoundedData.java  |   6 +
 .../network/partition/BufferReaderWriterUtil.java  |  41 +++-
 .../network/partition/FileChannelBoundedData.java  |   5 +
 .../FileChannelMemoryMappedBoundedData.java        |   5 +
 .../network/partition/MemoryMappedBoundedData.java |   5 +
 .../network/partition/ResultPartitionFactory.java  |  24 ++-
 .../io/network/partition/ResultSubpartition.java   |   6 +-
 .../partition/SortMergeSubpartitionReader.java     |   7 +-
 .../partition/consumer/LocalInputChannel.java      |   5 +
 .../partition/consumer/SingleInputGate.java        |  14 +-
 .../partition/consumer/SingleInputGateFactory.java |   3 +-
 .../NettyShuffleEnvironmentConfiguration.java      |   4 +
 ...oundedBlockingSubpartitionAvailabilityTest.java |   1 +
 .../partition/BoundedBlockingSubpartitionTest.java |  41 +++-
 .../BoundedBlockingSubpartitionWriteReadTest.java  |   7 +-
 .../partition/FileChannelBoundedDataTest.java      |   1 +
 .../network/partition/InputGateFairnessTest.java   |   4 +-
 .../network/partition/ResultPartitionBuilder.java  |  10 +-
 .../partition/ResultPartitionFactoryTest.java      |   3 +-
 .../partition/consumer/SingleInputGateBuilder.java |   5 +-
 .../flink/test/runtime/FileBufferReaderITCase.java |  21 +-
 28 files changed, 586 insertions(+), 121 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index ecd8027..d5fbd7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -128,7 +128,8 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        config.getCompressionCodec(),
                        config.getMaxBuffersPerChannel(),
                        config.sortShuffleMinBuffers(),
-                       config.sortShuffleMinParallelism());
+                       config.sortShuffleMinParallelism(),
+                       config.isSSLEnabled());
 
                SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
                        taskExecutorResourceId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
index 66b1fa2..c8aced7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
@@ -32,4 +32,18 @@ public interface BufferRecycler {
         * @param memorySegment The memory segment to be recycled.
         */
        void recycle(MemorySegment memorySegment);
+
+       /**
+        * The buffer recycler does nothing for the recycled segment.
+        */
+       final class DummyBufferRecycler implements BufferRecycler {
+
+               public static final BufferRecycler INSTANCE = new 
DummyBufferRecycler();
+
+               private DummyBufferRecycler() {}
+
+               @Override
+               public void recycle(MemorySegment memorySegment) {
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
new file mode 100644
index 0000000..04b7c15
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class represents a chunk of data in a file channel. Its purpose is to 
be passed
+ * to the netty code and to be written to the socket via the zero-copy direct 
transfer
+ * capabilities of {@link FileChannel#transferTo(long, long, 
WritableByteChannel)}.
+ *
+ * <p>This class implements {@link Buffer} mainly for compatibility with existing 
usages.
+ * It can be thought of as a "lazy buffer" that does not hold the data 
directly, although
+ * the data can be fetched as a read-only {@code ByteBuffer} when needed, for 
example
+ * in local input channels. See {@link #readInto(MemorySegment)} and
+ * {@link #getNioBufferReadable()}. Because this buffer is read-only, the 
modification
+ * methods (and methods that give a writable buffer) throw {@link 
UnsupportedOperationException}.
+ *
+ * <p>This class extends from Netty's {@link DefaultFileRegion}, similar to how the
+ * {@link NetworkBuffer} extends from Netty's {@link ByteBuf}. That way we can 
pass both
+ * of them to Netty in the same way, and Netty will internally treat them 
appropriately.
+ */
+public class FileRegionBuffer extends DefaultFileRegion implements Buffer {
+
+       private final FileChannel fileChannel;
+
+       /** The {@link DataType} this buffer represents. */
+       private final DataType dataType;
+
+       /** Whether the buffer is compressed or not. */
+       private final boolean isCompressed;
+
+       public FileRegionBuffer(
+                       FileChannel fileChannel,
+                       long fileChannelPosition,
+                       int bufferSize,
+                       DataType dataType,
+                       boolean isCompressed) {
+
+               super(fileChannel, fileChannelPosition, bufferSize);
+
+               this.fileChannel = checkNotNull(fileChannel);
+               this.dataType = checkNotNull(dataType);
+               this.isCompressed = isCompressed;
+       }
+
+       private int bufferSize() {
+               // this is guaranteed to not be lossy, because we initialize 
this from
+               // an int in the constructor.
+               return (int) count();
+       }
+
+       // 
------------------------------------------------------------------------
+       // Buffer override methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public boolean isBuffer() {
+               return dataType.isBuffer();
+       }
+
+       @Override
+       public MemorySegment getMemorySegment() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public int getMemorySegmentOffset() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) 
{
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public int getMaxCapacity() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public int getReaderIndex() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public void setReaderIndex(int readerIndex) throws 
IndexOutOfBoundsException {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       /**
+        * This method is only called by tests and by event-deserialization, 
like
+        * checkpoint barriers. Because such events are not used for bounded 
intermediate
+        * results, this method currently executes only in tests.
+        */
+       @Override
+       public ByteBuffer getNioBufferReadable() {
+               try {
+                       final ByteBuffer buffer = 
ByteBuffer.allocateDirect(bufferSize());
+                       BufferReaderWriterUtil.readByteBufferFully(fileChannel, 
buffer, position());
+                       buffer.flip();
+                       return buffer;
+               } catch (IOException e) {
+                       // this is not very pretty, but given that this code 
runs only in tests
+                       // the exception wrapping here is simpler than updating 
the method signature
+                       // to declare IOExceptions, as would be necessary for a 
proper "lazy buffer".
+                       throw new FlinkRuntimeException(e.getMessage(), e);
+               }
+       }
+
+       @Override
+       public ByteBuffer getNioBuffer(int index, int length) throws 
IndexOutOfBoundsException {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public ByteBuf asByteBuf() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public void setSize(int writerIndex) {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public int getSize() {
+               return bufferSize();
+       }
+
+       @Override
+       public int readableBytes() {
+               return bufferSize();
+       }
+
+       @Override
+       public void setAllocator(ByteBufAllocator allocator) {
+               // nothing to do
+       }
+
+       @Override
+       public BufferRecycler getRecycler() {
+               return null;
+       }
+
+       @Override
+       public void recycleBuffer() {
+               // nothing to do
+       }
+
+       @Override
+       public boolean isRecycled() {
+               return false;
+       }
+
+       @Override
+       public FileRegionBuffer retainBuffer() {
+               return (FileRegionBuffer) super.retain();
+       }
+
+       @Override
+       public boolean isCompressed() {
+               return isCompressed;
+       }
+
+       @Override
+       public void setCompressed(boolean isCompressed) {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public DataType getDataType() {
+               return dataType;
+       }
+
+       @Override
+       public void setDataType(DataType dataType) {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       // 
------------------------------------------------------------------------
+       // File region override methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void deallocate() {
+               // nothing to do
+       }
+
+       // 
------------------------------------------------------------------------
+       // Utils
+       // 
------------------------------------------------------------------------
+
+       public Buffer readInto(MemorySegment segment) throws IOException {
+               final ByteBuffer buffer = segment.wrap(0, bufferSize());
+               BufferReaderWriterUtil.readByteBufferFully(fileChannel, buffer, 
position());
+
+               return new NetworkBuffer(
+                       segment,
+                       BufferRecycler.DummyBufferRecycler.INSTANCE,
+                       dataType,
+                       isCompressed,
+                       bufferSize());
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 07722ac..6df664f 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -74,7 +74,10 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
 
        /** All created and not yet released readers. */
        @GuardedBy("lock")
-       private final Set<BoundedBlockingSubpartitionReader> readers;
+       private final Set<ResultSubpartitionView> readers;
+
+       /** Flag to transfer file via FileRegion way in network stack if 
partition type is file without SSL enabled. */
+       private final boolean useDirectFileTransfer;
 
        /** Counter for the number of data buffers (not events!) written. */
        private int numDataBuffersWritten;
@@ -91,11 +94,13 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
        public BoundedBlockingSubpartition(
                        int index,
                        ResultPartition parent,
-                       BoundedData data) {
+                       BoundedData data,
+                       boolean useDirectFileTransfer) {
 
                super(index, parent);
 
                this.data = checkNotNull(data);
+               this.useDirectFileTransfer = useDirectFileTransfer;
                this.readers = new HashSet<>();
        }
 
@@ -204,14 +209,20 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
                        checkState(!isReleased, "data partition already 
released");
                        checkState(isFinished, "writing of blocking partition 
not yet finished");
 
-                       final BoundedBlockingSubpartitionReader reader = new 
BoundedBlockingSubpartitionReader(
+                       final ResultSubpartitionView reader;
+                       if (useDirectFileTransfer) {
+                               reader = new 
BoundedBlockingSubpartitionDirectTransferReader(
+                                       this, data.getFilePath(), 
numDataBuffersWritten, numBuffersAndEventsWritten);
+                       } else {
+                               reader = new BoundedBlockingSubpartitionReader(
                                        this, data, numDataBuffersWritten, 
availability);
+                       }
                        readers.add(reader);
                        return reader;
                }
        }
 
-       void releaseReaderReference(BoundedBlockingSubpartitionReader reader) 
throws IOException {
+       void releaseReaderReference(ResultSubpartitionView reader) throws 
IOException {
                onConsumedSubpartition();
 
                synchronized (lock) {
@@ -262,10 +273,10 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
         * Data is eagerly spilled (written to disk) and readers directly read 
from the file.
         */
        public static BoundedBlockingSubpartition createWithFileChannel(
-                       int index, ResultPartition parent, File tempFile, int 
readBufferSize) throws IOException {
+                       int index, ResultPartition parent, File tempFile, int 
readBufferSize, boolean sslEnabled) throws IOException {
 
                final FileChannelBoundedData bd = 
FileChannelBoundedData.create(tempFile.toPath(), readBufferSize);
-               return new BoundedBlockingSubpartition(index, parent, bd);
+               return new BoundedBlockingSubpartition(index, parent, bd, 
!sslEnabled);
        }
 
        /**
@@ -277,7 +288,7 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
                        int index, ResultPartition parent, File tempFile) 
throws IOException {
 
                final MemoryMappedBoundedData bd = 
MemoryMappedBoundedData.create(tempFile.toPath());
-               return new BoundedBlockingSubpartition(index, parent, bd);
+               return new BoundedBlockingSubpartition(index, parent, bd, 
false);
 
        }
 
@@ -292,6 +303,6 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
                        int index, ResultPartition parent, File tempFile) 
throws IOException {
 
                final FileChannelMemoryMappedBoundedData bd = 
FileChannelMemoryMappedBoundedData.create(tempFile.toPath());
-               return new BoundedBlockingSubpartition(index, parent, bd);
+               return new BoundedBlockingSubpartition(index, parent, bd, 
false);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
similarity index 51%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
index 7b3ae45..54ef0de 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
@@ -25,104 +25,88 @@ import org.apache.flink.util.IOUtils;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The reader (read view) of a BoundedBlockingSubpartition.
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
  */
-final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView {
+public class BoundedBlockingSubpartitionDirectTransferReader implements 
ResultSubpartitionView {
 
        /** The result subpartition that we read. */
        private final BoundedBlockingSubpartition parent;
 
-       /** The listener that is notified when there are available buffers for 
this subpartition view. */
-       private final BufferAvailabilityListener availabilityListener;
-
-       /** The next buffer (look ahead). Null once the data is depleted or 
reader is disposed. */
-       @Nullable
-       private Buffer nextBuffer;
-
-       /** The reader/decoder to the memory mapped region with the data we 
currently read from.
-        * Null once the reader empty or disposed.*/
-       @Nullable
-       private BoundedData.Reader dataReader;
+       /** The reader/decoder to the file region with the data we currently 
read from. */
+       private final BoundedData.Reader dataReader;
 
        /** The remaining number of data buffers (not events) in the result. */
-       private int dataBufferBacklog;
+       private int numDataBuffers;
 
-       /** Flag whether this reader is released. Atomic, to avoid double 
release. */
+       /** The remaining number of data buffers and events in the result. */
+       private int numDataAndEventBuffers;
+
+       /** Flag whether this reader is released. */
        private boolean isReleased;
 
        private int sequenceNumber;
 
-       /**
-        * Convenience constructor that takes a single buffer.
-        */
-       BoundedBlockingSubpartitionReader(
-                       BoundedBlockingSubpartition parent,
-                       BoundedData data,
-                       int numDataBuffers,
-                       BufferAvailabilityListener availabilityListener) throws 
IOException {
+       BoundedBlockingSubpartitionDirectTransferReader(
+               BoundedBlockingSubpartition parent,
+               Path filePath,
+               int numDataBuffers,
+               int numDataAndEventBuffers) throws IOException {
 
                this.parent = checkNotNull(parent);
 
-               checkNotNull(data);
-               this.dataReader = data.createReader(this);
-               this.nextBuffer = dataReader.nextBuffer();
+               checkNotNull(filePath);
+               this.dataReader = new FileRegionReader(filePath);
 
                checkArgument(numDataBuffers >= 0);
-               this.dataBufferBacklog = numDataBuffers;
+               this.numDataBuffers = numDataBuffers;
 
-               this.availabilityListener = checkNotNull(availabilityListener);
+               checkArgument(numDataAndEventBuffers >= 0);
+               this.numDataAndEventBuffers = numDataAndEventBuffers;
        }
 
        @Nullable
        @Override
        public BufferAndBacklog getNextBuffer() throws IOException {
-               final Buffer current = nextBuffer; // copy reference to stack
+               if (isReleased) {
+                       return null;
+               }
 
+               Buffer current = dataReader.nextBuffer();
                if (current == null) {
                        // as per contract, we must return null when the reader 
is empty,
                        // but also in case the reader is disposed (rather than 
throwing an exception)
                        return null;
                }
-               if (current.isBuffer()) {
-                       dataBufferBacklog--;
-               }
 
-               assert dataReader != null;
-               nextBuffer = dataReader.nextBuffer();
+               updateStatistics(current);
 
-               return BufferAndBacklog.fromBufferAndLookahead(current, 
nextBuffer, dataBufferBacklog, sequenceNumber++);
+               // We simply assume all the data are non-events for batch jobs 
to avoid pre-fetching the next header
+               Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? 
Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+               return BufferAndBacklog.fromBufferAndLookahead(current, 
nextDataType, numDataBuffers, sequenceNumber++);
        }
 
-       /**
-        * This method is actually only meaningful for the {@link 
BoundedBlockingSubpartitionType#FILE}.
-        *
-        * <p>For the other types the {@link #nextBuffer} can not be ever set 
to null, so it is no need
-        * to notify available via this method. But the implementation is also 
compatible with other
-        * types even though called by mistake.
-        */
-       @Override
-       public void notifyDataAvailable() {
-               if (nextBuffer == null) {
-                       assert dataReader != null;
-
-                       try {
-                               nextBuffer = dataReader.nextBuffer();
-                       } catch (IOException ex) {
-                               // this exception wrapper is only for avoiding 
throwing IOException explicitly
-                               // in relevant interface methods
-                               throw new IllegalStateException("No data 
available while reading", ex);
-                       }
-
-                       // next buffer is null indicates the end of partition
-                       if (nextBuffer != null) {
-                               availabilityListener.notifyDataAvailable();
-                       }
+       private void updateStatistics(Buffer buffer) {
+               if (buffer.isBuffer()) {
+                       numDataBuffers--;
                }
+               numDataAndEventBuffers--;
+       }
+
+       @Override
+       public boolean isAvailable(int numCreditsAvailable) {
+                       // We simply assume there are no events except 
EndOfPartitionEvent for batch jobs,
+                       // so it is safe to skip checking whether the next 
buffer is an event.
+               return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
        }
 
        @Override
@@ -132,10 +116,6 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
 
                IOUtils.closeQuietly(dataReader);
 
-               // nulling these fields means the read method and will fail fast
-               nextBuffer = null;
-               dataReader = null;
-
                // Notify the parent that this one is released. This allows the 
parent to
                // eventually release all resources (when all readers are done 
and the
                // parent is disposed).
@@ -148,34 +128,57 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
        }
 
        @Override
-       public void resumeConsumption() {
-               throw new UnsupportedOperationException("Method should never be 
called.");
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
        }
 
        @Override
-       public boolean isAvailable(int numCreditsAvailable) {
-               if (numCreditsAvailable > 0) {
-                       return nextBuffer != null;
-               }
-
-               return nextBuffer != null && !nextBuffer.isBuffer();
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
        }
 
        @Override
-       public Throwable getFailureCause() {
-               // we can never throw an error after this was created
-               return null;
+       public void notifyDataAvailable() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
        }
 
        @Override
-       public int unsynchronizedGetNumberOfQueuedBuffers() {
-               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       public void resumeConsumption() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
        }
 
        @Override
        public String toString() {
                return String.format("Blocking Subpartition Reader: ID=%s, 
index=%d",
-                               parent.parent.getPartitionId(),
-                               parent.getSubPartitionIndex());
+                       parent.parent.getPartitionId(),
+                       parent.getSubPartitionIndex());
+       }
+
+       /**
+        * The reader to read from {@link BoundedBlockingSubpartition} and 
return the wrapped
+        * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} 
based buffer.
+        */
+       static final class FileRegionReader implements BoundedData.Reader {
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               FileRegionReader(Path filePath) throws IOException {
+                       this.fileChannel = FileChannel.open(filePath, 
StandardOpenOption.READ);
+                       this.headerBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       return 
BufferReaderWriterUtil.readFileRegionFromByteChannel(fileChannel, headerBuffer);
+               }
+
+               @Override
+               public void close() throws IOException {
+                       fileChannel.close();
+               }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 7b3ae45..1612580 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -94,8 +94,9 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
 
                assert dataReader != null;
                nextBuffer = dataReader.nextBuffer();
+               Buffer.DataType nextDataType = nextBuffer != null ? 
nextBuffer.getDataType() : Buffer.DataType.NONE;
 
-               return BufferAndBacklog.fromBufferAndLookahead(current, 
nextBuffer, dataBufferBacklog, sequenceNumber++);
+               return BufferAndBacklog.fromBufferAndLookahead(current, 
nextDataType, dataBufferBacklog, sequenceNumber++);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
index ef35f23..b74f425 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
@@ -33,8 +33,14 @@ public enum BoundedBlockingSubpartitionType {
        FILE {
 
                @Override
-               public BoundedBlockingSubpartition create(int index, 
BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) 
throws IOException {
-                       return 
BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, 
readBufferSize);
+               public BoundedBlockingSubpartition create(
+                       int index,
+                       ResultPartition parent,
+                       File tempFile,
+                       int readBufferSize,
+                       boolean sslEnabled) throws IOException {
+
+                       return 
BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, 
readBufferSize, sslEnabled);
                }
        },
 
@@ -46,7 +52,13 @@ public enum BoundedBlockingSubpartitionType {
        MMAP {
 
                @Override
-               public BoundedBlockingSubpartition create(int index, 
BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) 
throws IOException {
+               public BoundedBlockingSubpartition create(
+                       int index,
+                       ResultPartition parent,
+                       File tempFile,
+                       int readBufferSize,
+                       boolean sslEnabled) throws IOException {
+
                        return 
BoundedBlockingSubpartition.createWithMemoryMappedFile(index, parent, tempFile);
                }
        },
@@ -61,7 +73,13 @@ public enum BoundedBlockingSubpartitionType {
        FILE_MMAP {
 
                @Override
-               public BoundedBlockingSubpartition create(int index, 
BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) 
throws IOException {
+               public BoundedBlockingSubpartition create(
+                       int index,
+                       ResultPartition parent,
+                       File tempFile,
+                       int readBufferSize,
+                       boolean sslEnabled) throws IOException {
+
                        return 
BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(index, parent, 
tempFile);
                }
        },
@@ -74,8 +92,14 @@ public enum BoundedBlockingSubpartitionType {
        AUTO {
 
                @Override
-               public BoundedBlockingSubpartition create(int index, 
BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) 
throws IOException {
-                       return 
ResultPartitionFactory.getBoundedBlockingType().create(index, parent, tempFile, 
readBufferSize);
+               public BoundedBlockingSubpartition create(
+                       int index,
+                       ResultPartition parent,
+                       File tempFile,
+                       int readBufferSize,
+                       boolean sslEnabled) throws IOException {
+
+                       return 
ResultPartitionFactory.getBoundedBlockingType().create(index, parent, tempFile, 
readBufferSize, sslEnabled);
                }
        };
 
@@ -84,5 +108,10 @@ public enum BoundedBlockingSubpartitionType {
        /**
         * Creates BoundedBlockingSubpartition of this type.
         */
-       public abstract BoundedBlockingSubpartition create(int index, 
BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) 
throws IOException;
+       public abstract BoundedBlockingSubpartition create(
+               int index,
+               ResultPartition parent,
+               File tempFile,
+               int readBufferSize,
+               boolean sslEnabled) throws IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
index a8681cc..cad3c66 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
@@ -24,6 +24,7 @@ import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.file.Path;
 
 /**
  * BoundedData is the data store in a single bounded blocking subpartition.
@@ -75,6 +76,11 @@ interface BoundedData extends Closeable {
         */
        long getSize();
 
+       /**
+        * The file path for the persisted {@link BoundedBlockingSubpartition}.
+        */
+       Path getFilePath();
+
        // 
------------------------------------------------------------------------
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
index 6e60bea..d13621f 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
@@ -40,7 +41,7 @@ import java.nio.channels.FileChannel;
  * <p>The encoding is the same across FileChannel and ByteBuffer, so this 
class can
  * write to a file and read from the byte buffer that results from mapping 
this file to memory.
  */
-final class BufferReaderWriterUtil {
+public final class BufferReaderWriterUtil {
 
        static final int HEADER_LENGTH = 8;
 
@@ -166,6 +167,28 @@ final class BufferReaderWriterUtil {
        }
 
        @Nullable
+       static Buffer readFileRegionFromByteChannel(FileChannel channel, 
ByteBuffer headerBuffer) throws IOException {
+               headerBuffer.clear();
+               if (!tryReadByteBuffer(channel, headerBuffer)) {
+                       return null;
+               }
+               headerBuffer.flip();
+
+               final boolean isEvent = headerBuffer.getShort() == 
HEADER_VALUE_IS_EVENT;
+               final Buffer.DataType dataType = isEvent ? 
Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
+               final boolean isCompressed = headerBuffer.getShort() == 
BUFFER_IS_COMPRESSED;
+               final int size = headerBuffer.getInt();
+
+               // the file region does not advance position. it must not, 
because it gets written
+               // interleaved with these calls, which would completely mess up 
the reading.
+               // so we advance the positions always and only here.
+               final long position = channel.position();
+               channel.position(position + size);
+
+               return new FileRegionBuffer(channel, position, size, dataType, 
isCompressed);
+       }
+
+       @Nullable
        static Buffer readFromByteChannel(
                        FileChannel channel,
                        ByteBuffer headerBuffer,
@@ -236,6 +259,22 @@ final class BufferReaderWriterUtil {
                while (b.hasRemaining());
        }
 
+       public static void readByteBufferFully(
+                       final FileChannel channel,
+                       final ByteBuffer b,
+                       long position) throws IOException {
+
+               // the post-checked loop here gets away with one less check in 
the normal case
+               do {
+                       final int numRead = channel.read(b, position);
+                       if (numRead == -1) {
+                               throwPrematureEndOfFile();
+                       }
+                       position += numRead;
+               }
+               while (b.hasRemaining());
+       }
+
        static void writeBuffer(FileChannel channel, ByteBuffer buffer) throws 
IOException {
                while (buffer.hasRemaining()) {
                        channel.write(buffer);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
index 690ad7d..ba35e52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
@@ -88,6 +88,11 @@ final class FileChannelBoundedData implements BoundedData {
        }
 
        @Override
+       public Path getFilePath() {
+               return filePath;
+       }
+
+       @Override
        public void close() throws IOException {
                IOUtils.closeQuietly(fileChannel);
                Files.delete(filePath);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
index f22efd0..eededad 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
@@ -168,6 +168,11 @@ final class FileChannelMemoryMappedBoundedData implements 
BoundedData {
                return pos;
        }
 
+       @Override
+       public Path getFilePath() {
+               return filePath;
+       }
+
        private void mapRegionAndStartNext() throws IOException {
                final ByteBuffer region = fileChannel.map(MapMode.READ_ONLY, 
startOfCurrentRegion, pos - startOfCurrentRegion);
                region.order(ByteOrder.nativeOrder());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
index e8718f5..0a464f5 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
@@ -177,6 +177,11 @@ final class MemoryMappedBoundedData implements BoundedData 
{
                return size;
        }
 
+       @Override
+       public Path getFilePath() {
+               return filePath;
+       }
+
        private void rollOverToNextBuffer() throws IOException {
                if (currentBuffer != null) {
                        // we need to remember the original buffers, not any 
slices.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index c83eaff..ba1fc1e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -68,6 +68,8 @@ public class ResultPartitionFactory {
 
        private final int sortShuffleMinParallelism;
 
+       private final boolean sslEnabled;
+
        public ResultPartitionFactory(
                ResultPartitionManager partitionManager,
                FileChannelManager channelManager,
@@ -80,7 +82,8 @@ public class ResultPartitionFactory {
                String compressionCodec,
                int maxBuffersPerChannel,
                int sortShuffleMinBuffers,
-               int sortShuffleMinParallelism) {
+               int sortShuffleMinParallelism,
+               boolean sslEnabled) {
 
                this.partitionManager = partitionManager;
                this.channelManager = channelManager;
@@ -94,6 +97,7 @@ public class ResultPartitionFactory {
                this.maxBuffersPerChannel = maxBuffersPerChannel;
                this.sortShuffleMinBuffers = sortShuffleMinBuffers;
                this.sortShuffleMinParallelism = sortShuffleMinParallelism;
+               this.sslEnabled = sslEnabled;
        }
 
        public ResultPartition create(
@@ -179,12 +183,13 @@ public class ResultPartitionFactory {
                                        bufferCompressor,
                                        bufferPoolFactory);
 
-                               initializeBoundedBlockingPartitions(
-                                       subpartitions,
-                                       blockingPartition,
-                                       blockingSubpartitionType,
-                                       networkBufferSize,
-                                       channelManager);
+                               initializeBoundedBlockingPartitions(
+                                       subpartitions,
+                                       blockingPartition,
+                                       blockingSubpartitionType,
+                                       networkBufferSize,
+                                       channelManager,
+                                       sslEnabled);
 
                                partition = blockingPartition;
                        }
@@ -203,12 +208,13 @@ public class ResultPartitionFactory {
                        BoundedBlockingResultPartition parent,
                        BoundedBlockingSubpartitionType 
blockingSubpartitionType,
                        int networkBufferSize,
-                       FileChannelManager channelManager) {
+                       FileChannelManager channelManager,
+                       boolean sslEnabled) {
                int i = 0;
                try {
                        for (i = 0; i < subpartitions.length; i++) {
                                final File spillFile = 
channelManager.createChannel().getPathFile();
-                               subpartitions[i] = 
blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
+                               subpartitions[i] = 
blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize, 
sslEnabled);
                        }
                }
                catch (IOException e) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
index 9299c86..2d89b81 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
@@ -23,8 +23,6 @@ import 
org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -166,13 +164,13 @@ public abstract class ResultSubpartition {
 
                public static BufferAndBacklog fromBufferAndLookahead(
                                Buffer current,
-                               @Nullable Buffer lookahead,
+                               Buffer.DataType nextDataType,
                                int backlog,
                                int sequenceNumber) {
                        return new BufferAndBacklog(
                                current,
                                backlog,
-                               lookahead != null ? lookahead.getDataType() : 
Buffer.DataType.NONE,
+                               nextDataType,
                                sequenceNumber);
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index f6c55db..9a6c68b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -106,8 +106,13 @@ public class SortMergeSubpartitionReader implements 
ResultSubpartitionView, Buff
                        --dataBufferBacklog;
                }
 
+               final Buffer lookAhead = buffersRead.peek();
+
                return BufferAndBacklog.fromBufferAndLookahead(
-                       buffer, buffersRead.peek(), dataBufferBacklog, 
sequenceNumber++);
+                               buffer,
+                               lookAhead == null ? Buffer.DataType.NONE : 
lookAhead.getDataType(),
+                               dataBufferBacklog,
+                               sequenceNumber++);
        }
 
        void readBuffers() throws IOException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 47512be..45d17f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -226,6 +227,10 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
 
                Buffer buffer = next.buffer();
 
+               if (buffer instanceof FileRegionBuffer) {
+                       buffer = ((FileRegionBuffer) 
buffer).readInto(inputGate.getUnpooledSegment());
+               }
+
                numBytesIn.inc(buffer.getSize());
                numBuffersIn.inc();
                if (buffer.getDataType().hasPriority()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 35049ef..fa4699c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -189,6 +191,9 @@ public class SingleInputGate extends IndexedInputGate {
 
        private final MemorySegmentProvider memorySegmentProvider;
 
+       /** The memory segment used by the local input channel to read data from 
the file region of a bounded blocking partition. */
+       private final MemorySegment unpooledSegment;
+
        public SingleInputGate(
                String owningTaskName,
                int gateIndex,
@@ -199,7 +204,8 @@ public class SingleInputGate extends IndexedInputGate {
                PartitionProducerStateProvider partitionProducerStateProvider,
                SupplierWithException<BufferPool, IOException> 
bufferPoolFactory,
                @Nullable BufferDecompressor bufferDecompressor,
-               MemorySegmentProvider memorySegmentProvider) {
+               MemorySegmentProvider memorySegmentProvider,
+               int segmentSize) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                Preconditions.checkArgument(0 <= gateIndex, "The gate index 
must be positive.");
@@ -228,6 +234,8 @@ public class SingleInputGate extends IndexedInputGate {
                this.memorySegmentProvider = 
checkNotNull(memorySegmentProvider);
 
                this.closeFuture = new CompletableFuture<>();
+
+               this.unpooledSegment = 
MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
        }
 
        protected PrioritizedDeque<InputChannel> getInputChannelsWithData() {
@@ -510,6 +518,10 @@ public class SingleInputGate extends IndexedInputGate {
                return retriggerLocalRequestTimer;
        }
 
+       MemorySegment getUnpooledSegment() {
+               return unpooledSegment;
+       }
+
        @Override
        public void close() throws IOException {
                boolean released = false;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index 74e5bf2..b04db34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -134,7 +134,8 @@ public class SingleInputGateFactory {
                        partitionProducerStateProvider,
                        bufferPoolFactory,
                        bufferDecompressor,
-                       networkBufferPool);
+                       networkBufferPool,
+                       networkBufferSize);
 
                createInputChannels(owningTaskName, igdd, inputGate, metrics);
                return inputGate;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 8f32660..9d75783 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -172,6 +172,10 @@ public class NettyShuffleEnvironmentConfiguration {
                return blockingShuffleCompressionEnabled;
        }
 
+       public boolean isSSLEnabled() {
+               return nettyConfig != null && nettyConfig.getSSLEnabled();
+       }
+
        public String getCompressionCodec() {
                return compressionCodec;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
index fd1188c..870c096 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -125,6 +125,7 @@ public class BoundedBlockingSubpartitionAvailabilityTest {
                BoundedBlockingResultPartition parent = 
(BoundedBlockingResultPartition) new ResultPartitionBuilder()
                        
.setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
                        
.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
+                       .setSSLEnabled(true)
                        .setFileChannelManager(new FileChannelManagerImpl(new 
String[] { TMP_FOLDER.newFolder().toString() }, "data"))
                        .setNetworkBufferSize(BUFFER_SIZE)
                        .build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index ce4083f..2e1c394 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import org.junit.AfterClass;
@@ -28,11 +29,17 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,12 +53,25 @@ import static org.junit.Assert.fail;
  * <p>Full read / write tests for the partition and the reader are in
  * {@link BoundedBlockingSubpartitionWriteReadTest}.
  */
+@RunWith(Parameterized.class)
 public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 
        private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
 
        private static FileChannelManager fileChannelManager;
 
+       private final BoundedBlockingSubpartitionType type;
+
+       private final boolean sslEnabled;
+
+       @Parameterized.Parameters(name = "type = {0}, sslEnabled = {1}")
+       public static Collection<Object[]> parameters() {
+               return Arrays.stream(BoundedBlockingSubpartitionType.values())
+                       .map((type) -> new Object[][] { { type, true }, { type, 
false } })
+                       .flatMap(Arrays::stream)
+                       .collect(Collectors.toList());
+       }
+
        @ClassRule
        public static final TemporaryFolder TMP_DIR = new TemporaryFolder();
 
@@ -65,6 +85,11 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                fileChannelManager.close();
        }
 
+       public BoundedBlockingSubpartitionTest(BoundedBlockingSubpartitionType 
type, boolean sslEnabled) {
+               this.type = type;
+               this.sslEnabled = sslEnabled;
+       }
+
        // 
------------------------------------------------------------------------
 
        @Test
@@ -97,8 +122,7 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
        @Override
        ResultSubpartition createSubpartition() throws Exception {
                final ResultPartition resultPartition = 
createPartition(ResultPartitionType.BLOCKING, fileChannelManager);
-               return BoundedBlockingSubpartition.createWithMemoryMappedFile(
-                               0, resultPartition, new 
File(TMP_DIR.newFolder(), "subpartition"));
+               return type.create(0, resultPartition, new 
File(TMP_DIR.newFolder(), "subpartition"), BufferBuilderTestUtils.BUFFER_SIZE, 
sslEnabled);
        }
 
        @Override
@@ -108,7 +132,8 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                return new BoundedBlockingSubpartition(
                                0,
                                resultPartition,
-                               new FailingBoundedData());
+                               new FailingBoundedData(),
+                               !sslEnabled && type == 
BoundedBlockingSubpartitionType.FILE);
        }
 
        // 
------------------------------------------------------------------------
@@ -136,6 +161,11 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                }
 
                @Override
+               public Path getFilePath() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public void close() {}
        }
 
@@ -166,6 +196,11 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                }
 
                @Override
+               public Path getFilePath() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public void close() {}
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 94b7543..6f84254 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -75,6 +75,8 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 
        private final boolean compressionEnabled;
 
+       private final boolean sslEnabled;
+
        @Parameters(name = "type = {0}, compressionEnabled = {1}")
        public static Collection<Object[]> parameters() {
                return Arrays.stream(BoundedBlockingSubpartitionType.values())
@@ -86,6 +88,8 @@ public class BoundedBlockingSubpartitionWriteReadTest {
        public 
BoundedBlockingSubpartitionWriteReadTest(BoundedBlockingSubpartitionType type, 
boolean compressionEnabled) {
                this.type = type;
                this.compressionEnabled = compressionEnabled;
+               // the SSL flag is independent of the compression flag, so we 
reuse the same parameter value to cover both combinations
+               this.sslEnabled = compressionEnabled;
        }
 
        // 
------------------------------------------------------------------------
@@ -235,7 +239,8 @@ public class BoundedBlockingSubpartitionWriteReadTest {
                                        compressionEnabled,
                                        BUFFER_SIZE),
                                new File(TMP_FOLDER.newFolder(), 
"partitiondata"),
-                               BUFFER_SIZE);
+                               BUFFER_SIZE,
+                               sslEnabled);
        }
 
        private static LongReader[] createSubpartitionLongReaders(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index 559d13d..aa4f582 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -157,6 +157,7 @@ public class FileChannelBoundedDataTest extends 
BoundedDataTestBase {
                        .setResultPartitionType(ResultPartitionType.BLOCKING)
                        
.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
                        .setFileChannelManager(fileChannelManager)
+                       .setSSLEnabled(true)
                        .build();
                return resultPartition.subpartitions[0];
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index b6fadf8..2e99a65 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -327,6 +327,7 @@ public class InputGateFairnessTest {
        // 
------------------------------------------------------------------------
 
        private static class FairnessVerifyingInputGate extends SingleInputGate 
{
+               private static final int BUFFER_SIZE = 32 * 1024;
                private static final SupplierWithException<BufferPool, 
IOException> STUB_BUFFER_POOL_FACTORY =
                        NoOpBufferPool::new;
 
@@ -351,7 +352,8 @@ public class InputGateFairnessTest {
                                SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER,
                                STUB_BUFFER_POOL_FACTORY,
                                null,
-                               new UnpooledMemorySegmentProvider(32 * 1024));
+                               new UnpooledMemorySegmentProvider(BUFFER_SIZE),
+                               BUFFER_SIZE);
 
                        channelsWithData = getInputChannelsWithData();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 67e7adb..a356f11 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -68,6 +68,8 @@ public class ResultPartitionBuilder {
 
        private boolean blockingShuffleCompressionEnabled = false;
 
+       private boolean sslEnabled = false;
+
        private String compressionCodec = "LZ4";
 
        public ResultPartitionBuilder setResultPartitionIndex(int 
partitionIndex) {
@@ -166,6 +168,11 @@ public class ResultPartitionBuilder {
                return this;
        }
 
+       public ResultPartitionBuilder setSSLEnabled(boolean sslEnabled) {
+               this.sslEnabled = sslEnabled;
+               return this;
+       }
+
        public ResultPartition build() {
                ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
                        partitionManager,
@@ -179,7 +186,8 @@ public class ResultPartitionBuilder {
                        compressionCodec,
                        maxBuffersPerChannel,
                        sortShuffleMinBuffers,
-                       sortShuffleMinParallelism);
+                       sortShuffleMinParallelism,
+                       sslEnabled);
 
                SupplierWithException<BufferPool, IOException> factory = 
bufferPoolFactory.orElseGet(() ->
                        
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, 
partitionType));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 40297f6..4467d66 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -125,7 +125,8 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        "LZ4",
                        Integer.MAX_VALUE,
                        10,
-                       sortShuffleMinParallelism);
+                       sortShuffleMinParallelism,
+                       false);
 
                final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(
                        PartitionDescriptorBuilder
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index 8179e64..051ef56 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -45,6 +45,8 @@ public class SingleInputGateBuilder {
 
        private final IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
 
+       private final int bufferSize = 4096;
+
        private ResultPartitionType partitionType = 
ResultPartitionType.PIPELINED;
 
        private int consumedSubpartitionIndex = 0;
@@ -147,7 +149,8 @@ public class SingleInputGateBuilder {
                        partitionProducerStateProvider,
                        bufferPoolFactory,
                        bufferDecompressor,
-                       segmentProvider);
+                       segmentProvider,
+                       bufferSize);
                if (channelFactory != null) {
                        gate.setInputChannels(IntStream.range(0, 
numberOfChannels)
                                .mapToObj(index -> 
channelFactory.apply(InputChannelBuilder.newBuilder().setChannelIndex(index), 
gate))
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
index eae3e0e..9c058f2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.testutils.serialization.types.ByteArrayType;
 import org.apache.flink.util.TestLogger;
 
@@ -48,7 +49,11 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.is;
@@ -65,6 +70,7 @@ import static org.junit.Assert.assertThat;
  * the first fetched buffer from {@link 
org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not
  * been recycled while fetching the second buffer to trigger next read ahead, 
which breaks the above assumption.
  */
+@RunWith(Parameterized.class)
 public class FileBufferReaderITCase extends TestLogger {
 
        private static final int parallelism = 8;
@@ -79,6 +85,14 @@ public class FileBufferReaderITCase extends TestLogger {
 
        private static final byte[] dataSource = new byte[recordSize];
 
+       @Parameterized.Parameters(name = "SSL Enabled = {0}")
+       public static List<Boolean> paras() {
+               return Arrays.asList(true, false);
+       }
+
+       @Parameterized.Parameter
+       public boolean sslEnabled;
+
        @BeforeClass
        public static void setup() {
                for (int i = 0; i < dataSource.length; i++) {
@@ -89,7 +103,12 @@ public class FileBufferReaderITCase extends TestLogger {
        @Test
        public void testSequentialReading() throws Exception {
                // setup
-               final Configuration configuration = new Configuration();
+               final Configuration configuration;
+               if (sslEnabled) {
+                       configuration = 
SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores("JDK");
+               } else {
+                       configuration = new Configuration();
+               }
                configuration.setString(RestOptions.BIND_PORT, "0");
                
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE,
 "file");
                configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, 
MemorySize.parse("1g"));

Reply via email to