Repository: flink Updated Branches: refs/heads/master 3126bf522 -> a144d0f77
[FLINK-8674][runtime] Improve performance of flushAlways in StreamRecordWriter. Reduce the number of data notifications in case of flushAlways = true. Instead of notifying all of the channels/subpartitions, notify only the one that has just been written to. This closes #5526. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a144d0f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a144d0f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a144d0f7 Branch: refs/heads/master Commit: a144d0f77fb249aaf609b448e30137bbd3d98a65 Parents: 3126bf5 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Feb 15 14:41:39 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Feb 22 15:16:01 2018 +0100 ---------------------------------------------------------------------- .../io/network/api/writer/RecordWriter.java | 27 ++++++++++++++++---- .../api/writer/ResultPartitionWriter.java | 9 +++++-- .../io/network/partition/ResultPartition.java | 7 ++++- .../operators/shipping/OutputCollector.java | 2 +- ...AbstractCollectingResultPartitionWriter.java | 7 ++++- .../io/network/api/writer/RecordWriterTest.java | 18 +++++++++---- .../SlotCountExceedingParallelismTest.java | 2 +- .../ScheduleOrUpdateConsumersTest.java | 2 +- .../TaskCancelAsyncProducerConsumerITCase.java | 2 +- .../apache/flink/runtime/jobmanager/Tasks.scala | 4 +-- .../runtime/io/RecordWriterOutput.java | 2 +- .../runtime/io/StreamRecordWriter.java | 19 ++------------ .../io/benchmark/LongRecordWriterThread.java | 2 +- .../StreamNetworkPointToPointBenchmark.java | 2 +- .../runtime/NetworkStackThroughputITCase.java | 4 +-- 15 files changed, 67 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index fabac9e..4ec2863 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -43,7 +43,7 @@ import static org.apache.flink.util.Preconditions.checkState; * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of * serializing records into buffers. * - * <p><strong>Important</strong>: it is necessary to call {@link #flush()} after + * <p><strong>Important</strong>: it is necessary to call {@link #flushAll()} after * all records have been written with {@link #emit(IOReadableWritable)}. This * ensures that all produced records are written to the output stream (incl. * partially filled ones). @@ -58,13 +58,17 @@ public class RecordWriter<T extends IOReadableWritable> { private final int numChannels; - /** {@link RecordSerializer} per outgoing channel. */ + /** + * {@link RecordSerializer} per outgoing channel. 
+ */ private final RecordSerializer<T>[] serializers; private final Optional<BufferBuilder>[] bufferBuilders; private final Random rng = new XORShiftRandom(); + private final boolean flushAlways; + private Counter numBytesOut = new SimpleCounter(); public RecordWriter(ResultPartitionWriter writer) { @@ -73,6 +77,11 @@ public class RecordWriter<T extends IOReadableWritable> { @SuppressWarnings("unchecked") public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) { + this(writer, channelSelector, false); + } + + public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, boolean flushAlways) { + this.flushAlways = flushAlways; this.targetPartition = writer; this.channelSelector = channelSelector; @@ -135,9 +144,13 @@ public class RecordWriter<T extends IOReadableWritable> { result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder); } checkState(!serializer.hasSerializedData(), "All data should be written at once"); + + if (flushAlways) { + targetPartition.flush(targetChannel); + } } - public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException { try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { RecordSerializer<T> serializer = serializers[targetChannel]; @@ -147,12 +160,16 @@ public class RecordWriter<T extends IOReadableWritable> { // retain the buffer so that it can be recycled by each channel of targetPartition targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); } + + if (flushAlways) { + flushAll(); + } return eventBufferConsumer; } } - public void flush() { - targetPartition.flush(); + public void flushAll() { + targetPartition.flushAll(); } public void clearBuffers() { 
http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 02049d9..4b5986e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -52,7 +52,12 @@ public interface ResultPartitionWriter { void addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException; /** - * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers}. + * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in all subpartitions. */ - void flush(); + void flushAll(); + + /** + * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition. 
+ */ + void flush(int subpartitionIndex); } http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 25a076b..fbbfa4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -258,12 +258,17 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner { } @Override - public void flush() { + public void flushAll() { for (ResultSubpartition subpartition : subpartitions) { subpartition.flush(); } } + @Override + public void flush(int subpartitionIndex) { + subpartitions[subpartitionIndex].flush(); + } + /** * Finishes the result partition. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java index 382ae39..73c531c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java @@ -82,7 +82,7 @@ public class OutputCollector<T> implements Collector<T> { public void close() { for (RecordWriter<?> writer : writers) { writer.clearBuffers(); - writer.flush(); + writer.flushAll(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java index b2171c6..0324375 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java @@ -88,7 +88,7 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP } @Override - public synchronized void flush() { + public synchronized void flushAll() { try { processBufferConsumers(); } catch (IOException e) { @@ -96,5 +96,10 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP } } + 
@Override + public void flush(int subpartitionIndex) { + flushAll(); + } + protected abstract void deserializeBuffer(Buffer buffer) throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index ed32454..c7ef4f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -130,7 +130,7 @@ public class RecordWriterTest { try { recordWriter.emit(val); - recordWriter.flush(); + recordWriter.flushAll(); recordWriter.emit(val); } @@ -183,7 +183,7 @@ public class RecordWriterTest { // This should not throw an Exception iff the serializer state // has been cleared as expected. 
- recordWriter.flush(); + recordWriter.flushAll(); } /** @@ -362,7 +362,7 @@ public class RecordWriterTest { RecordWriter<IntValue> writer = new RecordWriter<>(partition); writer.broadcastEmit(new IntValue(0)); - writer.flush(); + writer.flushAll(); // Verify added to all queues assertEquals(1, queues[0].size()); @@ -426,7 +426,11 @@ public class RecordWriterTest { } @Override - public void flush() { + public void flushAll() { + } + + @Override + public void flush(int subpartitionIndex) { } } @@ -479,7 +483,11 @@ public class RecordWriterTest { } @Override - public void flush() { + public void flushAll() { + } + + @Override + public void flush(int subpartitionIndex) { } } http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index a881233..cba80aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -145,7 +145,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger { for (int i = 0; i < numberOfTimesToSend; i++) { writer.emit(subtaskIndex); } - writer.flush(); + writer.flushAll(); } finally { writer.clearBuffers(); http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index 902c925..f55dfe4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -157,7 +157,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger { for (int i = 0; i < numberOfTimesToSend; i++) { writer.emit(subtaskIndex); } - writer.flush(); + writer.flushAll(); } finally { writer.clearBuffers(); http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 73553f5..c63af83 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -223,7 +223,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { while (true) { current.setValue(current.getValue() + 1); recordWriter.emit(current); - recordWriter.flush(); + recordWriter.flushAll(); } } catch (Exception e) { ASYNC_PRODUCER_EXCEPTION = e; http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 
bf2f3ba..a5d9082 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -36,7 +36,7 @@ object Tasks { try{ writer.emit(new IntValue(42)) writer.emit(new IntValue(1337)) - writer.flush() + writer.flushAll() }finally{ writer.clearBuffers() } @@ -65,7 +65,7 @@ object Tasks { writer.emit(record) } - writer.flush() + writer.flushAll() } finally { writer.clearBuffers() } http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index f1cc1dc..d62d80e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -153,7 +153,7 @@ public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExpo } public void flush() throws IOException { - recordWriter.flush(); + recordWriter.flushAll(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index 7c47fcf..dad680c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -43,9 +43,6 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit /** The thread that periodically flushes the output, to give an upper latency bound. */ private final OutputFlusher outputFlusher; - /** Flag indicating whether the output should be flushed after every element. */ - private final boolean flushAlways; - /** The exception encountered in the flushing thread. */ private Throwable flusherException; @@ -58,20 +55,17 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit ChannelSelector<T> channelSelector, long timeout, String taskName) { - super(writer, channelSelector); + super(writer, channelSelector, timeout == 0); checkArgument(timeout >= -1); if (timeout == -1) { - flushAlways = false; outputFlusher = null; } else if (timeout == 0) { - flushAlways = true; outputFlusher = null; } else { - flushAlways = false; String threadName = taskName == null ? 
DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName; @@ -84,27 +78,18 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit public void emit(T record) throws IOException, InterruptedException { checkErroneous(); super.emit(record); - if (flushAlways) { - flush(); - } } @Override public void broadcastEmit(T record) throws IOException, InterruptedException { checkErroneous(); super.broadcastEmit(record); - if (flushAlways) { - flush(); - } } @Override public void randomEmit(T record) throws IOException, InterruptedException { checkErroneous(); super.randomEmit(record); - if (flushAlways) { - flush(); - } } /** @@ -182,7 +167,7 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit // any errors here should let the thread come to a halt and be // recognized by the writer - flush(); + flushAll(); } } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java index 7336b6b..b93b867 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java @@ -93,7 +93,7 @@ public class LongRecordWriterThread extends CheckedThread { } value.setValue(records); recordWriter.broadcastEmit(value); - recordWriter.flush(); + recordWriter.flushAll(); finishSendingRecords(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java index cc302e8..7aa218e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java @@ -55,7 +55,7 @@ public class StreamNetworkPointToPointBenchmark { value.setValue(records); recordWriter.broadcastEmit(value); if (flushAfterLastEmit) { - recordWriter.flush(); + recordWriter.flushAll(); } recordsReceived.get(RECEIVER_TIMEOUT, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/flink/blob/a144d0f7/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index 24a24ae..e6401c0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -106,7 +106,7 @@ public class NetworkStackThroughputITCase extends TestLogger { } } finally { - writer.flush(); + writer.flushAll(); } } } @@ -139,7 +139,7 @@ public class NetworkStackThroughputITCase extends TestLogger { } finally { reader.clearBuffers(); - writer.flush(); + 
writer.flushAll(); } } }