Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/254#discussion_r21560517
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
 ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.io.network.partition;
    +
    +import com.google.common.base.Optional;
    +import 
org.apache.flink.runtime.deployment.IntermediateResultPartitionDeploymentDescriptor;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    +import org.apache.flink.runtime.io.network.NetworkEnvironment;
    +import org.apache.flink.runtime.io.network.buffer.Buffer;
    +import org.apache.flink.runtime.io.network.buffer.BufferPool;
    +import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
    +import org.apache.flink.runtime.io.network.buffer.BufferProvider;
    +import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
    +import 
org.apache.flink.runtime.io.network.partition.queue.ConsumableOnceInMemoryOnlyPartitionQueue;
    +import 
org.apache.flink.runtime.io.network.partition.queue.IllegalQueueIteratorRequestException;
    +import 
org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
    +import 
org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
    +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
    +import org.apache.flink.runtime.jobgraph.IntermediateResultType;
    +import org.apache.flink.runtime.jobgraph.JobID;
    +import org.apache.flink.runtime.protocols.ConsumerNotificationProtocol;
    +import org.apache.flink.runtime.protocols.ConsumerNotificationResult;
    +
    +import java.io.IOException;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +import static com.google.common.base.Preconditions.checkState;
    +
    +public class IntermediateResultPartition implements BufferPoolOwner {
    +
    +   /**
    +    * Note: This index needs to correspond to the index of the partition in
    +    * {@link ExecutionVertex#resultPartitions}, which might be a little
    +    * fragile as the data availability notifications use it.
    +    */
    +   private final int partitionIndex;
    +
    +   private final JobID jobId;
    +
    +   private final ExecutionAttemptID executionId;
    +
    +   private final IntermediateResultPartitionID partitionId;
    +
    +   private final IntermediateResultType partitionType;
    +
    +   private final BufferPool bufferPool;
    +
    +   private final ConsumerNotificationProtocol notificationProtocol;
    +
    +   private final IntermediateResultPartitionQueue[] queues;
    +
    +   private boolean hasNotifiedConsumers;
    +
    +   private boolean isReleased;
    +
    +   private boolean isFinished;
    +
    +   public IntermediateResultPartition(
    +                   int partitionIndex,
    +                   JobID jobId,
    +                   ExecutionAttemptID executionId,
    +                   NetworkEnvironment networkEnvironment,
    +                   IntermediateResultPartitionDeploymentDescriptor irpdd) 
throws IOException {
    +
    +           this.partitionIndex = partitionIndex;
    +           this.jobId = checkNotNull(jobId);
    +           this.executionId = checkNotNull(executionId);
    +
    +           this.partitionId = checkNotNull(irpdd.getPartitionId());
    +           this.queues = new 
IntermediateResultPartitionQueue[irpdd.getNumberOfQueues()];
    +           this.partitionType = irpdd.getPartitionType();
    +
    +           this.bufferPool = 
networkEnvironment.getNetworkBufferPool().createBufferPool(irpdd.getNumberOfQueues(),
 false);
    +           this.notificationProtocol = 
networkEnvironment.getNotificationProtocol();
    +
    +           // TODO The queues need to be created depending on the result 
type
    +           for (int i = 0; i < queues.length; i++) {
    +                   queues[i] = new 
ConsumableOnceInMemoryOnlyPartitionQueue();
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   // Properties
    +   // 
------------------------------------------------------------------------
    +
    +   public IntermediateResultPartitionID getPartitionId() {
    +           return partitionId;
    +   }
    +
    +   public JobID getJobId() {
    +           return jobId;
    +   }
    +
    +   public int getNumberOfQueues() {
    +           return queues.length;
    +   }
    +
    +   public BufferProvider getBufferProvider() {
    +           return bufferPool;
    +   }
    +
    +   public boolean isFinished() {
    +           return isFinished;
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   // Produce
    +   // 
------------------------------------------------------------------------
    +
    +   public void add(Buffer buffer, int targetQueue) throws IOException {
    +           checkInProducePhase();
    +
    +           queues[targetQueue].add(buffer);
    +
    +           maybeNotifyConsumers(partitionType.isPipelined());
    +   }
    +
    +   public void addToAllPartitions(Buffer buffer) throws IOException {
    +           checkInProducePhase();
    +
    +           for (IntermediateResultPartitionQueue queue : queues) {
    +                   queue.add(buffer);
    +           }
    +
    +           maybeNotifyConsumers(partitionType.isPipelined());
    +   }
    +
    +   public void finish() throws IOException {
    +           checkInProducePhase();
    +
    +           boolean success = false;
    +
    +           try {
    +                   for (IntermediateResultPartitionQueue queue : queues) {
    +                           queue.finish();
    +                   }
    +
    +                   success = true;
    +           }
    +           finally {
    +                   if (success) {
    +                           // Notify at this point in any case either 
because of the end
    +                           // of a blocking result or an empty pipelined 
result.
    +                           maybeNotifyConsumers(true);
    +                   }
    +
    +                   if (!partitionType.isPersistent() && bufferPool != 
null) {
    +                           // If this partition is not persistent, 
immediately destroy
    +                           // the buffer pool. For persistent intermediate 
results, the
    +                           // partition manager needs to release the 
buffer pool.
    +                           bufferPool.destroy();
    +                   }
    +
    +                   isFinished = true;
    +           }
    +   }
    +
    +   public void releaseAllResources() throws IOException {
    +           if (!isReleased) {
    +                   try {
    +                           if (bufferPool != null) {
    +                                   bufferPool.destroy();
    +                           }
    +                   }
    +                   finally {
    +                           isReleased = true;
    +                   }
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   // Consume
    +   // 
------------------------------------------------------------------------
    +
    +   public IntermediateResultPartitionQueueIterator getQueueIterator(int 
queueIndex, Optional<BufferProvider> bufferProvider) throws IOException {
    +           if (queueIndex < 0 || queueIndex >= queues.length) {
    +                   throw new IllegalQueueIteratorRequestException();
    +           }
    +
    +           return queues[queueIndex].getQueueIterator(bufferProvider);
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public String toString() {
    +           return "Intermediate result partition " + partitionId + " [num 
queues: " + queues.length + ", " + (isFinished ? "finished" : "not finished") + 
"]";
    +   }
    +
    +   private void checkInProducePhase() {
    +           checkState(!isReleased, "Partition has already been 
discarded.");
    +           checkState(!isFinished, "Partition has already been finished.");
    +   }
    +
    +   /**
    +    * Maybe notifies consumers of this result partition.
    +    */
    +   private void maybeNotifyConsumers(boolean doNotify) throws IOException {
    +           if (doNotify && !hasNotifiedConsumers) {
    +                   scheduleOrUpdateConsumers();
    +                   hasNotifiedConsumers = true;
    +           }
    +   }
    +
    +   private void scheduleOrUpdateConsumers() throws IOException {
    +           while (true) {
    +                   ConsumerNotificationResult result = 
notificationProtocol.scheduleOrUpdateConsumers(jobId, executionId, 
partitionIndex);
    +
    +                   if (result.isSuccess()) {
    +                           return;
    +                   }
    +                   else {
    +                           Throwable error = result.getError();
    +                           if (error != null) {
    +                                   throw new 
IOException(error.getMessage());
    +                           }
    +                   }
    +
    +                   try {
    +                           Thread.sleep(100);
    --- End diff --
    
    Haha ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to