This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c8ee78f714af4995f618c768661f5b638cd3025
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Wed Jul 10 14:31:30 2019 +0800

    [FLINK-13100][network] Fix IOException thrown by FileChannelBoundedData#nextBuffer
    
    The implementation of FileChannelBoundedData#nextBuffer assumed that a buffer
    is always available and threw an IOException otherwise. It also assumed that a
    pool of two buffers is always enough: before the third buffer is used, the first
    one was expected to have been recycled already. However, when a flush operation
    is pending (the socket channel is not writable when the Netty thread calls
    writeAndFlush), the first buffer fetched from FileChannelBoundedData may not have
    been recycled yet when the second buffer is fetched and triggers the next
    read-ahead, which breaks the above assumption.
    
    To fix this problem, read-ahead is no longer assumed to always be possible for
    FileChannelBoundedData. If no buffer is available to read the next data,
    nextBuffer returns null instead of throwing, and the read-ahead is retriggered
    via ResultSubpartitionView#notifyDataAvailable when a buffer is recycled.
---
 .../partition/BoundedBlockingSubpartition.java     |   3 +-
 .../BoundedBlockingSubpartitionReader.java         |  43 ++++-
 .../runtime/io/network/partition/BoundedData.java  |  12 +-
 .../network/partition/FileChannelBoundedData.java  |  36 ++--
 .../FileChannelMemoryMappedBoundedData.java        |   2 +-
 .../network/partition/MemoryMappedBoundedData.java |   2 +-
 ...oundedBlockingSubpartitionAvailabilityTest.java | 158 +++++++++++++++++
 .../partition/BoundedBlockingSubpartitionTest.java |  38 +++-
 .../io/network/partition/BoundedDataTestBase.java  |   2 +-
 ...Test.java => CountingAvailabilityListener.java} |  21 +--
 .../partition/FileChannelBoundedDataTest.java      | 172 ++++++++++++++++++
 .../flink/test/runtime/FileBufferReaderITCase.java | 196 +++++++++++++++++++++
 12 files changed, 638 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 7a74872..785e44c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -198,9 +198,8 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
 
                        availability.notifyDataAvailable();
 
-                       final BoundedData.Reader dataReader = 
data.createReader();
                        final BoundedBlockingSubpartitionReader reader = new 
BoundedBlockingSubpartitionReader(
-                                       this, dataReader, 
numDataBuffersWritten);
+                                       this, data, numDataBuffersWritten, 
availability);
                        readers.add(reader);
                        return reader;
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index f7536b9..63e5e22 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -37,6 +37,9 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
        /** The result subpartition that we read. */
        private final BoundedBlockingSubpartition parent;
 
+       /** The listener that is notified when there are available buffers for 
this subpartition view. */
+       private final BufferAvailabilityListener availabilityListener;
+
        /** The next buffer (look ahead). Null once the data is depleted or 
reader is disposed. */
        @Nullable
        private Buffer nextBuffer;
@@ -57,16 +60,20 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
         */
        BoundedBlockingSubpartitionReader(
                        BoundedBlockingSubpartition parent,
-                       BoundedData.Reader dataReader,
-                       int numDataBuffers) throws IOException {
-
-               checkArgument(numDataBuffers >= 0);
+                       BoundedData data,
+                       int numDataBuffers,
+                       BufferAvailabilityListener availabilityListener) throws 
IOException {
 
                this.parent = checkNotNull(parent);
-               this.dataReader = checkNotNull(dataReader);
-               this.dataBufferBacklog = numDataBuffers;
 
+               checkNotNull(data);
+               this.dataReader = data.createReader(this);
                this.nextBuffer = dataReader.nextBuffer();
+
+               checkArgument(numDataBuffers >= 0);
+               this.dataBufferBacklog = numDataBuffers;
+
+               this.availabilityListener = checkNotNull(availabilityListener);
        }
 
        @Nullable
@@ -89,9 +96,31 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
                return BufferAndBacklog.fromBufferAndLookahead(current, 
nextBuffer, dataBufferBacklog);
        }
 
