NicoK closed pull request #6693: [FLINK-10332][network] move data notification out of the synchronized block
URL: https://github.com/apache/flink/pull/6693
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 90daf75fcc7..cc0b2220fd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -89,9 +89,7 @@ public void addInputChannel(RemoteInputChannel listener) throws IOException { checkError(); - if (!inputChannels.containsKey(listener.getInputChannelId())) { - inputChannels.put(listener.getInputChannelId(), listener); - } + inputChannels.putIfAbsent(listener.getInputChannelId(), listener); } @Override @@ -112,12 +110,7 @@ public void cancelRequestFor(InputChannelID inputChannelId) { @Override public void notifyCreditAvailable(final RemoteInputChannel inputChannel) { - ctx.executor().execute(new Runnable() { - @Override - public void run() { - ctx.pipeline().fireUserEventTriggered(inputChannel); - } - }); + ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel)); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 796e86f51b3..c5ba7a4b7f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -85,9 +85,7 @@ public void addInputChannel(RemoteInputChannel listener) throws IOException { checkError(); - if (!inputChannels.containsKey(listener.getInputChannelId())) { - inputChannels.put(listener.getInputChannelId(), listener); - } + inputChannels.putIfAbsent(listener.getInputChannelId(), listener); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index 8c05b8208f9..c3d3d1bcc10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -89,12 +89,7 @@ void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) { // TODO This could potentially have a bad performance impact as in the // worst case (network consumes faster than the producer) each buffer // will trigger a separate event loop task being scheduled. 
- ctx.executor().execute(new Runnable() { - @Override - public void run() { - ctx.pipeline().fireUserEventTriggered(reader); - } - }); + ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index d2d7fdb324b..fe27d97adaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -88,6 +88,7 @@ public void finish() throws IOException { private boolean add(BufferConsumer bufferConsumer, boolean finish) { checkNotNull(bufferConsumer); + final boolean notifyDataAvailable; synchronized (buffers) { if (isFinished || isReleased) { bufferConsumer.close(); @@ -98,14 +99,13 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) { buffers.add(bufferConsumer); updateStatistics(bufferConsumer); increaseBuffersInBacklog(bufferConsumer); + notifyDataAvailable = shouldNotifyDataAvailable() || finish; - if (finish) { - isFinished = true; - notifyDataAvailable(); - } - else { - maybeNotifyDataAvailable(); - } + isFinished |= finish; + } + + if (notifyDataAvailable) { + notifyDataAvailable(); } return true; @@ -220,6 +220,7 @@ public boolean isReleased() { @Override public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException { + final boolean notifyDataAvailable; synchronized (buffers) { checkState(!isReleased); checkState(readView == null, @@ -230,9 +231,10 @@ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener avail parent.getOwningTaskName(), index, parent.getPartitionId()); readView = new PipelinedSubpartitionView(this, availabilityListener); - if (!buffers.isEmpty()) { - 
notifyDataAvailable(); - } + notifyDataAvailable = !buffers.isEmpty(); + } + if (notifyDataAvailable) { + notifyDataAvailable(); } return readView; @@ -283,26 +285,24 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { @Override public void flush() { + final boolean notifyDataAvailable; synchronized (buffers) { if (buffers.isEmpty()) { return; } - if (!flushRequested) { - flushRequested = true; // set this before the notification! - // if there is more then 1 buffer, we already notified the reader - // (at the latest when adding the second buffer) - if (buffers.size() == 1) { - notifyDataAvailable(); - } - } + // if there is more then 1 buffer, we already notified the reader + // (at the latest when adding the second buffer) + notifyDataAvailable = !flushRequested && buffers.size() == 1; + flushRequested = true; + } + if (notifyDataAvailable) { + notifyDataAvailable(); } } - private void maybeNotifyDataAvailable() { + private boolean shouldNotifyDataAvailable() { // Notify only when we added first finished buffer. - if (getNumberOfFinishedBuffers() == 1) { - notifyDataAvailable(); - } + return readView != null && !flushRequested && getNumberOfFinishedBuffers() == 1; } private void notifyDataAvailable() { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services