[FLINK-8582][runtime] Optimize BufferBuilder writes

By introducing a #commit() method on the critical path, we reduce the number of 
volatile writes per serialized record from 2 down to 1.
This improves network throughput by 20% and restores the original performance 
for high-latency cases.

This closes #5423.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc55d7a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc55d7a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc55d7a0

Branch: refs/heads/master
Commit: bc55d7a084f5bf109c3bc2ff134699c997648552
Parents: 4c38b38
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Feb 8 09:16:30 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Feb 19 15:05:24 2018 +0100

----------------------------------------------------------------------
 .../serialization/SpanningRecordSerializer.java | 32 +++++++--------
 .../io/network/buffer/BufferBuilder.java        | 23 +++++++++++
 .../buffer/BufferBuilderAndConsumerTest.java    | 42 ++++++++++++++------
 .../network/buffer/BufferBuilderTestUtils.java  |  2 +-
 .../partition/PipelinedSubpartitionTest.java    |  4 +-
 .../consumer/LocalInputChannelTest.java         |  2 +-
 6 files changed, 70 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index ba8e659..d7befeb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
@@ -94,8 +93,11 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                dataBuffer = serializationBuffer.wrapAsByteBuffer();
 
                // Copy from intermediate buffers to current target memory 
segment
-               copyToTargetBufferFrom(lengthBuffer);
-               copyToTargetBufferFrom(dataBuffer);
+               if (targetBuffer != null) {
+                       targetBuffer.append(lengthBuffer);
+                       targetBuffer.append(dataBuffer);
+                       targetBuffer.commit();
+               }
 
                return getSerializationResult();
        }
@@ -104,12 +106,19 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        public SerializationResult setNextBufferBuilder(BufferBuilder buffer) 
throws IOException {
                targetBuffer = buffer;
 
+               boolean mustCommit = false;
                if (lengthBuffer.hasRemaining()) {
-                       copyToTargetBufferFrom(lengthBuffer);
+                       targetBuffer.append(lengthBuffer);
+                       mustCommit = true;
                }
 
                if (dataBuffer.hasRemaining()) {
-                       copyToTargetBufferFrom(dataBuffer);
+                       targetBuffer.append(dataBuffer);
+                       mustCommit = true;
+               }
+
+               if (mustCommit) {
+                       targetBuffer.commit();
                }
 
                SerializationResult result = getSerializationResult();
@@ -124,19 +133,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                return result;
        }
 
