[FLINK-8582][runtime] Introduce BufferConsumer

BufferConsumer will be used in the future for reading partially written
MemorySegments. On flushes, instead of requesting a new MemorySegment, the
BufferConsumer code will allow writing to continue into the partially
filled MemorySegment.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/306fd8e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/306fd8e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/306fd8e4

Branch: refs/heads/master
Commit: 306fd8e41d96d8fdf7470cb3fa56917ec1b2cd31
Parents: 66ac59f
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 4 16:42:46 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:14 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     |  12 --
 .../serialization/SpanningRecordSerializer.java |  10 -
 .../io/network/api/writer/RecordWriter.java     |  43 ++--
 .../io/network/buffer/BufferBuilder.java        | 132 ++++++++++--
 .../io/network/buffer/BufferConsumer.java       | 112 ++++++++++
 .../io/network/buffer/LocalBufferPool.java      |  26 ++-
 .../SpanningRecordSerializationTest.java        |  17 +-
 .../buffer/BufferBuilderAndConsumerTest.java    | 202 +++++++++++++++++++
 .../io/network/buffer/BufferBuilderTest.java    | 114 -----------
 .../network/buffer/BufferBuilderTestUtils.java  |   6 +
 .../IteratorWrappingTestSingleInputGate.java    |   8 +-
 .../BackPressureStatsTrackerImplITCase.java     |   3 +-
 .../consumer/StreamTestSingleInputGate.java     |   8 +-
 13 files changed, 506 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 9fb656b..a74a068 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
@@ -87,17 +86,6 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
        SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) 
throws IOException;
 
        /**
-        * Retrieves the current target buffer and sets its size to the actual
-        * number of written bytes.
-        *
-        * <p>After calling this method, a new target buffer is required to 
continue
-        * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
-        *
-        * @return the target buffer that was used
-        */
-       Buffer getCurrentBuffer();
-
-       /**
         * Clear and release internal state.
         */
        void clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 263ff43..ba8e659 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -147,16 +147,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        }
 
        @Override
