This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4869d3593d5b7fcff36abbe71a3e1a46b7a64998 Author: Zhijiang <[email protected]> AuthorDate: Wed Oct 28 18:05:57 2020 +0800 [FLINK-15981][network] Implement zero-copy file transfer in network based on FileRegion for bounded blocking partitions For file-type bounded blocking partition, the previous way was to allocate two unpooled segments for every subpartition to read data during network transfer. The memory overhead is serious and could cause OOM errors (direct memory). This change uses FileRegion to transfer file data to the network without copying it to memory first (zero-copy transfer). The exception is SSL-enabled setups: Direct transfer from file cache to network cannot work with SSL, because with SSL the data needs to be encrypted before being pushed into the network socket. SSL-enabled setups still behave as before. --- .../io/network/NettyShuffleServiceFactory.java | 3 +- .../runtime/io/network/buffer/BufferRecycler.java | 14 ++ .../io/network/buffer/FileRegionBuffer.java | 240 +++++++++++++++++++++ .../partition/BoundedBlockingSubpartition.java | 27 ++- ...dBlockingSubpartitionDirectTransferReader.java} | 159 +++++++------- .../BoundedBlockingSubpartitionReader.java | 3 +- .../partition/BoundedBlockingSubpartitionType.java | 43 +++- .../runtime/io/network/partition/BoundedData.java | 6 + .../network/partition/BufferReaderWriterUtil.java | 41 +++- .../network/partition/FileChannelBoundedData.java | 5 + .../FileChannelMemoryMappedBoundedData.java | 5 + .../network/partition/MemoryMappedBoundedData.java | 5 + .../network/partition/ResultPartitionFactory.java | 24 ++- .../io/network/partition/ResultSubpartition.java | 6 +- .../partition/SortMergeSubpartitionReader.java | 7 +- .../partition/consumer/LocalInputChannel.java | 5 + .../partition/consumer/SingleInputGate.java | 14 +- .../partition/consumer/SingleInputGateFactory.java | 3 +- .../NettyShuffleEnvironmentConfiguration.java | 4 + ...oundedBlockingSubpartitionAvailabilityTest.java | 1 + 
.../partition/BoundedBlockingSubpartitionTest.java | 41 +++- .../BoundedBlockingSubpartitionWriteReadTest.java | 7 +- .../partition/FileChannelBoundedDataTest.java | 1 + .../network/partition/InputGateFairnessTest.java | 4 +- .../network/partition/ResultPartitionBuilder.java | 10 +- .../partition/ResultPartitionFactoryTest.java | 3 +- .../partition/consumer/SingleInputGateBuilder.java | 5 +- .../flink/test/runtime/FileBufferReaderITCase.java | 21 +- 28 files changed, 586 insertions(+), 121 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index ecd8027..d5fbd7f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -128,7 +128,8 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh config.getCompressionCodec(), config.getMaxBuffersPerChannel(), config.sortShuffleMinBuffers(), - config.sortShuffleMinParallelism()); + config.sortShuffleMinParallelism(), + config.isSSLEnabled()); SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory( taskExecutorResourceId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java index 66b1fa2..c8aced7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java @@ -32,4 +32,18 @@ public interface BufferRecycler { * @param memorySegment The memory segment to be recycled. */ void recycle(MemorySegment memorySegment); + + /** + * The buffer recycler does nothing for recycled segment. 
+ */ + final class DummyBufferRecycler implements BufferRecycler { + + public static final BufferRecycler INSTANCE = new DummyBufferRecycler(); + + private DummyBufferRecycler() {} + + @Override + public void recycle(MemorySegment memorySegment) { + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java new file mode 100644 index 0000000..04b7c15 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.WritableByteChannel; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class represents a chunk of data in a file channel. Its purpose is to be passed + * to the netty code and to be written to the socket via the zero-copy direct transfer + * capabilities of {@link FileChannel#transferTo(long, long, WritableByteChannel)}. + * + * <p>This class implements {@link Buffer} mainly for compatibility with existing usages. + * It can be thought of as a "lazy buffer" that does not hold the data directly, although + * the data can be fetched as a read-only {@code ByteBuffer} when needed, for example + * in local input channels. See {@link #readInto(MemorySegment)} and + * {@link #getNioBufferReadable()}. Because this buffer is read-only, the modification + * methods (and methods that give a writable buffer) throw {@link UnsupportedOperationException}. + * + * <p>This class extends from Netty's {@link DefaultFileRegion}, similar to how the + * {@link NetworkBuffer} extends from Netty's {@link ByteBuf}. That way we can pass both + * of them to Netty in the same way, and Netty will internally treat them appropriately. + */ +public class FileRegionBuffer extends DefaultFileRegion implements Buffer { + + private final FileChannel fileChannel; + + /** The {@link DataType} this buffer represents. */ + private final DataType dataType; + + /** Whether the buffer is compressed or not.
*/ + private final boolean isCompressed; + + public FileRegionBuffer( + FileChannel fileChannel, + long fileChannelPosition, + int bufferSize, + DataType dataType, + boolean isCompressed) { + + super(fileChannel, fileChannelPosition, bufferSize); + + this.fileChannel = checkNotNull(fileChannel); + this.dataType = checkNotNull(dataType); + this.isCompressed = isCompressed; + } + + private int bufferSize() { + // this is guaranteed to not be lossy, because we initialize this from + // an int in the constructor. + return (int) count(); + } + + // ------------------------------------------------------------------------ + // Buffer override methods + // ------------------------------------------------------------------------ + + @Override + public boolean isBuffer() { + return dataType.isBuffer(); + } + + @Override + public MemorySegment getMemorySegment() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public int getMemorySegmentOffset() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public ReadOnlySlicedNetworkBuffer readOnlySlice() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public int getMaxCapacity() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public int getReaderIndex() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException { + throw new UnsupportedOperationException("Method should never be called."); + } + + /** + * This method is only called by tests and by event-deserialization, like + * checkpoint barriers. 
Because such events are not used for bounded intermediate + * results, this method currently executes only in tests. + */ + @Override + public ByteBuffer getNioBufferReadable() { + try { + final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize()); + BufferReaderWriterUtil.readByteBufferFully(fileChannel, buffer, position()); + buffer.flip(); + return buffer; + } catch (IOException e) { + // this is not very pretty, but given that this code runs only in tests + // the exception wrapping here is simpler than updating the method signature + // to declare IOExceptions, as would be necessary for a proper "lazy buffer". + throw new FlinkRuntimeException(e.getMessage(), e); + } + } + + @Override + public ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public ByteBuf asByteBuf() { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public void setSize(int writerIndex) { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public int getSize() { + return bufferSize(); + } + + @Override + public int readableBytes() { + return bufferSize(); + } + + @Override + public void setAllocator(ByteBufAllocator allocator) { + // nothing to do + } + + @Override + public BufferRecycler getRecycler() { + return null; + } + + @Override + public void recycleBuffer() { + // nothing to do + } + + @Override + public boolean isRecycled() { + return false; + } + + @Override + public FileRegionBuffer retainBuffer() { + return (FileRegionBuffer) super.retain(); + } + + @Override + public boolean isCompressed() { + return isCompressed; + } + + @Override + public void setCompressed(boolean isCompressed) { + throw new UnsupportedOperationException("Method should never be called."); + } + + @Override + public DataType getDataType() { + return dataType; + } + + @Override + 
public void setDataType(DataType dataType) { + throw new UnsupportedOperationException("Method should never be called."); + } + + // ------------------------------------------------------------------------ + // File region override methods + // ------------------------------------------------------------------------ + + @Override + public void deallocate() { + // nothing to do + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + public Buffer readInto(MemorySegment segment) throws IOException { + final ByteBuffer buffer = segment.wrap(0, bufferSize()); + BufferReaderWriterUtil.readByteBufferFully(fileChannel, buffer, position()); + + return new NetworkBuffer( + segment, + BufferRecycler.DummyBufferRecycler.INSTANCE, + dataType, + isCompressed, + bufferSize()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 07722ac..6df664f 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -74,7 +74,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { /** All created and not yet released readers. */ @GuardedBy("lock") - private final Set<BoundedBlockingSubpartitionReader> readers; + private final Set<ResultSubpartitionView> readers; + + /** Flag to transfer file via FileRegion way in network stack if partition type is file without SSL enabled. */ + private final boolean useDirectFileTransfer; /** Counter for the number of data buffers (not events!) written. 
*/ private int numDataBuffersWritten; @@ -91,11 +94,13 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { public BoundedBlockingSubpartition( int index, ResultPartition parent, - BoundedData data) { + BoundedData data, + boolean useDirectFileTransfer) { super(index, parent); this.data = checkNotNull(data); + this.useDirectFileTransfer = useDirectFileTransfer; this.readers = new HashSet<>(); } @@ -204,14 +209,20 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { checkState(!isReleased, "data partition already released"); checkState(isFinished, "writing of blocking partition not yet finished"); - final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader( + final ResultSubpartitionView reader; + if (useDirectFileTransfer) { + reader = new BoundedBlockingSubpartitionDirectTransferReader( + this, data.getFilePath(), numDataBuffersWritten, numBuffersAndEventsWritten); + } else { + reader = new BoundedBlockingSubpartitionReader( this, data, numDataBuffersWritten, availability); + } readers.add(reader); return reader; } } - void releaseReaderReference(BoundedBlockingSubpartitionReader reader) throws IOException { + void releaseReaderReference(ResultSubpartitionView reader) throws IOException { onConsumedSubpartition(); synchronized (lock) { @@ -262,10 +273,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { * Data is eagerly spilled (written to disk) and readers directly read from the file. 
*/ public static BoundedBlockingSubpartition createWithFileChannel( - int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException { + int index, ResultPartition parent, File tempFile, int readBufferSize, boolean sslEnabled) throws IOException { final FileChannelBoundedData bd = FileChannelBoundedData.create(tempFile.toPath(), readBufferSize); - return new BoundedBlockingSubpartition(index, parent, bd); + return new BoundedBlockingSubpartition(index, parent, bd, !sslEnabled); } /** @@ -277,7 +288,7 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { int index, ResultPartition parent, File tempFile) throws IOException { final MemoryMappedBoundedData bd = MemoryMappedBoundedData.create(tempFile.toPath()); - return new BoundedBlockingSubpartition(index, parent, bd); + return new BoundedBlockingSubpartition(index, parent, bd, false); } @@ -292,6 +303,6 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { int index, ResultPartition parent, File tempFile) throws IOException { final FileChannelMemoryMappedBoundedData bd = FileChannelMemoryMappedBoundedData.create(tempFile.toPath()); - return new BoundedBlockingSubpartition(index, parent, bd); + return new BoundedBlockingSubpartition(index, parent, bd, false); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java similarity index 51% copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java index 7b3ae45..54ef0de 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java @@ -25,104 +25,88 @@ import org.apache.flink.util.IOUtils; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The reader (read view) of a BoundedBlockingSubpartition. + * The reader (read view) of a BoundedBlockingSubpartition based on + * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}. */ -final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView { +public class BoundedBlockingSubpartitionDirectTransferReader implements ResultSubpartitionView { /** The result subpartition that we read. */ private final BoundedBlockingSubpartition parent; - /** The listener that is notified when there are available buffers for this subpartition view. */ - private final BufferAvailabilityListener availabilityListener; - - /** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */ - @Nullable - private Buffer nextBuffer; - - /** The reader/decoder to the memory mapped region with the data we currently read from. - * Null once the reader empty or disposed.*/ - @Nullable - private BoundedData.Reader dataReader; + /** The reader/decoder to the file region with the data we currently read from. */ + private final BoundedData.Reader dataReader; /** The remaining number of data buffers (not events) in the result. */ - private int dataBufferBacklog; + private int numDataBuffers; - /** Flag whether this reader is released. Atomic, to avoid double release. */ + /** The remaining number of data buffers and events in the result. */ + private int numDataAndEventBuffers; + + /** Flag whether this reader is released. 
*/ private boolean isReleased; private int sequenceNumber; - /** - * Convenience constructor that takes a single buffer. - */ - BoundedBlockingSubpartitionReader( - BoundedBlockingSubpartition parent, - BoundedData data, - int numDataBuffers, - BufferAvailabilityListener availabilityListener) throws IOException { + BoundedBlockingSubpartitionDirectTransferReader( + BoundedBlockingSubpartition parent, + Path filePath, + int numDataBuffers, + int numDataAndEventBuffers) throws IOException { this.parent = checkNotNull(parent); - checkNotNull(data); - this.dataReader = data.createReader(this); - this.nextBuffer = dataReader.nextBuffer(); + checkNotNull(filePath); + this.dataReader = new FileRegionReader(filePath); checkArgument(numDataBuffers >= 0); - this.dataBufferBacklog = numDataBuffers; + this.numDataBuffers = numDataBuffers; - this.availabilityListener = checkNotNull(availabilityListener); + checkArgument(numDataAndEventBuffers >= 0); + this.numDataAndEventBuffers = numDataAndEventBuffers; } @Nullable @Override public BufferAndBacklog getNextBuffer() throws IOException { - final Buffer current = nextBuffer; // copy reference to stack + if (isReleased) { + return null; + } + Buffer current = dataReader.nextBuffer(); if (current == null) { // as per contract, we must return null when the reader is empty, // but also in case the reader is disposed (rather than throwing an exception) return null; } - if (current.isBuffer()) { - dataBufferBacklog--; - } - assert dataReader != null; - nextBuffer = dataReader.nextBuffer(); + updateStatistics(current); - return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog, sequenceNumber++); + // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the next header + Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? 
Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE; + return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, numDataBuffers, sequenceNumber++); } - /** - * This method is actually only meaningful for the {@link BoundedBlockingSubpartitionType#FILE}. - * - * <p>For the other types the {@link #nextBuffer} can not be ever set to null, so it is no need - * to notify available via this method. But the implementation is also compatible with other - * types even though called by mistake. - */ - @Override - public void notifyDataAvailable() { - if (nextBuffer == null) { - assert dataReader != null; - - try { - nextBuffer = dataReader.nextBuffer(); - } catch (IOException ex) { - // this exception wrapper is only for avoiding throwing IOException explicitly - // in relevant interface methods - throw new IllegalStateException("No data available while reading", ex); - } - - // next buffer is null indicates the end of partition - if (nextBuffer != null) { - availabilityListener.notifyDataAvailable(); - } + private void updateStatistics(Buffer buffer) { + if (buffer.isBuffer()) { + numDataBuffers--; } + numDataAndEventBuffers--; + } + + @Override + public boolean isAvailable(int numCreditsAvailable) { + // We simply assume there are no events except EndOfPartitionEvent for batch jobs, + // then it has no essential effect to ignore the judgement of next event buffer. + return numCreditsAvailable > 0 && numDataAndEventBuffers > 0; } @Override @@ -132,10 +116,6 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView IOUtils.closeQuietly(dataReader); - // nulling these fields means the read method and will fail fast - nextBuffer = null; - dataReader = null; - // Notify the parent that this one is released. This allows the parent to // eventually release all resources (when all readers are done and the // parent is disposed).
@@ -148,34 +128,57 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView } @Override - public void resumeConsumption() { - throw new UnsupportedOperationException("Method should never be called."); + public Throwable getFailureCause() { + // we can never throw an error after this was created + return null; } @Override - public boolean isAvailable(int numCreditsAvailable) { - if (numCreditsAvailable > 0) { - return nextBuffer != null; - } - - return nextBuffer != null && !nextBuffer.isBuffer(); + public int unsynchronizedGetNumberOfQueuedBuffers() { + return parent.unsynchronizedGetNumberOfQueuedBuffers(); } @Override - public Throwable getFailureCause() { - // we can never throw an error after this was created - return null; + public void notifyDataAvailable() { + throw new UnsupportedOperationException("Method should never be called."); } @Override - public int unsynchronizedGetNumberOfQueuedBuffers() { - return parent.unsynchronizedGetNumberOfQueuedBuffers(); + public void resumeConsumption() { + throw new UnsupportedOperationException("Method should never be called."); } @Override public String toString() { return String.format("Blocking Subpartition Reader: ID=%s, index=%d", - parent.parent.getPartitionId(), - parent.getSubPartitionIndex()); + parent.parent.getPartitionId(), + parent.getSubPartitionIndex()); + } + + /** + * The reader to read from {@link BoundedBlockingSubpartition} and return the wrapped + * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} based buffer. 
+ */ + static final class FileRegionReader implements BoundedData.Reader { + + private final FileChannel fileChannel; + + private final ByteBuffer headerBuffer; + + FileRegionReader(Path filePath) throws IOException { + this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ); + this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); + } + + @Nullable + @Override + public Buffer nextBuffer() throws IOException { + return BufferReaderWriterUtil.readFileRegionFromByteChannel(fileChannel, headerBuffer); + } + + @Override + public void close() throws IOException { + fileChannel.close(); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java index 7b3ae45..1612580 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -94,8 +94,9 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView assert dataReader != null; nextBuffer = dataReader.nextBuffer(); + Buffer.DataType nextDataType = nextBuffer != null ? 
nextBuffer.getDataType() : Buffer.DataType.NONE; - return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog, sequenceNumber++); + return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, dataBufferBacklog, sequenceNumber++); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java index ef35f23..b74f425 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java @@ -33,8 +33,14 @@ public enum BoundedBlockingSubpartitionType { FILE { @Override - public BoundedBlockingSubpartition create(int index, BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) throws IOException { - return BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, readBufferSize); + public BoundedBlockingSubpartition create( + int index, + ResultPartition parent, + File tempFile, + int readBufferSize, + boolean sslEnabled) throws IOException { + + return BoundedBlockingSubpartition.createWithFileChannel(index, parent, tempFile, readBufferSize, sslEnabled); } }, @@ -46,7 +52,13 @@ public enum BoundedBlockingSubpartitionType { MMAP { @Override - public BoundedBlockingSubpartition create(int index, BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) throws IOException { + public BoundedBlockingSubpartition create( + int index, + ResultPartition parent, + File tempFile, + int readBufferSize, + boolean sslEnabled) throws IOException { + return BoundedBlockingSubpartition.createWithMemoryMappedFile(index, parent, tempFile); } }, @@ -61,7 +73,13 @@ public enum BoundedBlockingSubpartitionType { FILE_MMAP { @Override - public 
BoundedBlockingSubpartition create(int index, BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) throws IOException { + public BoundedBlockingSubpartition create( + int index, + ResultPartition parent, + File tempFile, + int readBufferSize, + boolean sslEnabled) throws IOException { + return BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(index, parent, tempFile); } }, @@ -74,8 +92,14 @@ public enum BoundedBlockingSubpartitionType { AUTO { @Override - public BoundedBlockingSubpartition create(int index, BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) throws IOException { - return ResultPartitionFactory.getBoundedBlockingType().create(index, parent, tempFile, readBufferSize); + public BoundedBlockingSubpartition create( + int index, + ResultPartition parent, + File tempFile, + int readBufferSize, + boolean sslEnabled) throws IOException { + + return ResultPartitionFactory.getBoundedBlockingType().create(index, parent, tempFile, readBufferSize, sslEnabled); } }; @@ -84,5 +108,10 @@ public enum BoundedBlockingSubpartitionType { /** * Creates BoundedBlockingSubpartition of this type. 
*/ - public abstract BoundedBlockingSubpartition create(int index, BoundedBlockingResultPartition parent, File tempFile, int readBufferSize) throws IOException; + public abstract BoundedBlockingSubpartition create( + int index, + ResultPartition parent, + File tempFile, + int readBufferSize, + boolean sslEnabled) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java index a8681cc..cad3c66 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; /** * BoundedData is the data store in a single bounded blocking subpartition. @@ -75,6 +76,11 @@ interface BoundedData extends Closeable { */ long getSize(); + /** + * The file path for the persisted {@link BoundedBlockingSubpartition}. 
+ */ + Path getFilePath(); + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java index 6e60bea..d13621f 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; @@ -40,7 +41,7 @@ import java.nio.channels.FileChannel; * <p>The encoding is the same across FileChannel and ByteBuffer, so this class can * write to a file and read from the byte buffer that results from mapping this file to memory. */ -final class BufferReaderWriterUtil { +public final class BufferReaderWriterUtil { static final int HEADER_LENGTH = 8; @@ -166,6 +167,28 @@ final class BufferReaderWriterUtil { } @Nullable + static Buffer readFileRegionFromByteChannel(FileChannel channel, ByteBuffer headerBuffer) throws IOException { + headerBuffer.clear(); + if (!tryReadByteBuffer(channel, headerBuffer)) { + return null; + } + headerBuffer.flip(); + + final boolean isEvent = headerBuffer.getShort() == HEADER_VALUE_IS_EVENT; + final Buffer.DataType dataType = isEvent ? 
Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER; + final boolean isCompressed = headerBuffer.getShort() == BUFFER_IS_COMPRESSED; + final int size = headerBuffer.getInt(); + + // the file region does not advance position. it must not, because it gets written + // interleaved with these calls, which would completely mess up the reading. + // so we advance the positions always and only here. + final long position = channel.position(); + channel.position(position + size); + + return new FileRegionBuffer(channel, position, size, dataType, isCompressed); + } + + @Nullable static Buffer readFromByteChannel( FileChannel channel, ByteBuffer headerBuffer, @@ -236,6 +259,22 @@ final class BufferReaderWriterUtil { while (b.hasRemaining()); } + public static void readByteBufferFully( + final FileChannel channel, + final ByteBuffer b, + long position) throws IOException { + + // the post-checked loop here gets away with one less check in the normal case + do { + final int numRead = channel.read(b, position); + if (numRead == -1) { + throwPrematureEndOfFile(); + } + position += numRead; + } + while (b.hasRemaining()); + } + static void writeBuffer(FileChannel channel, ByteBuffer buffer) throws IOException { while (buffer.hasRemaining()) { channel.write(buffer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java index 690ad7d..ba35e52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java @@ -88,6 +88,11 @@ final class FileChannelBoundedData implements BoundedData { } @Override + public Path getFilePath() { + return filePath; + } + + @Override public void close() throws IOException { IOUtils.closeQuietly(fileChannel); 
Files.delete(filePath); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java index f22efd0..eededad 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java @@ -168,6 +168,11 @@ final class FileChannelMemoryMappedBoundedData implements BoundedData { return pos; } + @Override + public Path getFilePath() { + return filePath; + } + private void mapRegionAndStartNext() throws IOException { final ByteBuffer region = fileChannel.map(MapMode.READ_ONLY, startOfCurrentRegion, pos - startOfCurrentRegion); region.order(ByteOrder.nativeOrder()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java index e8718f5..0a464f5 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java @@ -177,6 +177,11 @@ final class MemoryMappedBoundedData implements BoundedData { return size; } + @Override + public Path getFilePath() { + return filePath; + } + private void rollOverToNextBuffer() throws IOException { if (currentBuffer != null) { // we need to remember the original buffers, not any slices. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index c83eaff..ba1fc1e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -68,6 +68,8 @@ public class ResultPartitionFactory { private final int sortShuffleMinParallelism; + private final boolean sslEnabled; + public ResultPartitionFactory( ResultPartitionManager partitionManager, FileChannelManager channelManager, @@ -80,7 +82,8 @@ public class ResultPartitionFactory { String compressionCodec, int maxBuffersPerChannel, int sortShuffleMinBuffers, - int sortShuffleMinParallelism) { + int sortShuffleMinParallelism, + boolean sslEnabled) { this.partitionManager = partitionManager; this.channelManager = channelManager; @@ -94,6 +97,7 @@ public class ResultPartitionFactory { this.maxBuffersPerChannel = maxBuffersPerChannel; this.sortShuffleMinBuffers = sortShuffleMinBuffers; this.sortShuffleMinParallelism = sortShuffleMinParallelism; + this.sslEnabled = sslEnabled; } public ResultPartition create( @@ -179,12 +183,13 @@ public class ResultPartitionFactory { bufferCompressor, bufferPoolFactory); - initializeBoundedBlockingPartitions( - subpartitions, - blockingPartition, - blockingSubpartitionType, - networkBufferSize, - channelManager); + initializeBoundedBlockingPartitions( + subpartitions, + blockingPartition, + blockingSubpartitionType, + networkBufferSize, + channelManager, + sslEnabled); partition = blockingPartition; } @@ -203,12 +208,13 @@ public class ResultPartitionFactory { BoundedBlockingResultPartition parent, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBufferSize, - FileChannelManager channelManager) { + FileChannelManager channelManager, + 
boolean sslEnabled) { int i = 0; try { for (i = 0; i < subpartitions.length; i++) { final File spillFile = channelManager.createChannel().getPathFile(); - subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize); + subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize, sslEnabled); } } catch (IOException e) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index 9299c86..2d89b81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -23,8 +23,6 @@ import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import javax.annotation.Nullable; - import java.io.IOException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -166,13 +164,13 @@ public abstract class ResultSubpartition { public static BufferAndBacklog fromBufferAndLookahead( Buffer current, - @Nullable Buffer lookahead, + Buffer.DataType nextDataType, int backlog, int sequenceNumber) { return new BufferAndBacklog( current, backlog, - lookahead != null ? 
lookahead.getDataType() : Buffer.DataType.NONE, + nextDataType, sequenceNumber); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java index f6c55db..9a6c68b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java @@ -106,8 +106,13 @@ public class SortMergeSubpartitionReader implements ResultSubpartitionView, Buff --dataBufferBacklog; } + final Buffer lookAhead = buffersRead.peek(); + return BufferAndBacklog.fromBufferAndLookahead( - buffer, buffersRead.peek(), dataBufferBacklog, sequenceNumber++); + buffer, + lookAhead == null ? Buffer.DataType.NONE : lookAhead.getDataType(), + dataBufferBacklog, + sequenceNumber++); } void readBuffers() throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 47512be..45d17f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventPublisher; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ChannelStateHolder; 
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; @@ -226,6 +227,10 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit Buffer buffer = next.buffer(); + if (buffer instanceof FileRegionBuffer) { + buffer = ((FileRegionBuffer) buffer).readInto(inputGate.getUnpooledSegment()); + } + numBytesIn.inc(buffer.getSize()); numBuffersIn.inc(); if (buffer.getDataType().hasPriority()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 35049ef..fa4699c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemorySegmentProvider; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -189,6 +191,9 @@ public class SingleInputGate extends IndexedInputGate { private final MemorySegmentProvider memorySegmentProvider; + /** The unpooled segment used by the local input channel to read data out of the file region of a bounded blocking partition.
*/ + private final MemorySegment unpooledSegment; + public SingleInputGate( String owningTaskName, int gateIndex, @@ -199,7 +204,8 @@ public class SingleInputGate extends IndexedInputGate { PartitionProducerStateProvider partitionProducerStateProvider, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @Nullable BufferDecompressor bufferDecompressor, - MemorySegmentProvider memorySegmentProvider) { + MemorySegmentProvider memorySegmentProvider, + int segmentSize) { this.owningTaskName = checkNotNull(owningTaskName); Preconditions.checkArgument(0 <= gateIndex, "The gate index must be positive."); @@ -228,6 +234,8 @@ public class SingleInputGate extends IndexedInputGate { this.memorySegmentProvider = checkNotNull(memorySegmentProvider); this.closeFuture = new CompletableFuture<>(); + + this.unpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize); } protected PrioritizedDeque<InputChannel> getInputChannelsWithData() { @@ -510,6 +518,10 @@ public class SingleInputGate extends IndexedInputGate { return retriggerLocalRequestTimer; } + MemorySegment getUnpooledSegment() { + return unpooledSegment; + } + @Override public void close() throws IOException { boolean released = false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index 74e5bf2..b04db34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -134,7 +134,8 @@ public class SingleInputGateFactory { partitionProducerStateProvider, bufferPoolFactory, bufferDecompressor, - networkBufferPool); + networkBufferPool, + networkBufferSize); createInputChannels(owningTaskName, igdd, inputGate, metrics); return 
inputGate; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java index 8f32660..9d75783 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java @@ -172,6 +172,10 @@ public class NettyShuffleEnvironmentConfiguration { return blockingShuffleCompressionEnabled; } + public boolean isSSLEnabled() { + return nettyConfig != null && nettyConfig.getSSLEnabled(); + } + public String getCompressionCodec() { return compressionCodec; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java index fd1188c..870c096 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java @@ -125,6 +125,7 @@ public class BoundedBlockingSubpartitionAvailabilityTest { BoundedBlockingResultPartition parent = (BoundedBlockingResultPartition) new ResultPartitionBuilder() .setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT) .setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE) + .setSSLEnabled(true) .setFileChannelManager(new FileChannelManagerImpl(new String[] { TMP_FOLDER.newFolder().toString() }, "data")) .setNetworkBufferSize(BUFFER_SIZE) .build(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java index ce4083f..2e1c394 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.junit.AfterClass; @@ -28,11 +29,17 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -46,12 +53,25 @@ import static org.junit.Assert.fail; * <p>Full read / write tests for the partition and the reader are in * {@link BoundedBlockingSubpartitionWriteReadTest}. 
*/ +@RunWith(Parameterized.class) public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory(); private static FileChannelManager fileChannelManager; + private final BoundedBlockingSubpartitionType type; + + private final boolean sslEnabled; + + @Parameterized.Parameters(name = "type = {0}, sslEnabled = {1}") + public static Collection<Object[]> parameters() { + return Arrays.stream(BoundedBlockingSubpartitionType.values()) + .map((type) -> new Object[][] { { type, true }, { type, false } }) + .flatMap(Arrays::stream) + .collect(Collectors.toList()); + } + @ClassRule public static final TemporaryFolder TMP_DIR = new TemporaryFolder(); @@ -65,6 +85,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { fileChannelManager.close(); } + public BoundedBlockingSubpartitionTest(BoundedBlockingSubpartitionType type, boolean sslEnabled) { + this.type = type; + this.sslEnabled = sslEnabled; + } + // ------------------------------------------------------------------------ @Test @@ -97,8 +122,7 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { @Override ResultSubpartition createSubpartition() throws Exception { final ResultPartition resultPartition = createPartition(ResultPartitionType.BLOCKING, fileChannelManager); - return BoundedBlockingSubpartition.createWithMemoryMappedFile( - 0, resultPartition, new File(TMP_DIR.newFolder(), "subpartition")); + return type.create(0, resultPartition, new File(TMP_DIR.newFolder(), "subpartition"), BufferBuilderTestUtils.BUFFER_SIZE, sslEnabled); } @Override @@ -108,7 +132,8 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { return new BoundedBlockingSubpartition( 0, resultPartition, - new FailingBoundedData()); + new FailingBoundedData(), + !sslEnabled && type == BoundedBlockingSubpartitionType.FILE); } // 
------------------------------------------------------------------------ @@ -136,6 +161,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Override + public Path getFilePath() { + throw new UnsupportedOperationException(); + } + + @Override public void close() {} } @@ -166,6 +196,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Override + public Path getFilePath() { + throw new UnsupportedOperationException(); + } + + @Override public void close() {} } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java index 94b7543..6f84254 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java @@ -75,6 +75,8 @@ public class BoundedBlockingSubpartitionWriteReadTest { private final boolean compressionEnabled; + private final boolean sslEnabled; + @Parameters(name = "type = {0}, compressionEnabled = {1}") public static Collection<Object[]> parameters() { return Arrays.stream(BoundedBlockingSubpartitionType.values()) @@ -86,6 +88,8 @@ public class BoundedBlockingSubpartitionWriteReadTest { public BoundedBlockingSubpartitionWriteReadTest(BoundedBlockingSubpartitionType type, boolean compressionEnabled) { this.type = type; this.compressionEnabled = compressionEnabled; + // we can also make use of the same flag since they are completely irrelevant + this.sslEnabled = compressionEnabled; } // ------------------------------------------------------------------------ @@ -235,7 +239,8 @@ public class BoundedBlockingSubpartitionWriteReadTest { compressionEnabled, BUFFER_SIZE), new File(TMP_FOLDER.newFolder(), 
"partitiondata"), - BUFFER_SIZE); + BUFFER_SIZE, + sslEnabled); } private static LongReader[] createSubpartitionLongReaders( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java index 559d13d..aa4f582 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java @@ -157,6 +157,7 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase { .setResultPartitionType(ResultPartitionType.BLOCKING) .setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE) .setFileChannelManager(fileChannelManager) + .setSSLEnabled(true) .build(); return resultPartition.subpartitions[0]; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java index b6fadf8..2e99a65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java @@ -327,6 +327,7 @@ public class InputGateFairnessTest { // ------------------------------------------------------------------------ private static class FairnessVerifyingInputGate extends SingleInputGate { + private static final int BUFFER_SIZE = 32 * 1024; private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = NoOpBufferPool::new; @@ -351,7 +352,8 @@ public class InputGateFairnessTest { SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, STUB_BUFFER_POOL_FACTORY, null, - new UnpooledMemorySegmentProvider(32 * 1024)); + new 
UnpooledMemorySegmentProvider(BUFFER_SIZE), + BUFFER_SIZE); channelsWithData = getInputChannelsWithData(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java index 67e7adb..a356f11 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java @@ -68,6 +68,8 @@ public class ResultPartitionBuilder { private boolean blockingShuffleCompressionEnabled = false; + private boolean sslEnabled = false; + private String compressionCodec = "LZ4"; public ResultPartitionBuilder setResultPartitionIndex(int partitionIndex) { @@ -166,6 +168,11 @@ public class ResultPartitionBuilder { return this; } + public ResultPartitionBuilder setSSLEnabled(boolean sslEnabled) { + this.sslEnabled = sslEnabled; + return this; + } + public ResultPartition build() { ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( partitionManager, @@ -179,7 +186,8 @@ public class ResultPartitionBuilder { compressionCodec, maxBuffersPerChannel, sortShuffleMinBuffers, - sortShuffleMinParallelism); + sortShuffleMinParallelism, + sslEnabled); SupplierWithException<BufferPool, IOException> factory = bufferPoolFactory.orElseGet(() -> resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, partitionType)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java index 40297f6..4467d66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java @@ -125,7 +125,8 @@ public class ResultPartitionFactoryTest extends TestLogger { "LZ4", Integer.MAX_VALUE, 10, - sortShuffleMinParallelism); + sortShuffleMinParallelism, + false); final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor( PartitionDescriptorBuilder diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index 8179e64..051ef56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -45,6 +45,8 @@ public class SingleInputGateBuilder { private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); + private final int bufferSize = 4096; + private ResultPartitionType partitionType = ResultPartitionType.PIPELINED; private int consumedSubpartitionIndex = 0; @@ -147,7 +149,8 @@ public class SingleInputGateBuilder { partitionProducerStateProvider, bufferPoolFactory, bufferDecompressor, - segmentProvider); + segmentProvider, + bufferSize); if (channelFactory != null) { gate.setInputChannels(IntStream.range(0, numberOfChannels) .mapToObj(index -> channelFactory.apply(InputChannelBuilder.newBuilder().setChannelIndex(index), gate)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java index eae3e0e..9c058f2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java @@ -40,6 +40,7 
@@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.net.SSLUtilsTest; import org.apache.flink.testutils.serialization.types.ByteArrayType; import org.apache.flink.util.TestLogger; @@ -48,7 +49,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import static org.hamcrest.Matchers.is; @@ -65,6 +70,7 @@ import static org.junit.Assert.assertThat; * the first fetched buffer from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not * been recycled while fetching the second buffer to trigger next read ahead, which breaks the above assumption. 
*/ +@RunWith(Parameterized.class) public class FileBufferReaderITCase extends TestLogger { private static final int parallelism = 8; @@ -79,6 +85,14 @@ public class FileBufferReaderITCase extends TestLogger { private static final byte[] dataSource = new byte[recordSize]; + @Parameterized.Parameters(name = "SSL Enabled = {0}") + public static List<Boolean> paras() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter + public boolean sslEnabled; + @BeforeClass public static void setup() { for (int i = 0; i < dataSource.length; i++) { @@ -89,7 +103,12 @@ public class FileBufferReaderITCase extends TestLogger { @Test public void testSequentialReading() throws Exception { // setup - final Configuration configuration = new Configuration(); + final Configuration configuration; + if (sslEnabled) { + configuration = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores("JDK"); + } else { + configuration = new Configuration(); + } configuration.setString(RestOptions.BIND_PORT, "0"); configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file"); configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