-       /**
-        * Copies as many bytes as possible from the given {@link ByteBuffer} 
to the {@link MemorySegment} of the target
-        * {@link Buffer} and advances the current position by the number of 
written bytes.
-        *
-        * @param source the {@link ByteBuffer} to copy data from
-        */
-       private void copyToTargetBufferFrom(ByteBuffer source) {
-               if (targetBuffer == null) {
-                       return;
-               }
-               targetBuffer.append(source);
-       }
-
        private SerializationResult getSerializationResult() {
                if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) {
                        return 
SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index bac141f..63b60d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -61,6 +61,15 @@ public class BufferBuilder {
        }
 
        /**
+        * Same as {@link #append(ByteBuffer)}, but additionally calls {@link 
#commit()} to make the appended data visible to readers.
+        */
+       public int appendAndCommit(ByteBuffer source) {
+               int writtenBytes = append(source);
+               commit();
+               return writtenBytes;
+       }
+
+       /**
         * Append as many data as possible from {@code source}. Not everything 
might be copied if there is not enough
         * space in the underlying {@link MemorySegment}
         *
@@ -79,6 +88,14 @@ public class BufferBuilder {
        }
 
        /**
+        * Make the change visible to the readers. This is a costly operation 
(volatile access), thus in case of bulk writes
+        * it is better to commit them all together instead of one by one.
+        */
+       public void commit() {
+               positionMarker.commit();
+       }
+
+       /**
         * Mark this {@link BufferBuilder} and associated {@link 
BufferConsumer} as finished - no new data writes will be
         * allowed.
         *
@@ -87,6 +104,7 @@ public class BufferBuilder {
        public int finish() {
                checkState(!isFinished());
                positionMarker.markFinished();
+               commit();
                return getWrittenBytes();
        }
 
@@ -138,6 +156,8 @@ public class BufferBuilder {
         *
         * <p>Writer ({@link BufferBuilder}) and reader ({@link 
BufferConsumer}) caches must be implemented independently
         * of one another - for example the cached values can not accidentally 
leak from one to another.
+        *
+        * <p>Remember to commit the {@link SettablePositionMarker} to make the 
changes visible.
         */
        private static class SettablePositionMarker implements PositionMarker {
                private volatile int position = 0;
@@ -174,6 +194,9 @@ public class BufferBuilder {
 
                public void set(int value) {
                        cachedPosition = value;
+               }
+
+               public void commit() {
                        position = cachedPosition;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index a20397d..edf2bfe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -44,7 +44,7 @@ public class BufferBuilderAndConsumerTest {
                BufferBuilder bufferBuilder = createBufferBuilder();
                BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
 
-               assertEquals(3 * Integer.BYTES, 
bufferBuilder.append(toByteBuffer(1, 2, 3)));
+               assertEquals(3 * Integer.BYTES, 
bufferBuilder.appendAndCommit(toByteBuffer(1, 2, 3)));
 
                Buffer buffer = bufferConsumer.build();
                assertFalse(buffer.isRecycled());
@@ -61,7 +61,7 @@ public class BufferBuilderAndConsumerTest {
                int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
                ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
 
-               assertEquals(bytesToWrite.limit(), 
bufferBuilder.append(bytesToWrite));
+               assertEquals(bytesToWrite.limit(), 
bufferBuilder.appendAndCommit(bytesToWrite));
 
                assertEquals(bytesToWrite.limit(), bytesToWrite.position());
                assertFalse(bufferBuilder.isFull());
@@ -74,10 +74,26 @@ public class BufferBuilderAndConsumerTest {
                BufferBuilder bufferBuilder = createBufferBuilder();
                BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
 
+               bufferBuilder.appendAndCommit(toByteBuffer(0, 1));
+               bufferBuilder.appendAndCommit(toByteBuffer(2));
+               bufferBuilder.appendAndCommit(toByteBuffer(3, 42));
+
+               assertContent(bufferConsumer, 0, 1, 2, 3, 42);
+       }
+
+       @Test
+       public void multipleNotCommittedAppends() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
+
                bufferBuilder.append(toByteBuffer(0, 1));
                bufferBuilder.append(toByteBuffer(2));
                bufferBuilder.append(toByteBuffer(3, 42));
 
+               assertContent(bufferConsumer);
+
+               bufferBuilder.commit();
+
                assertContent(bufferConsumer, 0, 1, 2, 3, 42);
        }
 
@@ -87,14 +103,14 @@ public class BufferBuilderAndConsumerTest {
                BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
                ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 
8, 9, 42);
 
-               assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
+               assertEquals(BUFFER_SIZE, 
bufferBuilder.appendAndCommit(bytesToWrite));
 
                assertTrue(bufferBuilder.isFull());
                assertContent(bufferConsumer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
 
                bufferBuilder = createBufferBuilder();
                bufferConsumer = bufferBuilder.createBufferConsumer();
-               assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
+               assertEquals(Integer.BYTES, 
bufferBuilder.appendAndCommit(bytesToWrite));
 
                assertFalse(bufferBuilder.isFull());
                assertContent(bufferConsumer, 42);
@@ -112,18 +128,18 @@ public class BufferBuilderAndConsumerTest {
                BufferBuilder bufferBuilder = createBufferBuilder();
                BufferConsumer bufferConsumer1 = 
bufferBuilder.createBufferConsumer();
 
-               bufferBuilder.append(toByteBuffer(0, 1));
+               bufferBuilder.appendAndCommit(toByteBuffer(0, 1));
 
                BufferConsumer bufferConsumer2 = bufferConsumer1.copy();
 
-               bufferBuilder.append(toByteBuffer(2));
+               bufferBuilder.appendAndCommit(toByteBuffer(2));
 
                assertContent(bufferConsumer1, 0, 1, 2);
                assertContent(bufferConsumer2, 0, 1, 2);
 
                BufferConsumer bufferConsumer3 = bufferConsumer1.copy();
 
-               bufferBuilder.append(toByteBuffer(3, 42));
+               bufferBuilder.appendAndCommit(toByteBuffer(3, 42));
 
                BufferConsumer bufferConsumer4 = bufferConsumer1.copy();
 
@@ -144,19 +160,19 @@ public class BufferBuilderAndConsumerTest {
        public void buildingBufferMultipleTimes() {
                BufferBuilder bufferBuilder = createBufferBuilder();
                try (BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer()) {
-                       bufferBuilder.append(toByteBuffer(0, 1));
-                       bufferBuilder.append(toByteBuffer(2));
+                       bufferBuilder.appendAndCommit(toByteBuffer(0, 1));
+                       bufferBuilder.appendAndCommit(toByteBuffer(2));
 
                        assertContent(bufferConsumer, 0, 1, 2);
 
-                       bufferBuilder.append(toByteBuffer(3, 42));
-                       bufferBuilder.append(toByteBuffer(44));
+                       bufferBuilder.appendAndCommit(toByteBuffer(3, 42));
+                       bufferBuilder.appendAndCommit(toByteBuffer(44));
 
                        assertContent(bufferConsumer, 3, 42, 44);
 
                        ArrayList<Integer> originalValues = new ArrayList<>();
                        while (!bufferBuilder.isFull()) {
-                               bufferBuilder.append(toByteBuffer(1337));
+                               
bufferBuilder.appendAndCommit(toByteBuffer(1337));
                                originalValues.add(1337);
                        }
 
@@ -184,7 +200,7 @@ public class BufferBuilderAndConsumerTest {
                BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
 
                for (int i = 0; i < writes; i++) {
-                       assertEquals(Integer.BYTES, 
bufferBuilder.append(toByteBuffer(42)));
+                       assertEquals(Integer.BYTES, 
bufferBuilder.appendAndCommit(toByteBuffer(42)));
                }
 
                assertFalse(bufferBuilder.isFinished());

http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index ead42df..a6e9fdc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -43,7 +43,7 @@ public class BufferBuilderTestUtils {
                BufferBuilder bufferBuilder = new BufferBuilder(
                        MemorySegmentFactory.allocateUnpooledSegment(size),
                        FreeingBufferRecycler.INSTANCE);
-               bufferBuilder.append(ByteBuffer.allocate(dataSize));
+               bufferBuilder.appendAndCommit(ByteBuffer.allocate(dataSize));
                return bufferBuilder;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 4f3a5f9..2ca01c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -131,7 +131,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                        assertEquals(0, 
availablityListener.getNumNotifications());
 
                        BufferBuilder bufferBuilder = createBufferBuilder();
-                       bufferBuilder.append(ByteBuffer.allocate(1024));
+                       
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
                        subpartition.add(bufferBuilder.createBufferConsumer());
 
                        assertNextBuffer(readView, 1024, false, 1);
@@ -360,7 +360,7 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
                                        next++;
                                }
 
-                               
checkState(bufferBuilder.append(ByteBuffer.wrap(segment.getArray())) == 
segmentSize);
+                               
checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) 
== segmentSize);
                                bufferBuilder.finish();
 
                                numberOfBuffers++;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d5c2492..c78b7b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -449,7 +449,7 @@ public class LocalInputChannelTest {
                        if (channelIndexes.size() > 0) {
                                final int channelIndex = 
channelIndexes.remove(0);
                                BufferBuilder bufferBuilder = 
bufferProvider.requestBufferBuilderBlocking();
-                               bufferBuilder.append(ByteBuffer.wrap(new 
byte[4]));
+                               
bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4]));
                                bufferBuilder.finish();
                                return new 
BufferConsumerAndChannel(bufferBuilder.createBufferConsumer(), channelIndex);
                        }

Reply via email to