+       /**
+        * This method is actually only meaningful for the {@link 
BoundedBlockingSubpartitionType#FILE}.
+        *
+        * <p>For the other types the {@link #nextBuffer} can not be ever set 
to null, so it is no need
+        * to notify available via this method. But the implementation is also 
compatible with other
+        * types even though called by mistake.
+        */
        @Override
        public void notifyDataAvailable() {
-               throw new IllegalStateException("No data should become 
available on a blocking partition during consumption.");
+               if (nextBuffer == null) {
+                       assert dataReader != null;
+
+                       try {
+                               nextBuffer = dataReader.nextBuffer();
+                       } catch (IOException ex) {
+                               // this exception wrapper is only for avoiding 
throwing IOException explicitly
+                               // in relevant interface methods
+                               throw new IllegalStateException("No data 
available while reading", ex);
+                       }
+
+                       // next buffer is null indicates the end of partition
+                       if (nextBuffer != null) {
+                               availabilityListener.notifyDataAvailable();
+                       }
+               }
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
index 4d58cf8..a8681cc 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
@@ -34,7 +34,7 @@ import java.io.IOException;
  * through the {@link #writeBuffer(Buffer)} method.
  * The write phase is ended by calling {@link #finishWrite()}.
  * After the write phase is finished, the data can be read multiple times 
through readers created
- * via {@link #createReader()}.
+ * via {@link #createReader(ResultSubpartitionView)}.
  * Finally, the BoundedData is dropped / deleted by calling {@link #close()}.
  *
  * <h2>Thread Safety and Concurrency</h2>
@@ -60,7 +60,15 @@ interface BoundedData extends Closeable {
         * Gets a reader for the bounded data. Multiple readers may be created.
         * This call only succeeds once the write phase was finished via {@link 
#finishWrite()}.
         */
-       BoundedData.Reader createReader() throws IOException;
+       BoundedData.Reader createReader(ResultSubpartitionView 
subpartitionView) throws IOException;
+
+       /**
+        * Gets a reader for the bounded data. Multiple readers may be created.
+        * This call only succeeds once the write phase was finished via {@link 
#finishWrite()}.
+        */
+       default BoundedData.Reader createReader() throws IOException {
+               return createReader(new NoOpResultSubpartitionView());
+       }
 
        /**
         * Gets the number of bytes of all written data (including the metadata 
in the buffer headers).
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
index 50dca60..690ad7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
@@ -75,11 +75,11 @@ final class FileChannelBoundedData implements BoundedData {
        }
 
        @Override
-       public Reader createReader() throws IOException {
+       public Reader createReader(ResultSubpartitionView subpartitionView) 
throws IOException {
                checkState(!fileChannel.isOpen());
 
                final FileChannel fc = FileChannel.open(filePath, 
StandardOpenOption.READ);
-               return new FileBufferReader(fc, memorySegmentSize);
+               return new FileBufferReader(fc, memorySegmentSize, 
subpartitionView);
        }
 
        @Override
@@ -117,7 +117,12 @@ final class FileChannelBoundedData implements BoundedData {
 
                private final ArrayDeque<MemorySegment> buffers;
 
-               FileBufferReader(FileChannel fileChannel, int bufferSize) {
+               private final ResultSubpartitionView subpartitionView;
+
+               /** The tag indicates whether we have read the end of this 
file. */
+               private boolean isFinished;
+
+               FileBufferReader(FileChannel fileChannel, int bufferSize, 
ResultSubpartitionView subpartitionView) {
                        this.fileChannel = checkNotNull(fileChannel);
                        this.headerBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
                        this.buffers = new ArrayDeque<>(NUM_BUFFERS);
@@ -125,26 +130,25 @@ final class FileChannelBoundedData implements BoundedData 
{
                        for (int i = 0; i < NUM_BUFFERS; i++) {
                                
buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, 
null));
                        }
+
+                       this.subpartitionView = checkNotNull(subpartitionView);
                }
 
                @Nullable
                @Override
                public Buffer nextBuffer() throws IOException {
                        final MemorySegment memory = buffers.pollFirst();
+                       if (memory == null) {
+                               return null;
+                       }
 
-                       if (memory != null) {
-                               final Buffer next = 
BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, 
this);
-                               if (next != null) {
-                                       return next;
-                               }
-                               else {
-                                       recycle(memory);
-                                       return null;
-                               }
+                       final Buffer next = 
BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, 
this);
+                       if (next == null) {
+                               isFinished = true;
+                               recycle(memory);
                        }
 
-                       throw new IOException("Bug in 
BoundedBlockingSubpartition with FILE data: " +
-                                       "Requesting new buffer before previous 
buffer returned.");
+                       return next;
                }
 
                @Override
@@ -155,6 +159,10 @@ final class FileChannelBoundedData implements BoundedData {
                @Override
                public void recycle(MemorySegment memorySegment) {
                        buffers.addLast(memorySegment);
+
+                       if (!isFinished) {
+                               subpartitionView.notifyDataAvailable();
+                       }
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
index 4a71fcd..f22efd0 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
@@ -123,7 +123,7 @@ final class FileChannelMemoryMappedBoundedData implements 
BoundedData {
        }
 
        @Override
-       public BoundedData.Reader createReader() {
+       public BoundedData.Reader createReader(ResultSubpartitionView ignored) {
                checkState(!fileChannel.isOpen());
 
                final List<ByteBuffer> buffers = memoryMappedRegions.stream()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
index 502c64c..e8718f5 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
@@ -113,7 +113,7 @@ final class MemoryMappedBoundedData implements BoundedData {
        }
 
        @Override
-       public BufferSlicer createReader() {
+       public BufferSlicer createReader(ResultSubpartitionView ignored) {
                assert currentBuffer == null;
 
                final List<ByteBuffer> buffers = fullBuffers.stream()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
new file mode 100644
index 0000000..915cf43
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the availability handling of the BoundedBlockingSubpartitions 
with not constant
+ * availability.
+ */
+public class BoundedBlockingSubpartitionAvailabilityTest {
+
+       @ClassRule
+       public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+       private static final int BUFFER_SIZE = 32 * 1024;
+
+       @Test
+       public void testInitiallyAvailable() throws Exception {
+               final ResultSubpartition subpartition = 
createPartitionWithData(10);
+               final CountingAvailabilityListener listener = new 
CountingAvailabilityListener();
+
+               // test
+               final ResultSubpartitionView subpartitionView = 
subpartition.createReadView(listener);
+
+               // assert
+               assertEquals(1, listener.numNotifications);
+
+               // cleanup
+               subpartitionView.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test
+       public void testUnavailableWhenBuffersExhausted() throws Exception {
+               // setup
+               final BoundedBlockingSubpartition subpartition = 
createPartitionWithData(100_000);
+               final CountingAvailabilityListener listener = new 
CountingAvailabilityListener();
+               final ResultSubpartitionView reader = 
subpartition.createReadView(listener);
+
+               // test
+               final List<BufferAndBacklog> data = drainAvailableData(reader);
+
+               // assert
+               assertFalse(reader.isAvailable());
+               assertFalse(data.get(data.size() - 1).isMoreAvailable());
+
+               // cleanup
+               reader.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test
+       public void testAvailabilityNotificationWhenBuffersReturn() throws 
Exception {
+               // setup
+               final ResultSubpartition subpartition = 
createPartitionWithData(100_000);
+               final CountingAvailabilityListener listener = new 
CountingAvailabilityListener();
+               final ResultSubpartitionView reader = 
subpartition.createReadView(listener);
+
+               // test
+               final List<ResultSubpartition.BufferAndBacklog> data = 
drainAvailableData(reader);
+               data.get(0).buffer().recycleBuffer();
+               data.get(1).buffer().recycleBuffer();
+
+               // assert
+               assertTrue(reader.isAvailable());
+               assertEquals(2, listener.numNotifications); // one initial, one 
for new availability
+
+               // cleanup
+               reader.releaseAllResources();
+               subpartition.release();
+       }
+
+       @Test
+       public void testNotAvailableWhenEmpty() throws Exception {
+               // setup
+               final ResultSubpartition subpartition = 
createPartitionWithData(100_000);
+               final ResultSubpartitionView reader = 
subpartition.createReadView(new NoOpBufferAvailablityListener());
+
+               // test
+               drainAllData(reader);
+
+               // assert
+               assertFalse(reader.isAvailable());
+
+               // cleanup
+               reader.releaseAllResources();
+               subpartition.release();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static BoundedBlockingSubpartition createPartitionWithData(int 
numberOfBuffers) throws IOException {
+               ResultPartition parent = PartitionTestUtils.createPartition();
+
+               BoundedBlockingSubpartition partition = 
BoundedBlockingSubpartition.createWithFileChannel(
+                       0, parent, new File(TMP_FOLDER.newFolder(), "data"), 
BUFFER_SIZE);
+
+               writeBuffers(partition, numberOfBuffers);
+               partition.finish();
+
+               return partition;
+       }
+
+       private static void writeBuffers(ResultSubpartition partition, int 
numberOfBuffers) throws IOException {
+               for (int i = 0; i < numberOfBuffers; i++) {
+                       
partition.add(BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_SIZE, 
BUFFER_SIZE));
+               }
+       }
+
+       private static List<BufferAndBacklog> 
drainAvailableData(ResultSubpartitionView reader) throws Exception {
+               final ArrayList<BufferAndBacklog> list = new ArrayList<>();
+
+               BufferAndBacklog bab;
+               while ((bab = reader.getNextBuffer()) != null) {
+                       list.add(bab);
+               }
+
+               return list;
+       }
+
+       private static void drainAllData(ResultSubpartitionView reader) throws 
Exception {
+               BufferAndBacklog bab;
+               while ((bab = reader.getNextBuffer()) != null) {
+                       bab.buffer().recycleBuffer();
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index 9bd0c4b..ce4083f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.IOException;
 
 import static 
org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -80,10 +81,11 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
        }
 
        @Test
-       public void testClosingClosesBoundedData() throws Exception {
+       public void testCloseBoundedData() throws Exception {
                final TestingBoundedDataReader reader = new 
TestingBoundedDataReader();
+               final TestingBoundedData data = new TestingBoundedData(reader);
                final BoundedBlockingSubpartitionReader bbspr = new 
BoundedBlockingSubpartitionReader(
-                               (BoundedBlockingSubpartition) 
createSubpartition(), reader, 10);
+                               (BoundedBlockingSubpartition) 
createSubpartition(), data, 10, new NoOpBufferAvailablityListener());
 
                bbspr.releaseAllResources();
 
@@ -124,7 +126,7 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                }
 
                @Override
-               public Reader createReader() throws IOException {
+               public Reader createReader(ResultSubpartitionView 
subpartitionView) throws IOException {
                        throw new UnsupportedOperationException();
                }
 
@@ -137,6 +139,36 @@ public class BoundedBlockingSubpartitionTest extends 
SubpartitionTestBase {
                public void close() {}
        }
 
+       private static class TestingBoundedData implements BoundedData {
+
+               private BoundedData.Reader reader;
+
+               private TestingBoundedData(BoundedData.Reader reader) {
+                       this.reader = checkNotNull(reader);
+               }
+
+               @Override
+               public void writeBuffer(Buffer buffer) throws IOException {
+               }
+
+               @Override
+               public void finishWrite() throws IOException {
+               }
+
+               @Override
+               public Reader createReader(ResultSubpartitionView ignored) 
throws IOException {
+                       return reader;
+               }
+
+               @Override
+               public long getSize() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void close() {}
+       }
+
        private static class TestingBoundedDataReader implements 
BoundedData.Reader {
 
                boolean closed;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
index c71b9df..365e93e 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
@@ -59,7 +59,7 @@ public abstract class BoundedDataTestBase {
 
        protected abstract BoundedData createBoundedDataWithRegion(Path 
tempFilePath, int regionSize) throws IOException;
 
-       private BoundedData createBoundedData() throws IOException {
+       protected BoundedData createBoundedData() throws IOException {
                return createBoundedData(createTempPath());
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
old mode 100755
new mode 100644
similarity index 59%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
copy to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
index cc499f4..4e27ee0
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
@@ -18,26 +18,15 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import java.io.IOException;
-import java.nio.file.Path;
-
 /**
- * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
+ * A simple BufferAvailabilityListener that counts the number of notifications.
  */
-public class FileChannelBoundedDataTest extends BoundedDataTestBase {
-
-       @Override
-       protected boolean isRegionBased() {
-               return false;
-       }
+final class CountingAvailabilityListener implements BufferAvailabilityListener 
{
 
-       @Override
-       protected BoundedData createBoundedData(Path tempFilePath) throws 
IOException {
-               return FileChannelBoundedData.create(tempFilePath, BUFFER_SIZE);
-       }
+       int numNotifications;
 
        @Override
-       protected BoundedData createBoundedDataWithRegion(Path tempFilePath, 
int regionSize) throws IOException {
-               throw new UnsupportedOperationException();
+       public void notifyDataAvailable() {
+               numNotifications++;
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index cc499f4..1ca2bc8 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -18,14 +18,45 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.nio.file.Path;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests that read the BoundedBlockingSubpartition with multiple threads in 
parallel.
  */
 public class FileChannelBoundedDataTest extends BoundedDataTestBase {
 
+       private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+
+       private static FileChannelManager fileChannelManager;
+
+       @BeforeClass
+       public static void setUp() {
+               fileChannelManager = new FileChannelManagerImpl(new String[] 
{tempDir}, "testing");
+       }
+
+       @AfterClass
+       public static void shutdown() throws Exception {
+               fileChannelManager.close();
+       }
+
        @Override
        protected boolean isRegionBased() {
                return false;
@@ -40,4 +71,145 @@ public class FileChannelBoundedDataTest extends 
BoundedDataTestBase {
        protected BoundedData createBoundedDataWithRegion(Path tempFilePath, 
int regionSize) throws IOException {
                throw new UnsupportedOperationException();
        }
+
+       @Test
+       public void testReadNextBuffer() throws Exception {
+               final int writtenBuffers = 3;
+               try (final BoundedData data = createBoundedData()) {
+                       writeBuffers(data, writtenBuffers);
+                       final BoundedData.Reader reader = data.createReader();
+
+                       final Buffer first = reader.nextBuffer();
+                       final Buffer second = reader.nextBuffer();
+                       assertNotNull(first);
+                       assertNotNull(second);
+
+                       // the reader only has two memory segments to read into, so with both of them
+                       // still held above, a third read yields nothing even though data remains
+                       assertNull(reader.nextBuffer());
+
+                       // release the segments before the data is closed
+                       first.recycleBuffer();
+                       second.recycleBuffer();
+               }
+       }
+
+       @Test
+       public void testRecycleBufferForNotifyingSubpartitionView() throws Exception {
+               final int writtenBuffers = 2;
+               try (final BoundedData data = createBoundedData()) {
+                       writeBuffers(data, writtenBuffers);
+
+                       final VerifyNotificationResultSubpartitionView view = new VerifyNotificationResultSubpartitionView();
+                       final BoundedData.Reader reader = data.createReader(view);
+
+                       final Buffer first = reader.nextBuffer();
+                       final Buffer second = reader.nextBuffer();
+                       assertNotNull(first);
+                       assertNotNull(second);
+
+                       // while the reader has not tagged itself finished, recycling a buffer notifies the view
+                       assertFalse(view.isAvailable);
+                       first.recycleBuffer();
+                       assertTrue(view.isAvailable);
+
+                       view.resetAvailable();
+                       assertFalse(view.isAvailable);
+
+                       // all written data has been read: a null result makes the reader tag itself finished
+                       assertNull(reader.nextBuffer());
+
+                       // once finished, recycling the remaining buffer must not notify the view any more
+                       second.recycleBuffer();
+                       assertFalse(view.isAvailable);
+               }
+       }
+
+       /**
+        * Verifies that recycling a buffer read through a FILE-type subpartition view notifies the view's
+        * {@link BufferAvailabilityListener}. The view and the subpartition are released in {@code finally}
+        * blocks so that a failing assertion does not leak the file-backed subpartition.
+        */
+       @Test
+       public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception {
+               final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition();
+               try {
+                       final int numberOfBuffers = 2;
+                       writeBuffers(subpartition, numberOfBuffers);
+
+                       final VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener();
+                       final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
+                       try {
+                               // the notification is triggered while creating view
+                               assertTrue(listener.isAvailable);
+
+                               listener.resetAvailable();
+                               assertFalse(listener.isAvailable);
+
+                               final BufferAndBacklog buffer1 = subpartitionView.getNextBuffer();
+                               final BufferAndBacklog buffer2 = subpartitionView.getNextBuffer();
+                               assertNotNull(buffer1);
+                               assertNotNull(buffer2);
+
+                               // the next buffer is null in view because FileBufferReader has no available buffers for reading ahead
+                               assertFalse(subpartitionView.isAvailable());
+                               // recycle a buffer to trigger notification of data available
+                               buffer1.buffer().recycleBuffer();
+                               assertTrue(listener.isAvailable);
+
+                               buffer2.buffer().recycleBuffer();
+                       } finally {
+                               subpartitionView.releaseAllResources();
+                       }
+               } finally {
+                       subpartition.release();
+               }
+       }
+
+       /**
+        * Creates a {@link ResultPartitionType#BLOCKING} partition whose subpartitions are of type
+        * {@link BoundedBlockingSubpartitionType#FILE} and returns its first subpartition.
+        */
+       private static ResultSubpartition createFileBoundedBlockingSubpartition() {
+               final ResultPartitionBuilder builder = new ResultPartitionBuilder()
+                       .setNetworkBufferSize(BUFFER_SIZE)
+                       .setResultPartitionType(ResultPartitionType.BLOCKING)
+                       .setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
+                       .setFileChannelManager(fileChannelManager);
+               final ResultPartition partition = builder.build();
+               return partition.subpartitions[0];
+       }
+
+       /** Writes {@code numberOfBuffers} buffers of {@code BUFFER_SIZE} into {@code data}, then finishes writing. */
+       private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException {
+               int written = 0;
+               while (written < numberOfBuffers) {
+                       data.writeBuffer(buildSomeBuffer(BUFFER_SIZE));
+                       written++;
+               }
+               data.finishWrite();
+       }
+
+       /** Adds {@code numberOfBuffers} completely filled buffer consumers to {@code subpartition}, then finishes it. */
+       private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException {
+               int written = 0;
+               while (written < numberOfBuffers) {
+                       subpartition.add(createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE));
+                       written++;
+               }
+               subpartition.finish();
+       }
+
+       /**
+        * A {@link NoOpResultSubpartitionView} that records whether
+        * {@link ResultSubpartitionView#notifyDataAvailable()} has been called since the last reset.
+        */
+       private static class VerifyNotificationResultSubpartitionView extends NoOpResultSubpartitionView {
+
+               /** Set to {@code true} by {@link #notifyDataAvailable()}, cleared by {@link #resetAvailable()}. */
+               private boolean isAvailable;
+
+               private void resetAvailable() {
+                       this.isAvailable = false;
+               }
+
+               @Override
+               public void notifyDataAvailable() {
+                       this.isAvailable = true;
+               }
+       }
+
+       /**
+        * A {@link BufferAvailabilityListener} that records whether it has been notified about available data
+        * since the last reset. It is used to verify that notifications issued via
+        * {@link ResultSubpartitionView#notifyDataAvailable()} reach the view's listener.
+        */
+       private static class VerifyNotificationBufferAvailabilityListener implements BufferAvailabilityListener {
+
+               /** Set to {@code true} by {@link #notifyDataAvailable()}, cleared by {@link #resetAvailable()}. */
+               private boolean isAvailable;
+
+               private void resetAvailable() {
+                       this.isAvailable = false;
+               }
+
+               @Override
+               public void notifyDataAvailable() {
+                       this.isAvailable = true;
+               }
+       }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
new file mode 100644
index 0000000..49d9ef5
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.serialization.types.ByteArrayType;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the bug reported in FLINK-13100.
+ *
+ * <p>The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()}
+ * for {@link BoundedBlockingSubpartitionType#FILE} assumed that a buffer is always available (otherwise an
+ * IOException is thrown) and that a pool of two buffers is enough, i.e. that the first buffer has already been
+ * recycled before the third one is needed. In the case of a pending flush operation, however (when the socket
+ * channel is not writable while the netty thread is calling
+ * {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}), the first buffer fetched from
+ * {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not yet been recycled when
+ * the second buffer is fetched to trigger the next read-ahead, which breaks that assumption.
+ */
+public class FileBufferReaderITCase extends TestLogger {
+
+       private static final int PARALLELISM = 8;
+
+       private static final int NUM_RECORDS = 100_000;
+
+       /** Payload of every record emitted by {@link TestSourceInvokable}; initialized in {@link #setup()}. */
+       private static final byte[] DATA_SOURCE = new byte[1024];
+
+       @BeforeClass
+       public static void setup() {
+               // a new byte[] is already all zeros, so the former zero-fill was a no-op;
+               // use a repeating byte pattern so that the records carry non-trivial content
+               for (int i = 0; i < DATA_SOURCE.length; i++) {
+                       DATA_SOURCE[i] = (byte) i;
+               }
+       }
+
+       @Test
+       public void testSequentialReading() throws Exception {
+               // setup
+               final Configuration configuration = new Configuration();
+               configuration.setString(RestOptions.BIND_PORT, "0");
+               configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
+
+               final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+                       .setConfiguration(configuration)
+                       .setNumTaskManagers(PARALLELISM)
+                       .setNumSlotsPerTaskManager(1)
+                       .build();
+
+               try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+                       miniCluster.start();
+
+                       final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
+                       final JobGraph jobGraph = createJobGraph();
+                       final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
+                       // wait for the submission to succeed
+                       final JobSubmissionResult result = submitFuture.get();
+
+                       final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
+                       final JobResult jobResult = resultFuture.get();
+
+                       // fail with the job's own failure as the cause instead of a bare "expected false" mismatch
+                       jobResult.getSerializedThrowable().ifPresent(throwable -> {
+                               throw new AssertionError("The job failed unexpectedly.", throwable);
+                       });
+               }
+       }
+
+       /**
+        * Builds a job in which {@link #PARALLELISM} sources feed {@link #PARALLELISM} sinks via an all-to-all
+        * {@link ResultPartitionType#BLOCKING} exchange, scheduled with {@link ScheduleMode#LAZY_FROM_SOURCES}.
+        */
+       private static JobGraph createJobGraph() {
+               // separate slot sharing groups: a source subtask and a sink subtask never share a slot
+               final SlotSharingGroup group1 = new SlotSharingGroup();
+               final SlotSharingGroup group2 = new SlotSharingGroup();
+
+               final JobVertex source = new JobVertex("source");
+               source.setInvokableClass(TestSourceInvokable.class);
+               source.setParallelism(PARALLELISM);
+               source.setSlotSharingGroup(group1);
+
+               final JobVertex sink = new JobVertex("sink");
+               sink.setInvokableClass(TestSinkInvokable.class);
+               sink.setParallelism(PARALLELISM);
+               sink.setSlotSharingGroup(group2);
+
+               sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+               final JobGraph jobGraph = new JobGraph(source, sink);
+               jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+
+               return jobGraph;
+       }
+
+       /**
+        * Basic source {@link AbstractInvokable} which sends {@link #NUM_RECORDS} copies of
+        * {@link #DATA_SOURCE} to the {@link TestSinkInvokable}.
+        */
+       public static final class TestSourceInvokable extends AbstractInvokable {
+
+               /**
+                * Create an Invokable task and set its environment.
+                *
+                * @param environment The environment assigned to this invokable.
+                */
+               public TestSourceInvokable(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       final RecordWriter<ByteArrayType> writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
+
+                       final ByteArrayType bytes = new ByteArrayType(DATA_SOURCE);
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               try {
+                                       writer.emit(bytes);
+                                       writer.flushAll();
+                               } finally {
+                                       // clears the writer's buffers after every single record
+                                       writer.clearBuffers();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Basic sink {@link AbstractInvokable} which verifies that it receives {@link #NUM_RECORDS} elements
+        * from the {@link TestSourceInvokable}s.
+        */
+       public static final class TestSinkInvokable extends AbstractInvokable {
+
+               /**
+                * Create an Invokable task and set its environment.
+                *
+                * @param environment The environment assigned to this invokable.
+                */
+               public TestSinkInvokable(Environment environment) {
+                       super(environment);
+               }
+
+               @Override
+               public void invoke() throws Exception {
+                       final RecordReader<ByteArrayType> reader = new RecordReader<>(
+                               getEnvironment().getInputGate(0),
+                               ByteArrayType.class,
+                               getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+                       int numReceived = 0;
+                       while (reader.hasNext()) {
+                               reader.next();
+                               numReceived++;
+                       }
+
+                       assertThat(numReceived, is(NUM_RECORDS));
+               }
+       }
+}

Reply via email to