[FLINK-8582][runtime] Introduce BufferConsumer. BufferConsumer will be used in the future for reading partially written MemorySegments. On flushes, instead of requesting a new MemorySegment, the BufferConsumer code will allow writing to continue into the partially filled MemorySegment.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/306fd8e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/306fd8e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/306fd8e4 Branch: refs/heads/master Commit: 306fd8e41d96d8fdf7470cb3fa56917ec1b2cd31 Parents: 66ac59f Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Jan 4 16:42:46 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:14 2018 +0100 ---------------------------------------------------------------------- .../api/serialization/RecordSerializer.java | 12 -- .../serialization/SpanningRecordSerializer.java | 10 - .../io/network/api/writer/RecordWriter.java | 43 ++-- .../io/network/buffer/BufferBuilder.java | 132 ++++++++++-- .../io/network/buffer/BufferConsumer.java | 112 ++++++++++ .../io/network/buffer/LocalBufferPool.java | 26 ++- .../SpanningRecordSerializationTest.java | 17 +- .../buffer/BufferBuilderAndConsumerTest.java | 202 +++++++++++++++++++ .../io/network/buffer/BufferBuilderTest.java | 114 ----------- .../network/buffer/BufferBuilderTestUtils.java | 6 + .../IteratorWrappingTestSingleInputGate.java | 8 +- .../BackPressureStatsTrackerImplITCase.java | 3 +- .../consumer/StreamTestSingleInputGate.java | 8 +- 13 files changed, 506 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 9fb656b..a74a068 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import java.io.IOException; @@ -87,17 +86,6 @@ public interface RecordSerializer<T extends IOReadableWritable> { SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; /** - * Retrieves the current target buffer and sets its size to the actual - * number of written bytes. - * - * <p>After calling this method, a new target buffer is required to continue - * writing (see {@link #setNextBufferBuilder(BufferBuilder)}). - * - * @return the target buffer that was used - */ - Buffer getCurrentBuffer(); - - /** * Clear and release internal state. 
*/ void clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 263ff43..ba8e659 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -147,16 +147,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R } @Override - public Buffer getCurrentBuffer() { - if (targetBuffer == null) { - return null; - } - Buffer result = targetBuffer.build(); - targetBuffer = null; - return result; - } - - @Override public void clear() { targetBuffer = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index b47c461..da28cf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -27,10 +27,12 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import 
org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.XORShiftRandom; import java.io.IOException; +import java.util.Optional; import java.util.Random; import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; @@ -60,6 +62,8 @@ public class RecordWriter<T extends IOReadableWritable> { /** {@link RecordSerializer} per outgoing channel. */ private final RecordSerializer<T>[] serializers; + private final Optional<BufferConsumer>[] bufferConsumers; + private final Random rng = new XORShiftRandom(); private Counter numBytesOut = new SimpleCounter(); @@ -81,8 +85,10 @@ public class RecordWriter<T extends IOReadableWritable> { * serializer. */ this.serializers = new SpanningRecordSerializer[numChannels]; + this.bufferConsumers = new Optional[numChannels]; for (int i = 0; i < numChannels; i++) { serializers[i] = new SpanningRecordSerializer<T>(); + bufferConsumers[i] = Optional.empty(); } } @@ -128,6 +134,8 @@ public class RecordWriter<T extends IOReadableWritable> { } else { BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking(); + checkState(!bufferConsumers[targetChannel].isPresent()); + bufferConsumers[targetChannel] = Optional.of(bufferBuilder.createBufferConsumer()); result = serializer.setNextBufferBuilder(bufferBuilder); } } @@ -166,18 +174,11 @@ public class RecordWriter<T extends IOReadableWritable> { } public void clearBuffers() { - for (RecordSerializer<?> serializer : serializers) { + for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { + RecordSerializer<?> serializer = serializers[targetChannel]; synchronized (serializer) { - try { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - 
buffer.recycleBuffer(); - } - } - finally { - serializer.clear(); - } + closeBufferConsumer(targetChannel); + serializer.clear(); } } } @@ -190,8 +191,8 @@ public class RecordWriter<T extends IOReadableWritable> { } /** - * Writes the buffer to the {@link ResultPartitionWriter} and removes the - * buffer from the serializer state. + * Tries to consume serialized data and (if data present) writes them to the {@link ResultPartitionWriter}. + * After writing it clean ups the state. * * <p><b>Needs to be synchronized on the serializer!</b> * @@ -201,17 +202,25 @@ public class RecordWriter<T extends IOReadableWritable> { int targetChannel, RecordSerializer<T> serializer) throws IOException { - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer == null) { + Optional<BufferConsumer> bufferConsumer = bufferConsumers[targetChannel]; + if (!bufferConsumer.isPresent()) { return false; } - numBytesOut.inc(buffer.getSizeUnsafe()); + numBytesOut.inc(bufferConsumer.get().getWrittenBytes()); try { - targetPartition.writeBuffer(buffer, targetChannel); + targetPartition.writeBuffer(bufferConsumer.get().build(), targetChannel); return true; } finally { serializer.clear(); + closeBufferConsumer(targetChannel); + } + } + + private void closeBufferConsumer(int targetChannel) { + if (bufferConsumers[targetChannel].isPresent()) { + bufferConsumers[targetChannel].get().close(); + bufferConsumers[targetChannel] = Optional.empty(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java index ff59f96..bac141f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.core.memory.MemorySegment; import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; import java.nio.ByteBuffer; @@ -28,8 +29,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** - * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder - * is complete, {@link Buffer} instance can be built and shared across multiple threads. + * Not thread safe class for filling in the content of the {@link MemorySegment}. To access written data please use + * {@link BufferConsumer} which allows to build {@link Buffer} instances from the written data. */ @NotThreadSafe public class BufferBuilder { @@ -37,9 +38,9 @@ public class BufferBuilder { private final BufferRecycler recycler; - private int position = 0; + private final SettablePositionMarker positionMarker = new SettablePositionMarker(); - private boolean built = false; + private boolean bufferConsumerCreated = false; public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) { this.memorySegment = checkNotNull(memorySegment); @@ -47,36 +48,133 @@ public class BufferBuilder { } /** + * @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}. There can exist only + * one {@link BufferConsumer} per each {@link BufferBuilder} and vice versa. + */ + public BufferConsumer createBufferConsumer() { + checkState(!bufferConsumerCreated, "There can not exists two BufferConsumer for one BufferBuilder"); + bufferConsumerCreated = true; + return new BufferConsumer( + memorySegment, + recycler, + positionMarker); + } + + /** + * Append as many data as possible from {@code source}. 
Not everything might be copied if there is not enough + * space in the underlying {@link MemorySegment} + * * @return number of copied bytes */ public int append(ByteBuffer source) { - checkState(!built); + checkState(!isFinished()); int needed = source.remaining(); - int available = limit() - position; + int available = getMaxCapacity() - positionMarker.getCached(); int toCopy = Math.min(needed, available); - memorySegment.put(position, source, toCopy); - position += toCopy; + memorySegment.put(positionMarker.getCached(), source, toCopy); + positionMarker.move(toCopy); return toCopy; } - public boolean isFull() { - checkState(position <= limit()); - return position == limit(); + /** + * Mark this {@link BufferBuilder} and associated {@link BufferConsumer} as finished - no new data writes will be + * allowed. + * + * @return number of written bytes. + */ + public int finish() { + checkState(!isFinished()); + positionMarker.markFinished(); + return getWrittenBytes(); } - public Buffer build() { - checkState(!built); - built = true; - return new NetworkBuffer(memorySegment, recycler, true, position); + public boolean isFinished() { + return positionMarker.isFinished(); + } + + public boolean isFull() { + checkState(positionMarker.getCached() <= getMaxCapacity()); + return positionMarker.getCached() == getMaxCapacity(); } public boolean isEmpty() { - return position == 0; + return positionMarker.getCached() == 0; } - private int limit() { + public int getMaxCapacity() { return memorySegment.size(); } + + public int getWrittenBytes() { + return positionMarker.getCached(); + } + + /** + * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder} + * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer. 
+ */ + @ThreadSafe + interface PositionMarker { + int FINISHED_EMPTY = Integer.MIN_VALUE; + + int get(); + + static boolean isFinished(int position) { + return position < 0; + } + + static int getAbsolute(int position) { + if (position == FINISHED_EMPTY) { + return 0; + } + return Math.abs(position); + } + } + + /** + * Cached writing implementation of {@link PositionMarker}. + * + * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently + * of one another - for example the cached values can not accidentally leak from one to another. + */ + private static class SettablePositionMarker implements PositionMarker { + private volatile int position = 0; + + /** + * Locally cached value of volatile {@code position} to avoid unnecessary volatile accesses. + */ + private int cachedPosition = 0; + + @Override + public int get() { + return position; + } + + public boolean isFinished() { + return PositionMarker.isFinished(cachedPosition); + } + + public int getCached() { + return PositionMarker.getAbsolute(cachedPosition); + } + + public void markFinished() { + int newValue = -getCached(); + if (newValue == 0) { + newValue = FINISHED_EMPTY; + } + set(newValue); + } + + public void move(int offset) { + set(cachedPosition + offset); + } + + public void set(int value) { + cachedPosition = value; + position = cachedPosition; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java new file mode 100644 index 0000000..177bac0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java @@ -0,0 +1,112 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Closeable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Not thread safe class for producing {@link Buffer}. + * + * <p>It reads data written by {@link BufferBuilder}. + * Although it is not thread safe and can be used only by one single thread, this thread can be different then the + * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one thread writes data to + * {@link BufferBuilder} and there can be a different thread reading from it using {@link BufferConsumer}. 
+ */ +@NotThreadSafe +public class BufferConsumer implements Closeable { + private final Buffer buffer; + + private final CachedPositionMarker writerPosition; + + private int currentReaderPosition = 0; + + public BufferConsumer( + MemorySegment memorySegment, + BufferRecycler recycler, + PositionMarker currentWriterPosition) { + + this.buffer = new NetworkBuffer(checkNotNull(memorySegment), checkNotNull(recycler), true); + this.writerPosition = new CachedPositionMarker(checkNotNull(currentWriterPosition)); + } + + public boolean isFinished() { + return writerPosition.isFinished(); + } + + /** + * @return sliced {@link Buffer} containing the not yet consumed data. Returned {@link Buffer} shares the reference + * counter with the parent {@link BufferConsumer} - in order to recycle memory both of them must be recycled/closed. + */ + public Buffer build() { + writerPosition.update(); + Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition); + currentReaderPosition = writerPosition.getCached(); + return slice.retainBuffer(); + } + + @Override + public void close() { + if (!buffer.isRecycled()) { + buffer.recycleBuffer(); + } + } + + public int getWrittenBytes() { + return writerPosition.getCached(); + } + + /** + * Cached reading wrapper around {@link PositionMarker}. + * + * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently + * of one another - for example the cached values can not accidentally leak from one to another. + */ + private static class CachedPositionMarker { + private final PositionMarker positionMarker; + + /** + * Locally cached value of {@link PositionMarker} to avoid unnecessary volatile accesses. 
+ */ + private int cachedPosition; + + public CachedPositionMarker(PositionMarker positionMarker) { + this.positionMarker = checkNotNull(positionMarker); + update(); + } + + public boolean isFinished() { + return PositionMarker.isFinished(cachedPosition); + } + + public int getCached() { + return PositionMarker.getAbsolute(cachedPosition); + } + + private void update() { + this.cachedPosition = positionMarker.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index a36d6be..0a311aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -180,8 +180,7 @@ class LocalBufferPool implements BufferPool { @Override public Buffer requestBuffer() throws IOException { try { - BufferBuilder bufferBuilder = requestBufferBuilder(false); - return bufferBuilder != null ? bufferBuilder.build() : null; + return toBuffer(requestMemorySegment(false)); } catch (InterruptedException e) { throw new IOException(e); @@ -190,16 +189,29 @@ class LocalBufferPool implements BufferPool { @Override public Buffer requestBufferBlocking() throws IOException, InterruptedException { - BufferBuilder bufferBuilder = requestBufferBuilder(true); - return bufferBuilder != null ? 
bufferBuilder.build() : null; + return toBuffer(requestMemorySegment(true)); } @Override public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException { - return requestBufferBuilder(true); + return toBufferBuilder(requestMemorySegment(true)); } - private BufferBuilder requestBufferBuilder(boolean isBlocking) throws InterruptedException, IOException { + private Buffer toBuffer(MemorySegment memorySegment) { + if (memorySegment == null) { + return null; + } + return new NetworkBuffer(memorySegment, this); + } + + private BufferBuilder toBufferBuilder(MemorySegment memorySegment) { + if (memorySegment == null) { + return null; + } + return new BufferBuilder(memorySegment, this); + } + + private MemorySegment requestMemorySegment(boolean isBlocking) throws InterruptedException, IOException { synchronized (availableMemorySegments) { returnExcessMemorySegments(); @@ -234,7 +246,7 @@ class LocalBufferPool implements BufferPool { } } - return new BufferBuilder(availableMemorySegments.poll(), this); + return availableMemorySegments.poll(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index af26d8d..356f210 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; 
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; import org.apache.flink.testutils.serialization.types.IntType; import org.apache.flink.testutils.serialization.types.SerializationTestType; @@ -33,6 +34,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; /** @@ -134,7 +136,8 @@ public class SpanningRecordSerializationTest { // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); + BufferBuilder bufferBuilder = createBufferBuilder(segmentSize); + serializer.setNextBufferBuilder(bufferBuilder); int numRecords = 0; for (SerializationTestType record : records) { @@ -146,7 +149,7 @@ public class SpanningRecordSerializationTest { // serialize record if (serializer.addRecord(record).isFullBuffer()) { // buffer is full => start deserializing - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); + deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder)); while (!serializedRecords.isEmpty()) { SerializationTestType expected = serializedRecords.poll(); @@ -162,14 +165,18 @@ public class SpanningRecordSerializationTest { } // move buffers as long as necessary (for long records) - while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) { - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); + bufferBuilder = createBufferBuilder(segmentSize); + serializer.clear(); + while (serializer.setNextBufferBuilder(bufferBuilder).isFullBuffer()) { + deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder)); + 
bufferBuilder = createBufferBuilder(segmentSize); + serializer.clear(); } } } // deserialize left over records - deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); + deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder)); while (!serializedRecords.isEmpty()) { SerializationTestType expected = serializedRecords.poll(); http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java new file mode 100644 index 0000000..b2cccb5 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegmentFactory; + +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.ArrayList; + +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link BufferBuilder}. + */ +public class BufferBuilderAndConsumerTest { + private static final int BUFFER_INT_SIZE = 10; + private static final int BUFFER_SIZE = BUFFER_INT_SIZE * Integer.BYTES; + + @Test + public void referenceCounting() { + BufferBuilder bufferBuilder = createBufferBuilder(); + BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); + + assertEquals(3 * Integer.BYTES, bufferBuilder.append(toByteBuffer(1, 2, 3))); + + Buffer buffer = bufferConsumer.build(); + assertFalse(buffer.isRecycled()); + buffer.recycleBuffer(); + assertFalse(buffer.isRecycled()); + bufferConsumer.close(); + assertTrue(buffer.isRecycled()); + } + + @Test + public void append() { + BufferBuilder bufferBuilder = createBufferBuilder(); + + int[] intsToWrite = new int[] {0, 1, 2, 3, 42}; + ByteBuffer bytesToWrite = toByteBuffer(intsToWrite); + + assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite)); + + assertEquals(bytesToWrite.limit(), bytesToWrite.position()); + assertFalse(bufferBuilder.isFull()); + + assertContent(bufferBuilder.createBufferConsumer(), intsToWrite); + } + + @Test + public void multipleAppends() { + BufferBuilder bufferBuilder = createBufferBuilder(); + BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); + + bufferBuilder.append(toByteBuffer(0, 1)); + bufferBuilder.append(toByteBuffer(2)); + bufferBuilder.append(toByteBuffer(3, 42)); + + assertContent(bufferConsumer, 0, 1, 2, 
3, 42); + } + + @Test + public void appendOverSize() { + BufferBuilder bufferBuilder = createBufferBuilder(); + BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); + ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42); + + assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite)); + + assertTrue(bufferBuilder.isFull()); + assertContent(bufferConsumer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + bufferBuilder = createBufferBuilder(); + bufferConsumer = bufferBuilder.createBufferConsumer(); + assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite)); + + assertFalse(bufferBuilder.isFull()); + assertContent(bufferConsumer, 42); + } + + @Test + public void buildEmptyBuffer() { + Buffer buffer = buildSingleBuffer(createBufferBuilder()); + assertEquals(0, buffer.getSize()); + assertContent(buffer); + } + + @Test + public void buildingBufferMultipleTimes() { + BufferBuilder bufferBuilder = createBufferBuilder(); + try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) { + bufferBuilder.append(toByteBuffer(0, 1)); + bufferBuilder.append(toByteBuffer(2)); + + assertContent(bufferConsumer, 0, 1, 2); + + bufferBuilder.append(toByteBuffer(3, 42)); + bufferBuilder.append(toByteBuffer(44)); + + assertContent(bufferConsumer, 3, 42, 44); + + ArrayList<Integer> originalValues = new ArrayList<>(); + while (!bufferBuilder.isFull()) { + bufferBuilder.append(toByteBuffer(1337)); + originalValues.add(1337); + } + + assertContent(bufferConsumer, originalValues.stream().mapToInt(Integer::intValue).toArray()); + } + } + + @Test + public void emptyIsFinished() { + testIsFinished(0); + } + + @Test + public void partiallyFullIsFinished() { + testIsFinished(BUFFER_INT_SIZE / 2); + } + + @Test + public void fullIsFinished() { + testIsFinished(BUFFER_INT_SIZE); + } + + private static void testIsFinished(int writes) { + BufferBuilder bufferBuilder = createBufferBuilder(); + BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer(); + + for (int i = 0; i < writes; i++) { + assertEquals(Integer.BYTES, bufferBuilder.append(toByteBuffer(42))); + } + + assertFalse(bufferBuilder.isFinished()); + assertFalse(bufferConsumer.isFinished()); + + bufferConsumer.build(); + + assertFalse(bufferBuilder.isFinished()); + assertFalse(bufferConsumer.isFinished()); + + bufferBuilder.finish(); + + assertTrue(bufferBuilder.isFinished()); + assertFalse(bufferConsumer.isFinished()); + + bufferConsumer.build(); + + assertTrue(bufferConsumer.isFinished()); + } + + private static ByteBuffer toByteBuffer(int... data) { + ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES); + byteBuffer.asIntBuffer().put(data); + return byteBuffer; + } + + private static void assertContent(BufferConsumer actualConsumer, int... expected) { + assertFalse(actualConsumer.isFinished()); + Buffer buffer = actualConsumer.build(); + assertFalse(buffer.isRecycled()); + assertContent(buffer, expected); + assertEquals(expected.length * Integer.BYTES, buffer.getSize()); + buffer.recycleBuffer(); + } + + private static void assertContent(Buffer actualBuffer, int... 
expected) { + IntBuffer actualIntBuffer = actualBuffer.getNioBufferReadable().asIntBuffer(); + int[] actual = new int[actualIntBuffer.limit()]; + actualIntBuffer.get(actual); + assertArrayEquals(expected, actual); + + assertEquals(FreeingBufferRecycler.INSTANCE, actualBuffer.getRecycler()); + } + + private static BufferBuilder createBufferBuilder() { + return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), FreeingBufferRecycler.INSTANCE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java deleted file mode 100644 index d0df02d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.buffer; - -import org.apache.flink.core.memory.MemorySegmentFactory; - -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link BufferBuilder}. - */ -public class BufferBuilderTest { - private static final int BUFFER_SIZE = 10 * Integer.BYTES; - - @Test - public void append() { - BufferBuilder bufferBuilder = createBufferBuilder(); - int[] intsToWrite = new int[] {0, 1, 2, 3, 42}; - ByteBuffer bytesToWrite = toByteBuffer(intsToWrite); - - assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite)); - - assertEquals(bytesToWrite.limit(), bytesToWrite.position()); - assertFalse(bufferBuilder.isFull()); - Buffer buffer = bufferBuilder.build(); - assertBufferContent(buffer, intsToWrite); - assertEquals(5 * Integer.BYTES, buffer.getSize()); - assertEquals(FreeingBufferRecycler.INSTANCE, buffer.getRecycler()); - } - - @Test - public void multipleAppends() { - BufferBuilder bufferBuilder = createBufferBuilder(); - - bufferBuilder.append(toByteBuffer(0, 1)); - bufferBuilder.append(toByteBuffer(2)); - bufferBuilder.append(toByteBuffer(3, 42)); - - Buffer buffer = bufferBuilder.build(); - assertBufferContent(buffer, 0, 1, 2, 3, 42); - assertEquals(5 * Integer.BYTES, buffer.getSize()); - } - - @Test - public void appendOverSize() { - BufferBuilder bufferBuilder = createBufferBuilder(); - ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42); - - assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite)); - - assertTrue(bufferBuilder.isFull()); - Buffer buffer = bufferBuilder.build(); - assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - assertEquals(BUFFER_SIZE, buffer.getSize()); - - bufferBuilder = createBufferBuilder(); - assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite)); - - 
assertFalse(bufferBuilder.isFull()); - buffer = bufferBuilder.build(); - assertBufferContent(buffer, 42); - assertEquals(Integer.BYTES, buffer.getSize()); - } - - @Test - public void buildEmptyBuffer() { - Buffer buffer = createBufferBuilder().build(); - assertEquals(0, buffer.getSize()); - assertBufferContent(buffer); - } - - @Test(expected = IllegalStateException.class) - public void buildingBufferTwice() { - BufferBuilder bufferBuilder = createBufferBuilder(); - bufferBuilder.build(); - bufferBuilder.build(); - } - - private static ByteBuffer toByteBuffer(int... data) { - ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES); - byteBuffer.asIntBuffer().put(data); - return byteBuffer; - } - - private static void assertBufferContent(Buffer actualBuffer, int... expected) { - assertEquals(toByteBuffer(expected), actualBuffer.getNioBufferReadable()); - } - - private static BufferBuilder createBufferBuilder() { - return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), FreeingBufferRecycler.INSTANCE); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java index 1113664..bdbb5e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java @@ -29,4 +29,10 @@ public class BufferBuilderTestUtils { MemorySegmentFactory.allocateUnpooledSegment(size), FreeingBufferRecycler.INSTANCE); } + + public static Buffer buildSingleBuffer(BufferBuilder bufferBuilder) { + try (BufferConsumer 
bufferConsumer = bufferBuilder.createBufferConsumer()) { + return bufferConsumer.build(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index 5fe835a..ef30ee1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; @@ -32,6 +33,7 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.mockito.Mockito.when; @@ -69,13 +71,15 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e @Override public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws 
Throwable { if (hasData) { - serializer.setNextBufferBuilder(createBufferBuilder(bufferSize)); + serializer.clear(); + BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); + serializer.setNextBufferBuilder(bufferBuilder); serializer.addRecord(reuse); hasData = inputIterator.next(reuse) != null; // Call getCurrentBuffer to ensure size is set - return new InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true, 0); + return new InputChannel.BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0); } else { when(inputChannel.getInputChannel().isReleased()).thenReturn(true); http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java index cb67d48..994d02e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -324,7 +325,7 @@ public class 
BackPressureStatsTrackerImplITCase extends TestLogger { while (true) { final BufferBuilder bufferBuilder = testBufferPool.requestBufferBuilderBlocking(); // Got a buffer, yay! - bufferBuilder.build().recycleBuffer(); + BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer(); new CountDownLatch(1).await(); } http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 11d8f11..e938ac8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -38,6 +39,7 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @@ -102,12 +104,14 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); - recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize)); + BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); + recordSerializer.setNextBufferBuilder(bufferBuilder); delegate.setInstance(inputElement); recordSerializer.addRecord(delegate); + bufferBuilder.finish(); // Call getCurrentBuffer to ensure size is set - return new BufferAndAvailability(recordSerializer.getCurrentBuffer(), false, 0); + return new BufferAndAvailability(buildSingleBuffer(bufferBuilder), false, 0); } else if (input != null && input.isEvent()) { AbstractEvent event = input.getEvent(); return new BufferAndAvailability(EventSerializer.toBuffer(event), false, 0);