[hotfix][runtime] Refactor ResultPartition for cleaner recycle path
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ad84509
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ad84509
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ad84509

Branch: refs/heads/master
Commit: 5ad845098bb1a0e697b89f5cf389fe8f46c9cf89
Parents: ec7934e
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Jan 18 10:22:23 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:16 2018 +0100

----------------------------------------------------------------------
 .../io/network/partition/ResultPartition.java | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5ad84509/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index aac8fb9..64939f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -240,21 +240,19 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	@Override
 	public void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException {
 		checkNotNull(buffer);
-		boolean success = false;
+		ResultSubpartition subpartition;
 		try {
 			checkInProduceState();
-
-			final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
-
-			// retain for buffer use after add() but also to have a simple path for recycle()
-			buffer.retainBuffer();
-			success = subpartition.add(buffer);
-		} finally {
-			if (success) {
-				notifyPipelinedConsumers();
-			}
+			subpartition = subpartitions[subpartitionIndex];
+		}
+		catch (Exception ex) {
 			buffer.recycleBuffer();
+			throw ex;
+		}
+
+		if (subpartition.add(buffer)) {
+			notifyPipelinedConsumers();
 		}
 	}