This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit dfdfdadf445ea055c841c526b1a382424e1e1865 Author: Arvid Heise <ar...@ververica.com> AuthorDate: Fri Jun 12 10:03:59 2020 +0200 [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder. This is a partial revert of FLINK-10995. --- .../runtime/io/network/api/writer/RecordWriter.java | 5 +++++ .../flink/runtime/io/network/buffer/BufferBuilder.java | 4 ++++ .../io/network/api/writer/RecordWriterDelegateTest.java | 10 ++++++++-- .../runtime/io/network/api/writer/RecordWriterTest.java | 5 +++-- .../io/network/buffer/BufferBuilderAndConsumerTest.java | 16 +++------------- 5 files changed, 23 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 7d0f7de..be40d8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -349,4 +349,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai } } } + + @VisibleForTesting + ResultPartitionWriter getTargetPartition() { + return targetPartition; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java index 7780ba8..2cb873c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -41,6 +41,8 @@ public class BufferBuilder { private final SettablePositionMarker positionMarker = new SettablePositionMarker(); + private boolean bufferConsumerCreated = false; + public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) { this.memorySegment = checkNotNull(memorySegment); this.recycler = checkNotNull(recycler); @@ -53,6 +55,8 @@ public class BufferBuilder { * @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}. */ public BufferConsumer createBufferConsumer() { + checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder"); + bufferConsumerCreated = true; return new BufferConsumer( memorySegment, recycler, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java index 97d8053..4a7e5c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java @@ -25,7 +25,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.util.TestLogger; @@ -148,13 +150,17 @@ public class RecordWriterDelegateTest extends TestLogger { assertTrue(writerDelegate.getAvailableFuture().isDone()); // request one buffer from the local pool to make it unavailable - final BufferBuilder bufferBuilder = checkNotNull(writerDelegate.getRecordWriter(0).getBufferBuilder(0)); + RecordWriter recordWriter = writerDelegate.getRecordWriter(0); + final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0)); assertFalse(writerDelegate.isAvailable()); CompletableFuture future = writerDelegate.getAvailableFuture(); assertFalse(future.isDone()); // recycle the buffer to make the local pool available again - final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder); + BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish(); + ResultSubpartitionView readView = recordWriter.getTargetPartition().getSubpartition(0).createReadView(new NoOpBufferAvailablityListener()); + Buffer buffer = readView.getNextBuffer().buffer(); + buffer.recycleBuffer(); assertTrue(future.isDone()); assertTrue(writerDelegate.isAvailable()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 84ec497..6e75dff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -519,8 +519,9 @@ public class RecordWriterTest { new NoOpResultPartitionConsumableNotifier()); final RecordWriter recordWriter = createRecordWriter(partitionWrapper); BufferBuilder builder = recordWriter.requestNewBufferBuilder(0); - final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(builder); - builder.finish(); + BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish(); + ResultSubpartitionView readView = resultPartition.getSubpartition(0).createReadView(new NoOpBufferAvailablityListener()); + Buffer buffer = readView.getNextBuffer().buffer(); // idle time is zero when there is buffer available. assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java index 3900bb0..124e845 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java @@ -119,21 +119,11 @@ public class BufferBuilderAndConsumerTest { assertContent(bufferConsumer, 42); } - @Test + @Test(expected = IllegalStateException.class) public void creatingBufferConsumerTwice() { BufferBuilder bufferBuilder = createBufferBuilder(); - BufferConsumer bufferConsumer1 = bufferBuilder.createBufferConsumer(); - - assertEquals(0, bufferConsumer1.getCurrentReaderPosition()); - assertContent(bufferConsumer1); - - ByteBuffer bytesToWrite = toByteBuffer(0, 1); - bufferBuilder.appendAndCommit(bytesToWrite); - BufferConsumer bufferConsumer2 = bufferBuilder.createBufferConsumer(); - bufferBuilder.appendAndCommit(toByteBuffer(2)); - - assertEquals(bytesToWrite.position(), bufferConsumer2.getCurrentReaderPosition()); - assertContent(bufferConsumer2, 2); + bufferBuilder.createBufferConsumer(); + bufferBuilder.createBufferConsumer(); } @Test