zhijiangW commented on a change in pull request #7549: [FLINK-11403][network] Remove ResultPartitionConsumableNotifier from ResultPartition
URL: https://github.com/apache/flink/pull/7549#discussion_r267174001
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NetworkResultPartition.java
 ##########
 @@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A result partition for data produced by a single task.
+ *
+ * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
+ * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
+ * or more {@link ResultSubpartition} instances, which further partition the data depending on the
+ * number of consuming tasks and the data {@link DistributionPattern}.
+ *
+ * <p>Tasks that consume a result partition have to request one of its subpartitions. The request
+ * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}).
+ *
+ * <h2>Life-cycle</h2>
+ *
+ * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
+ * <ol>
+ * <li><strong>Produce</strong>: </li>
+ * <li><strong>Consume</strong>: </li>
+ * <li><strong>Release</strong>: </li>
+ * </ol>
+ *
+ * <h2>Lazy deployment and updates of consuming tasks</h2>
+ *
+ * <p>Before a consuming task can request the result, it has to be deployed. The time of deployment
+ * depends on the PIPELINED vs. BLOCKING characteristic of the result partition. With pipelined
+ * results, receivers are deployed as soon as the first buffer is added to the result partition.
+ * With blocking results, on the other hand, receivers are deployed after the partition is finished.
+ *
+ * <h2>Buffer management</h2>
+ *
+ * <h2>State management</h2>
+ */
+public class NetworkResultPartition implements ResultPartitionWriter, BufferPoolOwner {
+
+       private static final Logger LOG = LoggerFactory.getLogger(NetworkResultPartition.class);
+
+       private final String owningTaskName;
+
+       private final ResultPartitionID partitionId;
+
+       /** Type of this partition. Defines the concrete subpartition implementation to use. */
+       private final ResultPartitionType partitionType;
+
+       /** The subpartitions of this partition. At least one. */
+       private final ResultSubpartition[] subpartitions;
+
+       private final ResultPartitionManager partitionManager;
+
+       public final int numTargetKeyGroups;
+
+       // - Runtime state --------------------------------------------------------
+
+       private final AtomicBoolean isReleased = new AtomicBoolean();
+
+       /**
+        * The total number of references to subpartitions of this result. The result partition can be
+        * safely released iff the reference count is zero. A reference count of -1 denotes that the
+        * result partition has been released.
+        */
+       private final AtomicInteger pendingReferences = new AtomicInteger();
+
+       private BufferPool bufferPool;
+
+       private boolean isFinished;
+
+       private volatile Throwable cause;
+
+       public NetworkResultPartition(
+                       String owningTaskName,
+                       ResultPartitionID partitionId,
+                       ResultPartitionType partitionType,
+                       int numberOfSubpartitions,
+                       int numTargetKeyGroups,
+                       ResultPartitionManager partitionManager,
+                       IOManager ioManager) {
+
+               this.owningTaskName = checkNotNull(owningTaskName);
+               this.partitionId = checkNotNull(partitionId);
+               this.partitionType = checkNotNull(partitionType);
+               this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+               this.numTargetKeyGroups = numTargetKeyGroups;
+               this.partitionManager = checkNotNull(partitionManager);
+
+               // Create the subpartitions.
+               switch (partitionType) {
+                       case BLOCKING:
+                               for (int i = 0; i < subpartitions.length; i++) {
+                                       subpartitions[i] = new SpillableSubpartition(i, this, ioManager);
+                               }
+
+                               break;
+
+                       case PIPELINED:
+                       case PIPELINED_BOUNDED:
+                               for (int i = 0; i < subpartitions.length; i++) {
+                                       subpartitions[i] = new PipelinedSubpartition(i, this);
+                               }
+
+                               break;
+
+                       default:
+                               throw new IllegalArgumentException("Unsupported result partition type.");
+               }
+
+               // Initially, partitions should be consumed once before release.
+               pin();
+
+               LOG.debug("{}: Initialized {}", owningTaskName, this);
+       }
+
+       /**
+        * Registers a buffer pool with this result partition.
+        *
+        * <p>There is one pool for each result partition, which is shared by all its subpartitions.
+        *
+        * <p>The pool is registered with the partition *after* it has been constructed in order to conform
+        * to the life-cycle of task registrations in the {@link TaskManager}.
+        */
+       public void registerBufferPool(BufferPool bufferPool) {
+               checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
+                               "Bug in result partition setup logic: Buffer pool does not have enough guaranteed buffers for this result partition.");
+
+               checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
+
+               this.bufferPool = checkNotNull(bufferPool);
+       }
+
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
+       public ResultPartitionID getPartitionId() {
+               return partitionId;
+       }
+
+       @Override
+       public int getNumberOfSubpartitions() {
+               return subpartitions.length;
+       }
+
+       @Override
+       public BufferProvider getBufferProvider() {
+               return bufferPool;
+       }
+
+       public BufferPool getBufferPool() {
+               return bufferPool;
+       }
+
+       public int getNumberOfQueuedBuffers() {
+               int totalBuffers = 0;
+
+               for (ResultSubpartition subpartition : subpartitions) {
+                       totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
+               }
+
+               return totalBuffers;
+       }
+
+       /**
+        * Returns the type of this result partition.
+        *
+        * @return result partition type
+        */
+       public ResultPartitionType getPartitionType() {
+               return partitionType;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
+               checkNotNull(bufferConsumer);
+
+               ResultSubpartition subpartition;
+               try {
+                       checkInProduceState();
+                       subpartition = subpartitions[subpartitionIndex];
+               }
+               catch (Exception ex) {
+                       bufferConsumer.close();
+                       throw ex;
+               }
+
+               return subpartition.add(bufferConsumer);
+       }
+
+       @Override
+       public void flushAll() {
+               for (ResultSubpartition subpartition : subpartitions) {
+                       subpartition.flush();
+               }
+       }
+
+       @Override
+       public void flush(int subpartitionIndex) {
+               subpartitions[subpartitionIndex].flush();
+       }
+
+       /**
+        * Finishes the result partition.
+        *
+        * <p>After this operation, it is not possible to add further data to the result partition.
+        *
+        * <p>For BLOCKING results, this will trigger the deployment of consuming tasks.
+        */
+       @Override
+       public void finish() throws IOException {
+               boolean success = false;
+
+               try {
+                       checkInProduceState();
+
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               subpartition.finish();
+                       }
+
+                       success = true;
+               }
+               finally {
+                       if (success) {
+                               isFinished = true;
+                       }
+               }
+       }
+
+       public void release() {
+               release(null);
+       }
+
+       /**
+        * Releases the result partition.
+        */
+       public void release(Throwable cause) {
+               if (isReleased.compareAndSet(false, true)) {
+                       LOG.debug("{}: Releasing {}.", owningTaskName, this);
+
+                       // Set the error cause
+                       if (cause != null) {
+                               this.cause = cause;
+                       }
+
+                       // Release all subpartitions
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               try {
+                                       subpartition.release();
+                               }
+                               // Catch this in order to ensure that release is called on all subpartitions
+                               catch (Throwable t) {
+                                       LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+                               }
+                       }
+               }
+       }
+
+       public void destroyBufferPool() {
+               if (bufferPool != null) {
+                       bufferPool.lazyDestroy();
+               }
+       }
+
+       /**
+        * Returns the requested subpartition.
+        */
+       public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
+               int refCnt = pendingReferences.get();
+
+               checkState(refCnt != -1, "Partition released.");
+               checkState(refCnt > 0, "Partition not pinned.");
+
+               checkElementIndex(index, subpartitions.length, "Subpartition not found.");
+
+               ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
+
+               LOG.debug("Created {}", readView);
+
+               return readView;
+       }
+
+       public Throwable getFailureCause() {
+               return cause;
+       }
+
+       @Override
+       public int getNumTargetKeyGroups() {
+               return numTargetKeyGroups;
+       }
+
+       /**
+        * Releases buffers held by this result partition.
+        *
+        * <p>This is a callback from the buffer pool, which is registered for result partitions that
+        * are back pressure-free.
+        */
+       @Override
+       public void releaseMemory(int toRelease) throws IOException {
+               checkArgument(toRelease > 0);
+
+               for (ResultSubpartition subpartition : subpartitions) {
+                       toRelease -= subpartition.releaseMemory();
+
+                       // Only release as much memory as needed
+                       if (toRelease <= 0) {
+                               break;
+                       }
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
+                               + subpartitions.length + " subpartitions, "
+                               + pendingReferences + " pending references]";
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Pins the result partition.
+        *
+        * <p>The partition can only be released after each subpartition has been consumed once per pin
+        * operation.
+        */
+       void pin() {
 
 Review comment:
   ditto
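   
   For context on how this partition is driven end-to-end, a minimal illustrative sketch against the methods shown above (the `partition`, `bufferConsumer` and `listener` variables are assumptions for the example, not part of this change):
   
   ```java
   // Produce side: emit a buffer to subpartition 0, make it visible, then finish
   // the partition (for BLOCKING results this is when receivers can be deployed).
   partition.addBufferConsumer(bufferConsumer, 0);
   partition.flush(0);
   partition.finish();
   
   // Consume side: each consumer obtains a read view for its subpartition.
   ResultSubpartitionView view = partition.createSubpartitionView(0, listener);
   ```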

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
