Repository: flink
Updated Branches:
  refs/heads/master 3126bf522 -> a144d0f77


[FLINK-8674][runtime] Improve performance of flushAlways in StreamRecordWriter

Reduce the number of data notifications in case of flushAlways = true. Instead
of notifying all of the channels/subpartitions, notify only the one that has
just been written to.

This closes #5526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a144d0f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a144d0f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a144d0f7

Branch: refs/heads/master
Commit: a144d0f77fb249aaf609b448e30137bbd3d98a65
Parents: 3126bf5
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Feb 15 14:41:39 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Feb 22 15:16:01 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriter.java     | 27 ++++++++++++++++----
 .../api/writer/ResultPartitionWriter.java       |  9 +++++--
 .../io/network/partition/ResultPartition.java   |  7 ++++-
 .../operators/shipping/OutputCollector.java     |  2 +-
 ...AbstractCollectingResultPartitionWriter.java |  7 ++++-
 .../io/network/api/writer/RecordWriterTest.java | 18 +++++++++----
 .../SlotCountExceedingParallelismTest.java      |  2 +-
 .../ScheduleOrUpdateConsumersTest.java          |  2 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala |  4 +--
 .../runtime/io/RecordWriterOutput.java          |  2 +-
 .../runtime/io/StreamRecordWriter.java          | 19 ++------------
 .../io/benchmark/LongRecordWriterThread.java    |  2 +-
 .../StreamNetworkPointToPointBenchmark.java     |  2 +-
 .../runtime/NetworkStackThroughputITCase.java   |  4 +--
 15 files changed, 67 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index fabac9e..4ec2863 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -43,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and 
takes care of
  * serializing records into buffers.
  *
