This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f738c4edd5858891236a41fc1e453b06cf815089 Author: Zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Fri Jul 26 11:25:40 2019 +0200 [FLINK-13245][network] Make subpartition consumption notification independant --- .../ReleaseOnConsumptionResultPartition.java | 42 +++++++++++++++------- .../ReleaseOnConsumptionResultPartitionTest.java | 41 ++++++++++++++++++--- 2 files changed, 67 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java index 19ec681..766f500 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; import org.apache.flink.util.function.FunctionWithException; import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkState; @@ -31,11 +30,18 @@ import static org.apache.flink.util.Preconditions.checkState; */ public class ReleaseOnConsumptionResultPartition extends ResultPartition { + private static final Object lock = new Object(); + + /** + * A flag for each subpartition indicating whether it was already consumed or not. + */ + private final boolean[] consumedSubpartitions; + /** * The total number of references to subpartitions of this result. The result partition can be * safely released, iff the reference count is zero. */ - private final AtomicInteger pendingReferences = new AtomicInteger(); + private int numUnconsumedSubpartitions; ReleaseOnConsumptionResultPartition( String owningTaskName, @@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition { FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) { super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory); - pendingReferences.set(subpartitions.length); + this.consumedSubpartitions = new boolean[subpartitions.length]; + this.numUnconsumedSubpartitions = subpartitions.length; } @Override public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException { - checkState(pendingReferences.get() > 0, "Partition not pinned."); + checkState(numUnconsumedSubpartitions > 0, "Partition not pinned."); return super.createSubpartitionView(index, availabilityListener); } @@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition { return; } - int refCnt = pendingReferences.decrementAndGet(); + final int remainingUnconsumed; - if (refCnt == 0) { - partitionManager.onConsumedPartition(this); - } else if (refCnt < 0) { - throw new IllegalStateException("All references released."); + // we synchronize only the bookkeeping section, to avoid holding the lock during any + // calls into other components + synchronized (lock) { + if (consumedSubpartitions[subpartitionIndex]) { + // repeated call - ignore + return; + } + + consumedSubpartitions[subpartitionIndex] = true; + remainingUnconsumed = (--numUnconsumedSubpartitions); } - LOG.debug("{}: Received release notification for subpartition {}.", - this, subpartitionIndex); + LOG.debug("{}: Received consumed notification for subpartition {}.", this, subpartitionIndex); + + if (remainingUnconsumed == 0) { + partitionManager.onConsumedPartition(this); + } else if (remainingUnconsumed < 0) { + throw new IllegalStateException("Received consume notification even though all subpartitions are already consumed."); + } } @Override public String toString() { return "ReleaseOnConsumptionResultPartition " + partitionId.toString() + " [" + partitionType + ", " + subpartitions.length + " subpartitions, " - + pendingReferences + " pending references]"; + + numUnconsumedSubpartitions + " pending consumptions]"; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java index afcd095..ce7467f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java @@ -21,8 +21,8 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests for the {@link ReleaseOnConsumptionResultPartitionTest}. @@ -41,9 +41,42 @@ public class ReleaseOnConsumptionResultPartitionTest extends TestLogger { manager.registerResultPartition(partition); partition.onConsumedSubpartition(0); - assertThat(partition.isReleased(), is(false)); + assertFalse(partition.isReleased()); partition.onConsumedSubpartition(1); - assertThat(partition.isReleased(), is(true)); + assertTrue(partition.isReleased()); + } + + @Test + public void testMultipleReleaseCallsAreIdempotent() { + final ResultPartitionManager manager = new ResultPartitionManager(); + final ResultPartition partition = new ResultPartitionBuilder() + .setNumberOfSubpartitions(2) + .isReleasedOnConsumption(true) + .setResultPartitionManager(manager) + .build(); + manager.registerResultPartition(partition); + + partition.onConsumedSubpartition(0); + partition.onConsumedSubpartition(0); + + assertFalse(partition.isReleased()); + } + + @Test + public void testReleaseAfterIdempotentCalls() { + final ResultPartitionManager manager = new ResultPartitionManager(); + final ResultPartition partition = new ResultPartitionBuilder() + .setNumberOfSubpartitions(2) + .isReleasedOnConsumption(true) + .setResultPartitionManager(manager) + .build(); + manager.registerResultPartition(partition); + + partition.onConsumedSubpartition(0); + partition.onConsumedSubpartition(0); + partition.onConsumedSubpartition(1); + + assertTrue(partition.isReleased()); } }