-       public Buffer getCurrentBuffer() {
-               if (targetBuffer == null) {
-                       return null;
-               }
-               Buffer result = targetBuffer.build();
-               targetBuffer = null;
-               return result;
-       }
-
-       @Override
        public void clear() {
                targetBuffer = null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index b47c461..da28cf7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -27,10 +27,12 @@ import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
@@ -60,6 +62,8 @@ public class RecordWriter<T extends IOReadableWritable> {
        /** {@link RecordSerializer} per outgoing channel. */
        private final RecordSerializer<T>[] serializers;
 
+       private final Optional<BufferConsumer>[] bufferConsumers;
+
        private final Random rng = new XORShiftRandom();
 
        private Counter numBytesOut = new SimpleCounter();
@@ -81,8 +85,10 @@ public class RecordWriter<T extends IOReadableWritable> {
                 * serializer.
                 */
                this.serializers = new SpanningRecordSerializer[numChannels];
+               this.bufferConsumers = new Optional[numChannels];
                for (int i = 0; i < numChannels; i++) {
                        serializers[i] = new SpanningRecordSerializer<T>();
+                       bufferConsumers[i] = Optional.empty();
                }
        }
 
@@ -128,6 +134,8 @@ public class RecordWriter<T extends IOReadableWritable> {
                                } else {
                                        BufferBuilder bufferBuilder =
                                                
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+                                       
checkState(!bufferConsumers[targetChannel].isPresent());
+                                       bufferConsumers[targetChannel] = 
Optional.of(bufferBuilder.createBufferConsumer());
                                        result = 
serializer.setNextBufferBuilder(bufferBuilder);
                                }
                        }
@@ -166,18 +174,11 @@ public class RecordWriter<T extends IOReadableWritable> {
        }
 
        public void clearBuffers() {
-               for (RecordSerializer<?> serializer : serializers) {
+               for (int targetChannel = 0; targetChannel < numChannels; 
targetChannel++) {
+                       RecordSerializer<?> serializer = 
serializers[targetChannel];
                        synchronized (serializer) {
-                               try {
-                                       Buffer buffer = 
serializer.getCurrentBuffer();
-
-                                       if (buffer != null) {
-                                               buffer.recycleBuffer();
-                                       }
-                               }
-                               finally {
-                                       serializer.clear();
-                               }
+                               closeBufferConsumer(targetChannel);
+                               serializer.clear();
                        }
                }
        }
@@ -190,8 +191,8 @@ public class RecordWriter<T extends IOReadableWritable> {
        }
 
        /**
-        * Writes the buffer to the {@link ResultPartitionWriter} and removes 
the
-        * buffer from the serializer state.
+        * Tries to consume serialized data and (if data present) writes them 
to the {@link ResultPartitionWriter}.
+        * After writing it cleans up the state.
         *
         * <p><b>Needs to be synchronized on the serializer!</b>
         *
@@ -201,17 +202,25 @@ public class RecordWriter<T extends IOReadableWritable> {
                        int targetChannel,
                        RecordSerializer<T> serializer) throws IOException {
 
-               Buffer buffer = serializer.getCurrentBuffer();
-               if (buffer == null) {
+               Optional<BufferConsumer> bufferConsumer = 
bufferConsumers[targetChannel];
+               if (!bufferConsumer.isPresent()) {
                        return false;
                }
 
-               numBytesOut.inc(buffer.getSizeUnsafe());
+               numBytesOut.inc(bufferConsumer.get().getWrittenBytes());
                try {
-                       targetPartition.writeBuffer(buffer, targetChannel);
+                       
targetPartition.writeBuffer(bufferConsumer.get().build(), targetChannel);
                        return true;
                } finally {
                        serializer.clear();
+                       closeBufferConsumer(targetChannel);
+               }
+       }
+
+       private void closeBufferConsumer(int targetChannel) {
+               if (bufferConsumers[targetChannel].isPresent()) {
+                       bufferConsumers[targetChannel].get().close();
+                       bufferConsumers[targetChannel] = Optional.empty();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index ff59f96..bac141f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.core.memory.MemorySegment;
 
 import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.nio.ByteBuffer;
 
@@ -28,8 +29,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Not thread safe class for filling in the initial content of the {@link 
Buffer}. Once writing to the builder
- * is complete, {@link Buffer} instance can be built and shared across 
multiple threads.
+ * Not thread safe class for filling in the content of the {@link 
MemorySegment}. To access written data please use
+ * {@link BufferConsumer} which allows to build {@link Buffer} instances from 
the written data.
  */
 @NotThreadSafe
 public class BufferBuilder {
@@ -37,9 +38,9 @@ public class BufferBuilder {
 
        private final BufferRecycler recycler;
 
-       private int position = 0;
+       private final SettablePositionMarker positionMarker = new 
SettablePositionMarker();
 
-       private boolean built = false;
+       private boolean bufferConsumerCreated = false;
 
        public BufferBuilder(MemorySegment memorySegment, BufferRecycler 
recycler) {
                this.memorySegment = checkNotNull(memorySegment);
@@ -47,36 +48,133 @@ public class BufferBuilder {
        }
 
        /**
+        * @return created matching instance of {@link BufferConsumer} to this 
{@link BufferBuilder}. There can exist only
+        * one {@link BufferConsumer} per each {@link BufferBuilder} and vice 
versa.
+        */
+       public BufferConsumer createBufferConsumer() {
+               checkState(!bufferConsumerCreated, "There can not exists two 
BufferConsumer for one BufferBuilder");
+               bufferConsumerCreated = true;
+               return new BufferConsumer(
+                       memorySegment,
+                       recycler,
+                       positionMarker);
+       }
+
+       /**
+        * Append as much data as possible from {@code source}. Not everything 
might be copied if there is not enough
+        * space in the underlying {@link MemorySegment}
+        *
         * @return number of copied bytes
         */
        public int append(ByteBuffer source) {
-               checkState(!built);
+               checkState(!isFinished());
 
                int needed = source.remaining();
-               int available = limit() - position;
+               int available = getMaxCapacity() - positionMarker.getCached();
                int toCopy = Math.min(needed, available);
 
-               memorySegment.put(position, source, toCopy);
-               position += toCopy;
+               memorySegment.put(positionMarker.getCached(), source, toCopy);
+               positionMarker.move(toCopy);
                return toCopy;
        }
 
-       public boolean isFull() {
-               checkState(position <= limit());
-               return position == limit();
+       /**
+        * Mark this {@link BufferBuilder} and associated {@link 
BufferConsumer} as finished - no new data writes will be
+        * allowed.
+        *
+        * @return number of written bytes.
+        */
+       public int finish() {
+               checkState(!isFinished());
+               positionMarker.markFinished();
+               return getWrittenBytes();
        }
 
-       public Buffer build() {
-               checkState(!built);
-               built = true;
-               return new NetworkBuffer(memorySegment, recycler, true, 
position);
+       public boolean isFinished() {
+               return positionMarker.isFinished();
+       }
+
+       public boolean isFull() {
+               checkState(positionMarker.getCached() <= getMaxCapacity());
+               return positionMarker.getCached() == getMaxCapacity();
        }
 
        public boolean isEmpty() {
-               return position == 0;
+               return positionMarker.getCached() == 0;
        }
 
-       private int limit() {
+       public int getMaxCapacity() {
                return memorySegment.size();
        }
+
+       public int getWrittenBytes() {
+               return positionMarker.getCached();
+       }
+
+       /**
+        * Holds a reference to the current writer position. Negative values 
indicate that writer ({@link BufferBuilder})
+        * has finished. Value {@code Integer.MIN_VALUE} represents finished 
empty buffer.
+        */
+       @ThreadSafe
+       interface PositionMarker {
+               int FINISHED_EMPTY = Integer.MIN_VALUE;
+
+               int get();
+
+               static boolean isFinished(int position) {
+                       return position < 0;
+               }
+
+               static int getAbsolute(int position) {
+                       if (position == FINISHED_EMPTY) {
+                               return 0;
+                       }
+                       return Math.abs(position);
+               }
+       }
+
+       /**
+        * Cached writing implementation of {@link PositionMarker}.
+        *
+        * <p>Writer ({@link BufferBuilder}) and reader ({@link 
BufferConsumer}) caches must be implemented independently
+        * of one another - for example the cached values can not accidentally 
leak from one to another.
+        */
+       private static class SettablePositionMarker implements PositionMarker {
+               private volatile int position = 0;
+
+               /**
+                * Locally cached value of volatile {@code position} to avoid 
unnecessary volatile accesses.
+                */
+               private int cachedPosition = 0;
+
+               @Override
+               public int get() {
+                       return position;
+               }
+
+               public boolean isFinished() {
+                       return PositionMarker.isFinished(cachedPosition);
+               }
+
+               public int getCached() {
+                       return PositionMarker.getAbsolute(cachedPosition);
+               }
+
+               public void markFinished() {
+                       int newValue = -getCached();
+                       if (newValue == 0) {
+                               newValue = FINISHED_EMPTY;
+                       }
+                       set(newValue);
+               }
+
+               public void move(int offset) {
+                       set(cachedPosition + offset);
+               }
+
+               public void set(int value) {
+                       cachedPosition = value;
+                       position = cachedPosition;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
new file mode 100644
index 0000000..177bac0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder.PositionMarker;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Not thread safe class for producing {@link Buffer}.
+ *
+ * <p>It reads data written by {@link BufferBuilder}.
+ * Although it is not thread safe and can be used only by one single thread, 
this thread can be different than the
+ * thread using/writing to {@link BufferBuilder}. Pattern here is simple: one 
thread writes data to
+ * {@link BufferBuilder} and there can be a different thread reading from it 
using {@link BufferConsumer}.
+ */
+@NotThreadSafe
+public class BufferConsumer implements Closeable {
+       private final Buffer buffer;
+
+       private final CachedPositionMarker writerPosition;
+
+       private int currentReaderPosition = 0;
+
+       public BufferConsumer(
+                       MemorySegment memorySegment,
+                       BufferRecycler recycler,
+                       PositionMarker currentWriterPosition) {
+
+               this.buffer = new NetworkBuffer(checkNotNull(memorySegment), 
checkNotNull(recycler), true);
+               this.writerPosition = new 
CachedPositionMarker(checkNotNull(currentWriterPosition));
+       }
+
+       public boolean isFinished() {
+               return writerPosition.isFinished();
+       }
+
+       /**
+        * @return sliced {@link Buffer} containing the not yet consumed data. 
Returned {@link Buffer} shares the reference
+        * counter with the parent {@link BufferConsumer} - in order to recycle 
memory both of them must be recycled/closed.
+        */
+       public Buffer build() {
+               writerPosition.update();
+               Buffer slice = buffer.readOnlySlice(currentReaderPosition, 
writerPosition.getCached() - currentReaderPosition);
+               currentReaderPosition = writerPosition.getCached();
+               return slice.retainBuffer();
+       }
+
+       @Override
+       public void close() {
+               if (!buffer.isRecycled()) {
+                       buffer.recycleBuffer();
+               }
+       }
+
+       public int getWrittenBytes() {
+               return writerPosition.getCached();
+       }
+
+       /**
+        * Cached reading wrapper around {@link PositionMarker}.
+        *
+        * <p>Writer ({@link BufferBuilder}) and reader ({@link 
BufferConsumer}) caches must be implemented independently
+        * of one another - for example the cached values can not accidentally 
leak from one to another.
+        */
+       private static class CachedPositionMarker {
+               private final PositionMarker positionMarker;
+
+               /**
+                * Locally cached value of {@link PositionMarker} to avoid 
unnecessary volatile accesses.
+                */
+               private int cachedPosition;
+
+               public CachedPositionMarker(PositionMarker positionMarker) {
+                       this.positionMarker = checkNotNull(positionMarker);
+                       update();
+               }
+
+               public boolean isFinished() {
+                       return PositionMarker.isFinished(cachedPosition);
+               }
+
+               public int getCached() {
+                       return PositionMarker.getAbsolute(cachedPosition);
+               }
+
+               private void update() {
+                       this.cachedPosition = positionMarker.get();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a36d6be..0a311aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -180,8 +180,7 @@ class LocalBufferPool implements BufferPool {
        @Override
        public Buffer requestBuffer() throws IOException {
                try {
-                       BufferBuilder bufferBuilder = 
requestBufferBuilder(false);
-                       return bufferBuilder != null ? bufferBuilder.build() : 
null;
+                       return toBuffer(requestMemorySegment(false));
                }
                catch (InterruptedException e) {
                        throw new IOException(e);
@@ -190,16 +189,29 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
-               BufferBuilder bufferBuilder = requestBufferBuilder(true);
-               return bufferBuilder != null ? bufferBuilder.build() : null;
+               return toBuffer(requestMemorySegment(true));
        }
 
        @Override
        public BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException {
-               return requestBufferBuilder(true);
+               return toBufferBuilder(requestMemorySegment(true));
        }
 
-       private BufferBuilder requestBufferBuilder(boolean isBlocking) throws 
InterruptedException, IOException {
+       private Buffer toBuffer(MemorySegment memorySegment) {
+               if (memorySegment == null) {
+                       return null;
+               }
+               return new NetworkBuffer(memorySegment, this);
+       }
+
+       private BufferBuilder toBufferBuilder(MemorySegment memorySegment) {
+               if (memorySegment == null) {
+                       return null;
+               }
+               return new BufferBuilder(memorySegment, this);
+       }
+
+       private MemorySegment requestMemorySegment(boolean isBlocking) throws 
InterruptedException, IOException {
                synchronized (availableMemorySegments) {
                        returnExcessMemorySegments();
 
@@ -234,7 +246,7 @@ class LocalBufferPool implements BufferPool {
                                }
                        }
 
-                       return new 
BufferBuilder(availableMemorySegments.poll(), this);
+                       return availableMemorySegments.poll();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index af26d8d..356f210 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
 import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
 /**
@@ -134,7 +136,8 @@ public class SpanningRecordSerializationTest {
 
                // 
-------------------------------------------------------------------------------------------------------------
 
-               
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
+               BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
+               serializer.setNextBufferBuilder(bufferBuilder);
 
                int numRecords = 0;
                for (SerializationTestType record : records) {
@@ -146,7 +149,7 @@ public class SpanningRecordSerializationTest {
                        // serialize record
                        if (serializer.addRecord(record).isFullBuffer()) {
                                // buffer is full => start deserializing
-                               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
+                               
deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
 
                                while (!serializedRecords.isEmpty()) {
                                        SerializationTestType expected = 
serializedRecords.poll();
@@ -162,14 +165,18 @@ public class SpanningRecordSerializationTest {
                                }
 
                                // move buffers as long as necessary (for long 
records)
-                               while 
(serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer())
 {
-                                       
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
+                               bufferBuilder = 
createBufferBuilder(segmentSize);
+                               serializer.clear();
+                               while 
(serializer.setNextBufferBuilder(bufferBuilder).isFullBuffer()) {
+                                       
deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
+                                       bufferBuilder = 
createBufferBuilder(segmentSize);
+                                       serializer.clear();
                                }
                        }
                }
 
                // deserialize left over records
-               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
+               deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
 
                while (!serializedRecords.isEmpty()) {
                        SerializationTestType expected = 
serializedRecords.poll();

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
new file mode 100644
index 0000000..b2cccb5
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BufferBuilder} and {@link BufferConsumer}.
+ */
+public class BufferBuilderAndConsumerTest {
+       private static final int BUFFER_INT_SIZE = 10;
+       private static final int BUFFER_SIZE = BUFFER_INT_SIZE * Integer.BYTES;
+
+       @Test
+       public void referenceCounting() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
+
+               assertEquals(3 * Integer.BYTES, 
bufferBuilder.append(toByteBuffer(1, 2, 3)));
+
+               Buffer buffer = bufferConsumer.build();
+               assertFalse(buffer.isRecycled());
+               buffer.recycleBuffer();
+               assertFalse(buffer.isRecycled());
+               bufferConsumer.close();
+               assertTrue(buffer.isRecycled());
+       }
+
+       @Test
+       public void append() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+
+               int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
+               ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
+
+               assertEquals(bytesToWrite.limit(), 
bufferBuilder.append(bytesToWrite));
+
+               assertEquals(bytesToWrite.limit(), bytesToWrite.position());
+               assertFalse(bufferBuilder.isFull());
+
+               assertContent(bufferBuilder.createBufferConsumer(), 
intsToWrite);
+       }
+
+       @Test
+       public void multipleAppends() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
+
+               bufferBuilder.append(toByteBuffer(0, 1));
+               bufferBuilder.append(toByteBuffer(2));
+               bufferBuilder.append(toByteBuffer(3, 42));
+
+               assertContent(bufferConsumer, 0, 1, 2, 3, 42);
+       }
+
+       @Test
+       public void appendOverSize() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
+               ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 
8, 9, 42);
+
+               assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
+
+               assertTrue(bufferBuilder.isFull());
+               assertContent(bufferConsumer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+               bufferBuilder = createBufferBuilder();
+               bufferConsumer = bufferBuilder.createBufferConsumer();
+               assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
+
+               assertFalse(bufferBuilder.isFull());
+               assertContent(bufferConsumer, 42);
+       }
+
+       @Test
+       public void buildEmptyBuffer() {
+               Buffer buffer = buildSingleBuffer(createBufferBuilder());
+               assertEquals(0, buffer.getSize());
+               assertContent(buffer);
+       }
+
+       @Test
+       public void buildingBufferMultipleTimes() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               try (BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer()) {
+                       bufferBuilder.append(toByteBuffer(0, 1));
+                       bufferBuilder.append(toByteBuffer(2));
+
+                       assertContent(bufferConsumer, 0, 1, 2);
+
+                       bufferBuilder.append(toByteBuffer(3, 42));
+                       bufferBuilder.append(toByteBuffer(44));
+
+                       assertContent(bufferConsumer, 3, 42, 44);
+
+                       ArrayList<Integer> originalValues = new ArrayList<>();
+                       while (!bufferBuilder.isFull()) {
+                               bufferBuilder.append(toByteBuffer(1337));
+                               originalValues.add(1337);
+                       }
+
+                       assertContent(bufferConsumer, 
originalValues.stream().mapToInt(Integer::intValue).toArray());
+               }
+       }
+
+       @Test
+       public void emptyIsFinished() {
+               testIsFinished(0);
+       }
+
+       @Test
+       public void partiallyFullIsFinished() {
+               testIsFinished(BUFFER_INT_SIZE / 2);
+       }
+
+       @Test
+       public void fullIsFinished() {
+               testIsFinished(BUFFER_INT_SIZE);
+       }
+
+       private static void testIsFinished(int writes) {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
+
+               for (int i = 0; i < writes; i++) {
+                       assertEquals(Integer.BYTES, 
bufferBuilder.append(toByteBuffer(42)));
+               }
+
+               assertFalse(bufferBuilder.isFinished());
+               assertFalse(bufferConsumer.isFinished());
+
+               bufferConsumer.build();
+
+               assertFalse(bufferBuilder.isFinished());
+               assertFalse(bufferConsumer.isFinished());
+
+               bufferBuilder.finish();
+
+               assertTrue(bufferBuilder.isFinished());
+               assertFalse(bufferConsumer.isFinished());
+
+               bufferConsumer.build();
+
+               assertTrue(bufferConsumer.isFinished());
+       }
+
+       private static ByteBuffer toByteBuffer(int... data) {
+               ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * 
Integer.BYTES);
+               byteBuffer.asIntBuffer().put(data);
+               return byteBuffer;
+       }
+
+       private static void assertContent(BufferConsumer actualConsumer, int... 
expected) {
+               assertFalse(actualConsumer.isFinished());
+               Buffer buffer = actualConsumer.build();
+               assertFalse(buffer.isRecycled());
+               assertContent(buffer, expected);
+               assertEquals(expected.length * Integer.BYTES, buffer.getSize());
+               buffer.recycleBuffer();
+       }
+
+       private static void assertContent(Buffer actualBuffer, int... expected) 
{
+               IntBuffer actualIntBuffer = 
actualBuffer.getNioBufferReadable().asIntBuffer();
+               int[] actual = new int[actualIntBuffer.limit()];
+               actualIntBuffer.get(actual);
+               assertArrayEquals(expected, actual);
+
+               assertEquals(FreeingBufferRecycler.INSTANCE, 
actualBuffer.getRecycler());
+       }
+
+       private static BufferBuilder createBufferBuilder() {
+               return new 
BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), 
FreeingBufferRecycler.INSTANCE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
deleted file mode 100644
index d0df02d..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.buffer;
-
-import org.apache.flink.core.memory.MemorySegmentFactory;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link BufferBuilder}.
- */
-public class BufferBuilderTest {
-       private static final int BUFFER_SIZE = 10 * Integer.BYTES;
-
-       @Test
-       public void append() {
-               BufferBuilder bufferBuilder = createBufferBuilder();
-               int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
-               ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
-
-               assertEquals(bytesToWrite.limit(), 
bufferBuilder.append(bytesToWrite));
-
-               assertEquals(bytesToWrite.limit(), bytesToWrite.position());
-               assertFalse(bufferBuilder.isFull());
-               Buffer buffer = bufferBuilder.build();
-               assertBufferContent(buffer, intsToWrite);
-               assertEquals(5 * Integer.BYTES, buffer.getSize());
-               assertEquals(FreeingBufferRecycler.INSTANCE, 
buffer.getRecycler());
-       }
-
-       @Test
-       public void multipleAppends() {
-               BufferBuilder bufferBuilder = createBufferBuilder();
-
-               bufferBuilder.append(toByteBuffer(0, 1));
-               bufferBuilder.append(toByteBuffer(2));
-               bufferBuilder.append(toByteBuffer(3, 42));
-
-               Buffer buffer = bufferBuilder.build();
-               assertBufferContent(buffer, 0, 1, 2, 3, 42);
-               assertEquals(5 * Integer.BYTES, buffer.getSize());
-       }
-
-       @Test
-       public void appendOverSize() {
-               BufferBuilder bufferBuilder = createBufferBuilder();
-               ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 
8, 9, 42);
-
-               assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
-
-               assertTrue(bufferBuilder.isFull());
-               Buffer buffer = bufferBuilder.build();
-               assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
-               assertEquals(BUFFER_SIZE, buffer.getSize());
-
-               bufferBuilder = createBufferBuilder();
-               assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
-
-               assertFalse(bufferBuilder.isFull());
-               buffer = bufferBuilder.build();
-               assertBufferContent(buffer, 42);
-               assertEquals(Integer.BYTES, buffer.getSize());
-       }
-
-       @Test
-       public void buildEmptyBuffer() {
-               Buffer buffer = createBufferBuilder().build();
-               assertEquals(0, buffer.getSize());
-               assertBufferContent(buffer);
-       }
-
-       @Test(expected = IllegalStateException.class)
-       public void buildingBufferTwice() {
-               BufferBuilder bufferBuilder = createBufferBuilder();
-               bufferBuilder.build();
-               bufferBuilder.build();
-       }
-
-       private static ByteBuffer toByteBuffer(int... data) {
-               ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * 
Integer.BYTES);
-               byteBuffer.asIntBuffer().put(data);
-               return byteBuffer;
-       }
-
-       private static void assertBufferContent(Buffer actualBuffer, int... 
expected) {
-               assertEquals(toByteBuffer(expected), 
actualBuffer.getNioBufferReadable());
-       }
-
-       private static BufferBuilder createBufferBuilder() {
-               return new 
BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), 
FreeingBufferRecycler.INSTANCE);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 1113664..bdbb5e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -29,4 +29,10 @@ public class BufferBuilderTestUtils {
                        MemorySegmentFactory.allocateUnpooledSegment(size),
                        FreeingBufferRecycler.INSTANCE);
        }
+
+       public static Buffer buildSingleBuffer(BufferBuilder bufferBuilder) {
+               try (BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer()) {
+                       return bufferConsumer.build();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 5fe835a..ef30ee1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
@@ -32,6 +33,7 @@ import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.when;
 
@@ -69,13 +71,15 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
                        @Override
                        public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                if (hasData) {
-                                       
serializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
+                                       serializer.clear();
+                                       BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
+                                       
serializer.setNextBufferBuilder(bufferBuilder);
                                        serializer.addRecord(reuse);
 
                                        hasData = inputIterator.next(reuse) != 
null;
 
                                        // Call getCurrentBuffer to ensure size is set
-                                       return new 
InputChannel.BufferAndAvailability(serializer.getCurrentBuffer(), true, 0);
+                                       return new 
InputChannel.BufferAndAvailability(buildSingleBuffer(bufferBuilder), true, 0);
                                } else {
                                        
when(inputChannel.getInputChannel().isReleased()).thenReturn(true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
index cb67d48..994d02e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -324,7 +325,7 @@ public class BackPressureStatsTrackerImplITCase extends 
TestLogger {
                        while (true) {
                                final BufferBuilder bufferBuilder = 
testBufferPool.requestBufferBuilderBlocking();
                                // Got a buffer, yay!
-                               bufferBuilder.build().recycleBuffer();
+                               
BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder).recycleBuffer();
 
                                new CountDownLatch(1).await();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/306fd8e4/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 11d8f11..e938ac8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -38,6 +39,7 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
@@ -102,12 +104,14 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                        } else if (input != null && 
input.isStreamRecord()) {
                                                Object inputElement = 
input.getStreamRecord();
 
-                                               
recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
+                                               BufferBuilder bufferBuilder = 
createBufferBuilder(bufferSize);
+                                               
recordSerializer.setNextBufferBuilder(bufferBuilder);
                                                
delegate.setInstance(inputElement);
                                                
recordSerializer.addRecord(delegate);
+                                               bufferBuilder.finish();
 
                                                // Call getCurrentBuffer to ensure size is set
-                                               return new 
BufferAndAvailability(recordSerializer.getCurrentBuffer(), false, 0);
+                                               return new 
BufferAndAvailability(buildSingleBuffer(bufferBuilder), false, 0);
                                        } else if (input != null && 
input.isEvent()) {
                                                AbstractEvent event = 
input.getEvent();
                                                return new 
BufferAndAvailability(EventSerializer.toBuffer(event), false, 0);

Reply via email to