This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2c8ee78f714af4995f618c768661f5b638cd3025 Author: Zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Wed Jul 10 14:31:30 2019 +0800 [FLINK-13100][network] Fix the bug of throwing IOException in FileChannelBoundedData#nextBuffer The implementation of FileChannelBoundedData#nextBuffer assumed that a buffer is always available (throwing an IOException otherwise), i.e. that a pool of two buffers is always enough: before the third buffer is used, the first one is expected to have been recycled already. But in the case of a pending flush operation (when the socket channel is not writable while the netty thread is calling writeAndFlush), the first buffer fetched from FileChannelBoundedData has not yet been recycled when the second buffer is fetched to trigger the next read-ahead, which breaks the above assumption. To fix this problem, read-ahead is no longer assumed to always be possible for FileChannelBoundedData: if there is no available buffer to read the next data into, nextBuffer returns null, and the read-ahead is retriggered via ResultSubpartitionView#notifyDataAvailable when a buffer is recycled. 
--- .../partition/BoundedBlockingSubpartition.java | 3 +- .../BoundedBlockingSubpartitionReader.java | 43 ++++- .../runtime/io/network/partition/BoundedData.java | 12 +- .../network/partition/FileChannelBoundedData.java | 36 ++-- .../FileChannelMemoryMappedBoundedData.java | 2 +- .../network/partition/MemoryMappedBoundedData.java | 2 +- ...oundedBlockingSubpartitionAvailabilityTest.java | 158 +++++++++++++++++ .../partition/BoundedBlockingSubpartitionTest.java | 38 +++- .../io/network/partition/BoundedDataTestBase.java | 2 +- ...Test.java => CountingAvailabilityListener.java} | 21 +-- .../partition/FileChannelBoundedDataTest.java | 172 ++++++++++++++++++ .../flink/test/runtime/FileBufferReaderITCase.java | 196 +++++++++++++++++++++ 12 files changed, 638 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 7a74872..785e44c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -198,9 +198,8 @@ final class BoundedBlockingSubpartition extends ResultSubpartition { availability.notifyDataAvailable(); - final BoundedData.Reader dataReader = data.createReader(); final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader( - this, dataReader, numDataBuffersWritten); + this, data, numDataBuffersWritten, availability); readers.add(reader); return reader; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java index f7536b9..63e5e22 100755 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java @@ -37,6 +37,9 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView /** The result subpartition that we read. */ private final BoundedBlockingSubpartition parent; + /** The listener that is notified when there are available buffers for this subpartition view. */ + private final BufferAvailabilityListener availabilityListener; + /** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */ @Nullable private Buffer nextBuffer; @@ -57,16 +60,20 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView */ BoundedBlockingSubpartitionReader( BoundedBlockingSubpartition parent, - BoundedData.Reader dataReader, - int numDataBuffers) throws IOException { - - checkArgument(numDataBuffers >= 0); + BoundedData data, + int numDataBuffers, + BufferAvailabilityListener availabilityListener) throws IOException { this.parent = checkNotNull(parent); - this.dataReader = checkNotNull(dataReader); - this.dataBufferBacklog = numDataBuffers; + checkNotNull(data); + this.dataReader = data.createReader(this); this.nextBuffer = dataReader.nextBuffer(); + + checkArgument(numDataBuffers >= 0); + this.dataBufferBacklog = numDataBuffers; + + this.availabilityListener = checkNotNull(availabilityListener); } @Nullable @@ -89,9 +96,31 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog); } + /** + * This method is actually only meaningful for the {@link BoundedBlockingSubpartitionType#FILE}. + * + * <p>For the other types the {@link #nextBuffer} can not be ever set to null, so it is no need + * to notify available via this method. 
But the implementation is also compatible with other + * types even though called by mistake. + */ @Override public void notifyDataAvailable() { - throw new IllegalStateException("No data should become available on a blocking partition during consumption."); + if (nextBuffer == null) { + assert dataReader != null; + + try { + nextBuffer = dataReader.nextBuffer(); + } catch (IOException ex) { + // this exception wrapper is only for avoiding throwing IOException explicitly + // in relevant interface methods + throw new IllegalStateException("No data available while reading", ex); + } + + // next buffer is null indicates the end of partition + if (nextBuffer != null) { + availabilityListener.notifyDataAvailable(); + } + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java index 4d58cf8..a8681cc 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java @@ -34,7 +34,7 @@ import java.io.IOException; * through the {@link #writeBuffer(Buffer)} method. * The write phase is ended by calling {@link #finishWrite()}. * After the write phase is finished, the data can be read multiple times through readers created - * via {@link #createReader()}. + * via {@link #createReader(ResultSubpartitionView)}. * Finally, the BoundedData is dropped / deleted by calling {@link #close()}. * * <h2>Thread Safety and Concurrency</h2> @@ -60,7 +60,15 @@ interface BoundedData extends Closeable { * Gets a reader for the bounded data. Multiple readers may be created. * This call only succeeds once the write phase was finished via {@link #finishWrite()}. 
*/ - BoundedData.Reader createReader() throws IOException; + BoundedData.Reader createReader(ResultSubpartitionView subpartitionView) throws IOException; + + /** + * Gets a reader for the bounded data. Multiple readers may be created. + * This call only succeeds once the write phase was finished via {@link #finishWrite()}. + */ + default BoundedData.Reader createReader() throws IOException { + return createReader(new NoOpResultSubpartitionView()); + } /** * Gets the number of bytes of all written data (including the metadata in the buffer headers). diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java index 50dca60..690ad7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java @@ -75,11 +75,11 @@ final class FileChannelBoundedData implements BoundedData { } @Override - public Reader createReader() throws IOException { + public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException { checkState(!fileChannel.isOpen()); final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ); - return new FileBufferReader(fc, memorySegmentSize); + return new FileBufferReader(fc, memorySegmentSize, subpartitionView); } @Override @@ -117,7 +117,12 @@ final class FileChannelBoundedData implements BoundedData { private final ArrayDeque<MemorySegment> buffers; - FileBufferReader(FileChannel fileChannel, int bufferSize) { + private final ResultSubpartitionView subpartitionView; + + /** The tag indicates whether we have read the end of this file. 
*/ + private boolean isFinished; + + FileBufferReader(FileChannel fileChannel, int bufferSize, ResultSubpartitionView subpartitionView) { this.fileChannel = checkNotNull(fileChannel); this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer(); this.buffers = new ArrayDeque<>(NUM_BUFFERS); @@ -125,26 +130,25 @@ final class FileChannelBoundedData implements BoundedData { for (int i = 0; i < NUM_BUFFERS; i++) { buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null)); } + + this.subpartitionView = checkNotNull(subpartitionView); } @Nullable @Override public Buffer nextBuffer() throws IOException { final MemorySegment memory = buffers.pollFirst(); + if (memory == null) { + return null; + } - if (memory != null) { - final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this); - if (next != null) { - return next; - } - else { - recycle(memory); - return null; - } + final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this); + if (next == null) { + isFinished = true; + recycle(memory); } - throw new IOException("Bug in BoundedBlockingSubpartition with FILE data: " + - "Requesting new buffer before previous buffer returned."); + return next; } @Override @@ -155,6 +159,10 @@ final class FileChannelBoundedData implements BoundedData { @Override public void recycle(MemorySegment memorySegment) { buffers.addLast(memorySegment); + + if (!isFinished) { + subpartitionView.notifyDataAvailable(); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java index 4a71fcd..f22efd0 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java @@ -123,7 +123,7 @@ final class FileChannelMemoryMappedBoundedData implements BoundedData { } @Override - public BoundedData.Reader createReader() { + public BoundedData.Reader createReader(ResultSubpartitionView ignored) { checkState(!fileChannel.isOpen()); final List<ByteBuffer> buffers = memoryMappedRegions.stream() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java index 502c64c..e8718f5 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java @@ -113,7 +113,7 @@ final class MemoryMappedBoundedData implements BoundedData { } @Override - public BufferSlicer createReader() { + public BufferSlicer createReader(ResultSubpartitionView ignored) { assert currentBuffer == null; final List<ByteBuffer> buffers = fullBuffers.stream() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java new file mode 100644 index 0000000..915cf43 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the availability handling of the BoundedBlockingSubpartitions with not constant + * availability. 
+ */ +public class BoundedBlockingSubpartitionAvailabilityTest { + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); + + private static final int BUFFER_SIZE = 32 * 1024; + + @Test + public void testInitiallyAvailable() throws Exception { + final ResultSubpartition subpartition = createPartitionWithData(10); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + + // test + final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener); + + // assert + assertEquals(1, listener.numNotifications); + + // cleanup + subpartitionView.releaseAllResources(); + subpartition.release(); + } + + @Test + public void testUnavailableWhenBuffersExhausted() throws Exception { + // setup + final BoundedBlockingSubpartition subpartition = createPartitionWithData(100_000); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + final ResultSubpartitionView reader = subpartition.createReadView(listener); + + // test + final List<BufferAndBacklog> data = drainAvailableData(reader); + + // assert + assertFalse(reader.isAvailable()); + assertFalse(data.get(data.size() - 1).isMoreAvailable()); + + // cleanup + reader.releaseAllResources(); + subpartition.release(); + } + + @Test + public void testAvailabilityNotificationWhenBuffersReturn() throws Exception { + // setup + final ResultSubpartition subpartition = createPartitionWithData(100_000); + final CountingAvailabilityListener listener = new CountingAvailabilityListener(); + final ResultSubpartitionView reader = subpartition.createReadView(listener); + + // test + final List<ResultSubpartition.BufferAndBacklog> data = drainAvailableData(reader); + data.get(0).buffer().recycleBuffer(); + data.get(1).buffer().recycleBuffer(); + + // assert + assertTrue(reader.isAvailable()); + assertEquals(2, listener.numNotifications); // one initial, one for new availability + + // cleanup + reader.releaseAllResources(); + 
subpartition.release(); + } + + @Test + public void testNotAvailableWhenEmpty() throws Exception { + // setup + final ResultSubpartition subpartition = createPartitionWithData(100_000); + final ResultSubpartitionView reader = subpartition.createReadView(new NoOpBufferAvailablityListener()); + + // test + drainAllData(reader); + + // assert + assertFalse(reader.isAvailable()); + + // cleanup + reader.releaseAllResources(); + subpartition.release(); + } + + // ------------------------------------------------------------------------ + + private static BoundedBlockingSubpartition createPartitionWithData(int numberOfBuffers) throws IOException { + ResultPartition parent = PartitionTestUtils.createPartition(); + + BoundedBlockingSubpartition partition = BoundedBlockingSubpartition.createWithFileChannel( + 0, parent, new File(TMP_FOLDER.newFolder(), "data"), BUFFER_SIZE); + + writeBuffers(partition, numberOfBuffers); + partition.finish(); + + return partition; + } + + private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + partition.add(BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE)); + } + } + + private static List<BufferAndBacklog> drainAvailableData(ResultSubpartitionView reader) throws Exception { + final ArrayList<BufferAndBacklog> list = new ArrayList<>(); + + BufferAndBacklog bab; + while ((bab = reader.getNextBuffer()) != null) { + list.add(bab); + } + + return list; + } + + private static void drainAllData(ResultSubpartitionView reader) throws Exception { + BufferAndBacklog bab; + while ((bab = reader.getNextBuffer()) != null) { + bab.buffer().recycleBuffer(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java index 9bd0c4b..ce4083f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java @@ -35,6 +35,7 @@ import java.io.File; import java.io.IOException; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -80,10 +81,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Test - public void testClosingClosesBoundedData() throws Exception { + public void testCloseBoundedData() throws Exception { final TestingBoundedDataReader reader = new TestingBoundedDataReader(); + final TestingBoundedData data = new TestingBoundedData(reader); final BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader( - (BoundedBlockingSubpartition) createSubpartition(), reader, 10); + (BoundedBlockingSubpartition) createSubpartition(), data, 10, new NoOpBufferAvailablityListener()); bbspr.releaseAllResources(); @@ -124,7 +126,7 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { } @Override - public Reader createReader() throws IOException { + public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException { throw new UnsupportedOperationException(); } @@ -137,6 +139,36 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase { public void close() {} } + private static class TestingBoundedData implements BoundedData { + + private BoundedData.Reader reader; + + private TestingBoundedData(BoundedData.Reader reader) { + this.reader = checkNotNull(reader); + } + + @Override + public void writeBuffer(Buffer buffer) throws IOException { + } + + @Override + public void finishWrite() throws IOException { + } + + @Override + public Reader 
createReader(ResultSubpartitionView ignored) throws IOException { + return reader; + } + + @Override + public long getSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() {} + } + private static class TestingBoundedDataReader implements BoundedData.Reader { boolean closed; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java index c71b9df..365e93e 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java @@ -59,7 +59,7 @@ public abstract class BoundedDataTestBase { protected abstract BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException; - private BoundedData createBoundedData() throws IOException { + protected BoundedData createBoundedData() throws IOException { return createBoundedData(createTempPath()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java old mode 100755 new mode 100644 similarity index 59% copy from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java copy to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java index cc499f4..4e27ee0 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java @@ -18,26 +18,15 @@ package org.apache.flink.runtime.io.network.partition; -import java.io.IOException; -import 
java.nio.file.Path; - /** - * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. + * A simple BufferAvailabilityListener that counts the number of notifications. */ -public class FileChannelBoundedDataTest extends BoundedDataTestBase { - - @Override - protected boolean isRegionBased() { - return false; - } +final class CountingAvailabilityListener implements BufferAvailabilityListener { - @Override - protected BoundedData createBoundedData(Path tempFilePath) throws IOException { - return FileChannelBoundedData.create(tempFilePath, BUFFER_SIZE); - } + int numNotifications; @Override - protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { - throw new UnsupportedOperationException(); + public void notifyDataAvailable() { + numNotifications++; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java index cc499f4..1ca2bc8 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java @@ -18,14 +18,45 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.disk.FileChannelManager; +import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; +import org.apache.flink.runtime.util.EnvironmentInformation; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + import java.io.IOException; import java.nio.file.Path; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer; +import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + /** * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel. */ public class FileChannelBoundedDataTest extends BoundedDataTestBase { + private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory(); + + private static FileChannelManager fileChannelManager; + + @BeforeClass + public static void setUp() { + fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing"); + } + + @AfterClass + public static void shutdown() throws Exception { + fileChannelManager.close(); + } + @Override protected boolean isRegionBased() { return false; @@ -40,4 +71,145 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase { protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException { throw new UnsupportedOperationException(); } + + @Test + public void testReadNextBuffer() throws Exception { + final int numberOfBuffers = 3; + try (final BoundedData data = createBoundedData()) { + writeBuffers(data, numberOfBuffers); + + final BoundedData.Reader reader = data.createReader(); + final Buffer buffer1 = reader.nextBuffer(); + final Buffer buffer2 = reader.nextBuffer(); + + assertNotNull(buffer1); + assertNotNull(buffer2); + // there are only two available memory segments for reading data + assertNull(reader.nextBuffer()); + + // cleanup + buffer1.recycleBuffer(); + buffer2.recycleBuffer(); + } + } + + @Test + public void testRecycleBufferForNotifyingSubpartitionView() throws Exception { + final int numberOfBuffers = 2; + try (final BoundedData data = createBoundedData()) { + writeBuffers(data, numberOfBuffers); + + final VerifyNotificationResultSubpartitionView subpartitionView = new 
VerifyNotificationResultSubpartitionView(); + final BoundedData.Reader reader = data.createReader(subpartitionView); + final Buffer buffer1 = reader.nextBuffer(); + final Buffer buffer2 = reader.nextBuffer(); + assertNotNull(buffer1); + assertNotNull(buffer2); + + assertFalse(subpartitionView.isAvailable); + buffer1.recycleBuffer(); + // the view is notified while recycling buffer if reader has not tagged finished + assertTrue(subpartitionView.isAvailable); + + subpartitionView.resetAvailable(); + assertFalse(subpartitionView.isAvailable); + + // the next buffer is null to make reader tag finished + assertNull(reader.nextBuffer()); + + buffer2.recycleBuffer(); + // the view is not notified while recycling buffer if reader already finished + assertFalse(subpartitionView.isAvailable); + } + } + + @Test + public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception { + final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition(); + final int numberOfBuffers = 2; + writeBuffers(subpartition, numberOfBuffers); + + final VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener(); + final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener); + // the notification is triggered while creating view + assertTrue(listener.isAvailable); + + listener.resetAvailable(); + assertFalse(listener.isAvailable); + + final BufferAndBacklog buffer1 = subpartitionView.getNextBuffer(); + final BufferAndBacklog buffer2 = subpartitionView.getNextBuffer(); + assertNotNull(buffer1); + assertNotNull(buffer2); + + // the next buffer is null in view because FileBufferReader has no available buffers for reading ahead + assertFalse(subpartitionView.isAvailable()); + // recycle a buffer to trigger notification of data available + buffer1.buffer().recycleBuffer(); + assertTrue(listener.isAvailable); + + // cleanup + buffer2.buffer().recycleBuffer(); + 
subpartitionView.releaseAllResources(); + subpartition.release(); + } + + private static ResultSubpartition createFileBoundedBlockingSubpartition() { + final ResultPartition resultPartition = new ResultPartitionBuilder() + .setNetworkBufferSize(BUFFER_SIZE) + .setResultPartitionType(ResultPartitionType.BLOCKING) + .setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE) + .setFileChannelManager(fileChannelManager) + .build(); + return resultPartition.subpartitions[0]; + } + + private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + data.writeBuffer(buildSomeBuffer(BUFFER_SIZE)); + } + data.finishWrite(); + } + + private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException { + for (int i = 0; i < numberOfBuffers; i++) { + subpartition.add(createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE)); + } + subpartition.finish(); + } + + /** + * This subpartition view is used for verifying the {@link ResultSubpartitionView#notifyDataAvailable()} + * was ever called before. + */ + private static class VerifyNotificationResultSubpartitionView extends NoOpResultSubpartitionView { + + private boolean isAvailable; + + @Override + public void notifyDataAvailable() { + isAvailable = true; + } + + private void resetAvailable() { + isAvailable = false; + } + } + + /** + * This listener is used for verifying the notification logic in {@link ResultSubpartitionView#notifyDataAvailable()}. 
+ */ + private static class VerifyNotificationBufferAvailabilityListener implements BufferAvailabilityListener { + + private boolean isAvailable; + + @Override + public void notifyDataAvailable() { + isAvailable = true; + } + + private void resetAvailable() { + isAvailable = false; + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java new file mode 100644 index 0000000..49d9ef5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.serialization.types.ByteArrayType;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the bug reported in FLINK-13100.
+ *
+ * <p>The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()}
+ * for {@link BoundedBlockingSubpartitionType#FILE} assumed two things. First, that a buffer is always available;
+ * otherwise it threw an IOException. Second, that a pool of two buffers is enough, because the first buffer is
+ * recycled before the third one is needed. Both assumptions break while a flush is pending, i.e. when the socket
+ * channel is not writable while the netty thread calls
+ * {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}. In that case the first buffer fetched from
+ * {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not been recycled yet when the
+ * second buffer is fetched to trigger the next read-ahead.
+ *
+ * <p>The test ships a {@link BoundedBlockingSubpartitionType#FILE} backed blocking result from
+ * {@link TestSourceInvokable} to {@link TestSinkInvokable}. It fails if the job fails or if a sink does not
+ * receive the expected number of records.
+ */
+public class FileBufferReaderITCase extends TestLogger {
+
+	private static final int PARALLELISM = 8;
+
+	private static final int NUM_RECORDS = 100_000;
+
+	/** Payload of every record. New Java arrays are zero-initialized, so no explicit fill is needed. */
+	private static final byte[] DATA_SOURCE = new byte[1024];
+
+	@Test
+	public void testSequentialReading() throws Exception {
+		// setup
+		final Configuration configuration = new Configuration();
+		// "0" lets the REST endpoint bind to any free port
+		configuration.setString(RestOptions.BIND_PORT, "0");
+		// use the FILE implementation of bounded blocking subpartitions, which is the one under test
+		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(PARALLELISM)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
+			final JobGraph jobGraph = createJobGraph();
+			final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
+			// wait for the submission to succeed
+			final JobSubmissionResult result = submitFuture.get();
+
+			final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
+			final JobResult jobResult = resultFuture.get();
+
+			// Rethrow a job failure with its original cause attached, so the test report shows the
+			// root cause instead of a bare "expected false but was true".
+			jobResult.getSerializedThrowable().ifPresent(throwable -> {
+				throw new AssertionError("The job failed unexpectedly.", throwable);
+			});
+		}
+	}
+
+	/**
+	 * Builds a two-vertex job: sources and sinks, connected ALL_TO_ALL through a BLOCKING result.
+	 *
+	 * @return the job graph to submit
+	 */
+	private static JobGraph createJobGraph() {
+		// sources and sinks are placed into different slot sharing groups so they do not share slots
+		final SlotSharingGroup sourceGroup = new SlotSharingGroup();
+		final SlotSharingGroup sinkGroup = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(TestSourceInvokable.class);
+		source.setParallelism(PARALLELISM);
+		source.setSlotSharingGroup(sourceGroup);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(TestSinkInvokable.class);
+		sink.setParallelism(PARALLELISM);
+		sink.setSlotSharingGroup(sinkGroup);
+
+		sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(source, sink);
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Basic source {@link AbstractInvokable} which sends {@code NUM_RECORDS} copies of the same payload to the
+	 * {@link TestSinkInvokable}.
+	 */
+	public static final class TestSourceInvokable extends AbstractInvokable {
+
+		/**
+		 * Create an Invokable task and set its environment.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public TestSourceInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordWriter<ByteArrayType> writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
+
+			final ByteArrayType bytes = new ByteArrayType(DATA_SOURCE);
+			for (int i = 0; i < NUM_RECORDS; i++) {
+				try {
+					writer.emit(bytes);
+					// flush (and clear) after every record so the partition is written as many small buffers
+					writer.flushAll();
+				} finally {
+					writer.clearBuffers();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Basic sink {@link AbstractInvokable} which counts the elements sent by the {@link TestSourceInvokable} and
+	 * verifies that exactly {@code NUM_RECORDS} records arrived.
+	 */
+	public static final class TestSinkInvokable extends AbstractInvokable {
+
+		/**
+		 * Create an Invokable task and set its environment.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public TestSinkInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordReader<ByteArrayType> reader = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				ByteArrayType.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			int numReceived = 0;
+			while (reader.hasNext()) {
+				reader.next();
+				numReceived++;
+			}
+
+			assertThat(numReceived, is(NUM_RECORDS));
+		}
+	}
+}