[hotfix][runtime] Simplify PipelinedSubpartition. notifyBuffersAvailable() is a quick call that doesn't need to be executed outside of the lock.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57228145 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57228145 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57228145 Branch: refs/heads/master Commit: 572281452b8f9ea657b4bc1a7300b81e94885ad0 Parents: 3eb4cc0 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Jan 25 15:50:42 2018 +0100 Committer: Piotr Nowojski <piotr.nowoj...@gmail.com> Committed: Mon Feb 19 12:21:31 2018 +0100 ---------------------------------------------------------------------- .../io/network/partition/PipelinedSubpartition.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57228145/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index b6b55c3..e1c53c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -71,9 +71,6 @@ class PipelinedSubpartition extends ResultSubpartition { private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException { checkNotNull(bufferConsumer); - // view reference accessible outside the lock, but assigned inside the locked scope - final PipelinedSubpartitionView reader; - synchronized (buffers) { if (isFinished || isReleased) { bufferConsumer.close(); @@ -82,18 +79,16 @@ class PipelinedSubpartition extends ResultSubpartition { // Add the bufferConsumer and 
update the stats buffers.add(bufferConsumer); - reader = readView; updateStatistics(bufferConsumer); increaseBuffersInBacklog(bufferConsumer); if (finish) { isFinished = true; } - } - // Notify the listener outside of the synchronized block - if (reader != null) { - reader.notifyBuffersAvailable(1); + if (readView != null) { + readView.notifyBuffersAvailable(1); + } } return true;