[hotfix][runtime] Simplify PipelinedSubpartition

notifyBuffersAvailable is a quick call that doesn't need to be executed outside 
of the lock


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57228145
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57228145
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57228145

Branch: refs/heads/master
Commit: 572281452b8f9ea657b4bc1a7300b81e94885ad0
Parents: 3eb4cc0
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 25 15:50:42 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:31 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/PipelinedSubpartition.java      | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57228145/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index b6b55c3..e1c53c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -71,9 +71,6 @@ class PipelinedSubpartition extends ResultSubpartition {
        private boolean add(BufferConsumer bufferConsumer, boolean finish) 
throws IOException {
                checkNotNull(bufferConsumer);
 
-               // view reference accessible outside the lock, but assigned 
inside the locked scope
-               final PipelinedSubpartitionView reader;
-
                synchronized (buffers) {
                        if (isFinished || isReleased) {
                                bufferConsumer.close();
@@ -82,18 +79,16 @@ class PipelinedSubpartition extends ResultSubpartition {
 
                        // Add the bufferConsumer and update the stats
                        buffers.add(bufferConsumer);
-                       reader = readView;
                        updateStatistics(bufferConsumer);
                        increaseBuffersInBacklog(bufferConsumer);
 
                        if (finish) {
                                isFinished = true;
                        }
-               }
 
-               // Notify the listener outside of the synchronized block
-               if (reader != null) {
-                       reader.notifyBuffersAvailable(1);
+                       if (readView != null) {
+                               readView.notifyBuffersAvailable(1);
+                       }
                }
 
                return true;

Reply via email to