[hotfix][runtime] Refactor ResultPartition for cleaner recycle path

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ad84509
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ad84509
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ad84509

Branch: refs/heads/master
Commit: 5ad845098bb1a0e697b89f5cf389fe8f46c9cf89
Parents: ec7934e
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 18 10:22:23 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:16 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/ResultPartition.java   | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ad84509/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index aac8fb9..64939f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -240,21 +240,19 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
        @Override
        public void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException {
                checkNotNull(buffer);
-               boolean success = false;
 
+               ResultSubpartition subpartition;
                try {
                        checkInProduceState();
-
-                       final ResultSubpartition subpartition = 
subpartitions[subpartitionIndex];
-
-                       // retain for buffer use after add() but also to have a 
simple path for recycle()
-                       buffer.retainBuffer();
-                       success = subpartition.add(buffer);
-               } finally {
-                       if (success) {
-                               notifyPipelinedConsumers();
-                       }
+                       subpartition = subpartitions[subpartitionIndex];
+               }
+               catch (Exception ex) {
                        buffer.recycleBuffer();
+                       throw ex;
+               }
+
+               if (subpartition.add(buffer)) {
+                       notifyPipelinedConsumers();
                }
        }
 

Reply via email to