This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acd15d2527e7c8c137cb8719e15da3edc0a2c994
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Fri Jul 26 11:25:40 2019 +0200

    [FLINK-13245][network] Make subpartition consumption notification 
independant
---
 .../ReleaseOnConsumptionResultPartition.java       | 42 +++++++++++++++-------
 .../ReleaseOnConsumptionResultPartitionTest.java   | 41 ++++++++++++++++++---
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
index 19ec681..766f500 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartition.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -31,11 +30,18 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  */
 public class ReleaseOnConsumptionResultPartition extends ResultPartition {
 
+       private static final Object lock = new Object();
+
+       /**
+        * A flag for each subpartition indicating whether it was already 
consumed or not.
+        */
+       private final boolean[] consumedSubpartitions;
+
        /**
         * The total number of references to subpartitions of this result. The 
result partition can be
         * safely released, iff the reference count is zero.
         */
-       private final AtomicInteger pendingReferences = new AtomicInteger();
+       private int numUnconsumedSubpartitions;
 
        ReleaseOnConsumptionResultPartition(
                        String owningTaskName,
@@ -47,12 +53,13 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
                        FunctionWithException<BufferPoolOwner, BufferPool, 
IOException> bufferPoolFactory) {
                super(owningTaskName, partitionId, partitionType, 
subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
 
-               pendingReferences.set(subpartitions.length);
+               this.consumedSubpartitions = new boolean[subpartitions.length];
+               this.numUnconsumedSubpartitions = subpartitions.length;
        }
 
        @Override
        public ResultSubpartitionView createSubpartitionView(int index, 
BufferAvailabilityListener availabilityListener) throws IOException {
-               checkState(pendingReferences.get() > 0, "Partition not 
pinned.");
+               checkState(numUnconsumedSubpartitions > 0, "Partition not 
pinned.");
 
                return super.createSubpartitionView(index, 
availabilityListener);
        }
@@ -63,22 +70,33 @@ public class ReleaseOnConsumptionResultPartition extends 
ResultPartition {
                        return;
                }
 
-               int refCnt = pendingReferences.decrementAndGet();
+               final int remainingUnconsumed;
 
-               if (refCnt == 0) {
-                       partitionManager.onConsumedPartition(this);
-               } else if (refCnt < 0) {
-                       throw new IllegalStateException("All references 
released.");
+               // we synchronize only the bookkeeping section, to avoid 
holding the lock during any
+               // calls into other components
+               synchronized (lock) {
+                       if (consumedSubpartitions[subpartitionIndex]) {
+                               // repeated call - ignore
+                               return;
+                       }
+
+                       consumedSubpartitions[subpartitionIndex] = true;
+                       remainingUnconsumed = (--numUnconsumedSubpartitions);
                }
 
-               LOG.debug("{}: Received release notification for subpartition 
{}.",
-                       this, subpartitionIndex);
+               LOG.debug("{}: Received consumed notification for subpartition 
{}.", this, subpartitionIndex);
+
+               if (remainingUnconsumed == 0) {
+                       partitionManager.onConsumedPartition(this);
+               } else if (remainingUnconsumed < 0) {
+                       throw new IllegalStateException("Received consume 
notification even though all subpartitions are already consumed.");
+               }
        }
 
        @Override
        public String toString() {
                return "ReleaseOnConsumptionResultPartition " + 
partitionId.toString() + " [" + partitionType + ", "
                        + subpartitions.length + " subpartitions, "
-                       + pendingReferences + " pending references]";
+                       + numUnconsumedSubpartitions + " pending consumptions]";
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
index afcd095..ce7467f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ReleaseOnConsumptionResultPartitionTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link ReleaseOnConsumptionResultPartitionTest}.
@@ -41,9 +41,42 @@ public class ReleaseOnConsumptionResultPartitionTest extends 
TestLogger {
                manager.registerResultPartition(partition);
 
                partition.onConsumedSubpartition(0);
-               assertThat(partition.isReleased(), is(false));
+               assertFalse(partition.isReleased());
 
                partition.onConsumedSubpartition(1);
-               assertThat(partition.isReleased(), is(true));
+               assertTrue(partition.isReleased());
+       }
+
+       @Test
+       public void testMultipleReleaseCallsAreIdempotent() {
+               final ResultPartitionManager manager = new 
ResultPartitionManager();
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setNumberOfSubpartitions(2)
+                       .isReleasedOnConsumption(true)
+                       .setResultPartitionManager(manager)
+                       .build();
+               manager.registerResultPartition(partition);
+
+               partition.onConsumedSubpartition(0);
+               partition.onConsumedSubpartition(0);
+
+               assertFalse(partition.isReleased());
+       }
+
+       @Test
+       public void testReleaseAfterIdempotentCalls() {
+               final ResultPartitionManager manager = new 
ResultPartitionManager();
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setNumberOfSubpartitions(2)
+                       .isReleasedOnConsumption(true)
+                       .setResultPartitionManager(manager)
+                       .build();
+               manager.registerResultPartition(partition);
+
+               partition.onConsumedSubpartition(0);
+               partition.onConsumedSubpartition(0);
+               partition.onConsumedSubpartition(1);
+
+               assertTrue(partition.isReleased());
        }
 }

Reply via email to