- * <p><strong>Important</strong>: it is necessary to call {@link #flush()} 
after
+ * <p><strong>Important</strong>: it is necessary to call {@link #flushAll()} 
after
  * all records have been written with {@link #emit(IOReadableWritable)}. This
  * ensures that all produced records are written to the output stream (incl.
  * partially filled ones).
@@ -58,13 +58,17 @@ public class RecordWriter<T extends IOReadableWritable> {
 
        private final int numChannels;
 
-       /** {@link RecordSerializer} per outgoing channel. */
+       /**
+        * {@link RecordSerializer} per outgoing channel.
+        */
        private final RecordSerializer<T>[] serializers;
 
        private final Optional<BufferBuilder>[] bufferBuilders;
 
        private final Random rng = new XORShiftRandom();
 
+       private final boolean flushAlways;
+
        private Counter numBytesOut = new SimpleCounter();
 
        public RecordWriter(ResultPartitionWriter writer) {
@@ -73,6 +77,11 @@ public class RecordWriter<T extends IOReadableWritable> {
 
        @SuppressWarnings("unchecked")
        public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> 
channelSelector) {
+               this(writer, channelSelector, false);
+       }
+
+       public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> 
channelSelector, boolean flushAlways) {
+               this.flushAlways = flushAlways;
                this.targetPartition = writer;
                this.channelSelector = channelSelector;
 
@@ -135,9 +144,13 @@ public class RecordWriter<T extends IOReadableWritable> {
                        result = 
serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
                }
                checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
+
+               if (flushAlways) {
+                       targetPartition.flush(targetChannel);
+               }
        }
 
-       public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException, InterruptedException {
+       public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException {
                try (BufferConsumer eventBufferConsumer = 
EventSerializer.toBufferConsumer(event)) {
                        for (int targetChannel = 0; targetChannel < 
numChannels; targetChannel++) {
                                RecordSerializer<T> serializer = 
serializers[targetChannel];
@@ -147,12 +160,16 @@ public class RecordWriter<T extends IOReadableWritable> {
                                // retain the buffer so that it can be recycled 
by each channel of targetPartition
                                
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
                        }
+
+                       if (flushAlways) {
+                               flushAll();
+                       }
                        return eventBufferConsumer;
                }
        }
 
-       public void flush() {
-               targetPartition.flush();
+       public void flushAll() {
+               targetPartition.flushAll();
        }
 
        public void clearBuffers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 02049d9..4b5986e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -52,7 +52,12 @@ public interface ResultPartitionWriter {
        void addBufferConsumer(BufferConsumer bufferConsumer, int 
subpartitionIndex) throws IOException;
 
        /**
-        * Manually trigger consumption from enqueued {@link BufferConsumer 
BufferConsumers}.
+        * Manually trigger consumption from enqueued {@link BufferConsumer 
BufferConsumers} in all subpartitions.
         */
-       void flush();
+       void flushAll();
+
+       /**
+        * Manually trigger consumption from enqueued {@link BufferConsumer 
BufferConsumers} in one specified subpartition.
+        */
+       void flush(int subpartitionIndex);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 25a076b..fbbfa4b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -258,12 +258,17 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
        }
 
        @Override
-       public void flush() {
+       public void flushAll() {
                for (ResultSubpartition subpartition : subpartitions) {
                        subpartition.flush();
                }
        }
 
+       @Override
+       public void flush(int subpartitionIndex) {
+               subpartitions[subpartitionIndex].flush();
+       }
+
        /**
         * Finishes the result partition.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index 382ae39..73c531c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -82,7 +82,7 @@ public class OutputCollector<T> implements Collector<T> {
        public void close() {
                for (RecordWriter<?> writer : writers) {
                        writer.clearBuffers();
-                       writer.flush();
+                       writer.flushAll();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index b2171c6..0324375 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -88,7 +88,7 @@ public abstract class AbstractCollectingResultPartitionWriter 
implements ResultP
        }
 
        @Override
-       public synchronized void flush() {
+       public synchronized void flushAll() {
                try {
                        processBufferConsumers();
                } catch (IOException e) {
@@ -96,5 +96,10 @@ public abstract class 
AbstractCollectingResultPartitionWriter implements ResultP
                }
        }
 
+       @Override
+       public void flush(int subpartitionIndex) {
+               flushAll();
+       }
+
        protected abstract void deserializeBuffer(Buffer buffer) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ed32454..c7ef4f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -130,7 +130,7 @@ public class RecordWriterTest {
 
                                        try {
                                                recordWriter.emit(val);
-                                               recordWriter.flush();
+                                               recordWriter.flushAll();
 
                                                recordWriter.emit(val);
                                        }
@@ -183,7 +183,7 @@ public class RecordWriterTest {
 
                // This should not throw an Exception iff the serializer state
                // has been cleared as expected.
-               recordWriter.flush();
+               recordWriter.flushAll();
        }
 
        /**
@@ -362,7 +362,7 @@ public class RecordWriterTest {
                RecordWriter<IntValue> writer = new RecordWriter<>(partition);
 
                writer.broadcastEmit(new IntValue(0));
-               writer.flush();
+               writer.flushAll();
 
                // Verify added to all queues
                assertEquals(1, queues[0].size());
@@ -426,7 +426,11 @@ public class RecordWriterTest {
                }
 
                @Override
-               public void flush() {
+               public void flushAll() {
+               }
+
+               @Override
+               public void flush(int subpartitionIndex) {
                }
        }
 
@@ -479,7 +483,11 @@ public class RecordWriterTest {
                }
 
                @Override
-               public void flush() {
+               public void flushAll() {
+               }
+
+               @Override
+               public void flush(int subpartitionIndex) {
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index a881233..cba80aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -145,7 +145,7 @@ public class SlotCountExceedingParallelismTest extends 
TestLogger {
                                for (int i = 0; i < numberOfTimesToSend; i++) {
                                        writer.emit(subtaskIndex);
                                }
-                               writer.flush();
+                               writer.flushAll();
                        }
                        finally {
                                writer.clearBuffers();

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 902c925..f55dfe4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -157,7 +157,7 @@ public class ScheduleOrUpdateConsumersTest extends 
TestLogger {
                                        for (int i = 0; i < 
numberOfTimesToSend; i++) {
                                                writer.emit(subtaskIndex);
                                        }
-                                       writer.flush();
+                                       writer.flushAll();
                                }
                                finally {
                                        writer.clearBuffers();

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 73553f5..c63af83 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -223,7 +223,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends 
TestLogger {
                                        while (true) {
                                                
current.setValue(current.getValue() + 1);
                                                recordWriter.emit(current);
-                                               recordWriter.flush();
+                                               recordWriter.flushAll();
                                        }
                                } catch (Exception e) {
                                        ASYNC_PRODUCER_EXCEPTION = e;

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index bf2f3ba..a5d9082 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -36,7 +36,7 @@ object Tasks {
       try{
         writer.emit(new IntValue(42))
         writer.emit(new IntValue(1337))
-        writer.flush()
+        writer.flushAll()
       }finally{
         writer.clearBuffers()
       }
@@ -65,7 +65,7 @@ object Tasks {
           writer.emit(record)
         }
 
-        writer.flush()
+        writer.flushAll()
       } finally {
         writer.clearBuffers()
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f1cc1dc..d62d80e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -153,7 +153,7 @@ public class RecordWriterOutput<OUT> implements 
OperatorChain.WatermarkGaugeExpo
        }
 
        public void flush() throws IOException {
-               recordWriter.flush();
+               recordWriter.flushAll();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 7c47fcf..dad680c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -43,9 +43,6 @@ public class StreamRecordWriter<T extends IOReadableWritable> 
extends RecordWrit
        /** The thread that periodically flushes the output, to give an upper 
latency bound. */
        private final OutputFlusher outputFlusher;
 
-       /** Flag indicating whether the output should be flushed after every 
element. */
-       private final boolean flushAlways;
-
        /** The exception encountered in the flushing thread. */
        private Throwable flusherException;
 
@@ -58,20 +55,17 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
                        ChannelSelector<T> channelSelector,
                        long timeout,
                        String taskName) {
-               super(writer, channelSelector);
+               super(writer, channelSelector, timeout == 0);
 
                checkArgument(timeout >= -1);
 
                if (timeout == -1) {
-                       flushAlways = false;
                        outputFlusher = null;
                }
                else if (timeout == 0) {
-                       flushAlways = true;
                        outputFlusher = null;
                }
                else {
-                       flushAlways = false;
                        String threadName = taskName == null ?
                                DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output 
Timeout Flusher - " + taskName;
 
@@ -84,27 +78,18 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
        public void emit(T record) throws IOException, InterruptedException {
                checkErroneous();
                super.emit(record);
-               if (flushAlways) {
-                       flush();
-               }
        }
 
        @Override
        public void broadcastEmit(T record) throws IOException, 
InterruptedException {
                checkErroneous();
                super.broadcastEmit(record);
-               if (flushAlways) {
-                       flush();
-               }
        }
 
        @Override
        public void randomEmit(T record) throws IOException, 
InterruptedException {
                checkErroneous();
                super.randomEmit(record);
-               if (flushAlways) {
-                       flush();
-               }
        }
 
        /**
@@ -182,7 +167,7 @@ public class StreamRecordWriter<T extends 
IOReadableWritable> extends RecordWrit
 
                                        // any errors here should let the 
thread come to a halt and be
                                        // recognized by the writer
-                                       flush();
+                                       flushAll();
                                }
                        }
                        catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
index 7336b6b..b93b867 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
@@ -93,7 +93,7 @@ public class LongRecordWriterThread extends CheckedThread {
                }
                value.setValue(records);
                recordWriter.broadcastEmit(value);
-               recordWriter.flush();
+               recordWriter.flushAll();
 
                finishSendingRecords();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index cc302e8..7aa218e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -55,7 +55,7 @@ public class StreamNetworkPointToPointBenchmark {
                value.setValue(records);
                recordWriter.broadcastEmit(value);
                if (flushAfterLastEmit) {
-                       recordWriter.flush();
+                       recordWriter.flushAll();
                }
 
                recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 24a24ae..e6401c0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -106,7 +106,7 @@ public class NetworkStackThroughputITCase extends 
TestLogger {
                                }
                        }
                        finally {
-                               writer.flush();
+                               writer.flushAll();
                        }
                }
        }
@@ -139,7 +139,7 @@ public class NetworkStackThroughputITCase extends 
TestLogger {
                        }
                        finally {
                                reader.clearBuffers();
-                               writer.flush();
+                               writer.flushAll();
                        }
                }
        }

Reply via email to