This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfdfdadf445ea055c841c526b1a382424e1e1865
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Jun 12 10:03:59 2020 +0200

    [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.
    
    This is a partial revert of FLINK-10995.
---
 .../runtime/io/network/api/writer/RecordWriter.java      |  5 +++++
 .../flink/runtime/io/network/buffer/BufferBuilder.java   |  4 ++++
 .../io/network/api/writer/RecordWriterDelegateTest.java  | 10 ++++++++--
 .../runtime/io/network/api/writer/RecordWriterTest.java  |  5 +++--
 .../io/network/buffer/BufferBuilderAndConsumerTest.java  | 16 +++-------------
 5 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 7d0f7de..be40d8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -349,4 +349,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
                        }
                }
        }
+
+       @VisibleForTesting
+       ResultPartitionWriter getTargetPartition() {
+               return targetPartition;
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 7780ba8..2cb873c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -41,6 +41,8 @@ public class BufferBuilder {
 
        private final SettablePositionMarker positionMarker = new SettablePositionMarker();
 
+       private boolean bufferConsumerCreated = false;
+
        public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
                this.memorySegment = checkNotNull(memorySegment);
                this.recycler = checkNotNull(recycler);
@@ -53,6 +55,8 @@ public class BufferBuilder {
         * @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}.
         */
        public BufferConsumer createBufferConsumer() {
+               checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder");
+               bufferConsumerCreated = true;
                return new BufferConsumer(
                        memorySegment,
                        recycler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 97d8053..4a7e5c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.util.TestLogger;
@@ -148,13 +150,17 @@ public class RecordWriterDelegateTest extends TestLogger {
                assertTrue(writerDelegate.getAvailableFuture().isDone());
 
                // request one buffer from the local pool to make it unavailable
-               final BufferBuilder bufferBuilder = checkNotNull(writerDelegate.getRecordWriter(0).getBufferBuilder(0));
+               RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
+               final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
                assertFalse(writerDelegate.isAvailable());
                CompletableFuture future = writerDelegate.getAvailableFuture();
                assertFalse(future.isDone());
 
                // recycle the buffer to make the local pool available again
-               final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
+               BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
+               ResultSubpartitionView readView = recordWriter.getTargetPartition().getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+               Buffer buffer = readView.getNextBuffer().buffer();
+
                buffer.recycleBuffer();
                assertTrue(future.isDone());
                assertTrue(writerDelegate.isAvailable());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 84ec497..6e75dff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -519,8 +519,9 @@ public class RecordWriterTest {
                        new NoOpResultPartitionConsumableNotifier());
                final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
                BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
-               final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(builder);
-               builder.finish();
+               BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
+               ResultSubpartitionView readView = resultPartition.getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+               Buffer buffer = readView.getNextBuffer().buffer();
 
                // idle time is zero when there is buffer available.
                assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index 3900bb0..124e845 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -119,21 +119,11 @@ public class BufferBuilderAndConsumerTest {
                assertContent(bufferConsumer, 42);
        }
 
-       @Test
+       @Test(expected = IllegalStateException.class)
        public void creatingBufferConsumerTwice() {
                BufferBuilder bufferBuilder = createBufferBuilder();
-               BufferConsumer bufferConsumer1 = bufferBuilder.createBufferConsumer();
-
-               assertEquals(0, bufferConsumer1.getCurrentReaderPosition());
-               assertContent(bufferConsumer1);
-
-               ByteBuffer bytesToWrite = toByteBuffer(0, 1);
-               bufferBuilder.appendAndCommit(bytesToWrite);
-               BufferConsumer bufferConsumer2 = bufferBuilder.createBufferConsumer();
-               bufferBuilder.appendAndCommit(toByteBuffer(2));
-
-               assertEquals(bytesToWrite.position(), bufferConsumer2.getCurrentReaderPosition());
-               assertContent(bufferConsumer2, 2);
+               bufferBuilder.createBufferConsumer();
+               bufferBuilder.createBufferConsumer();
        }
 
        @Test

Reply via email to