[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-08-07 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r311648033
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 ##
 @@ -99,110 +96,161 @@ public SingleInputGate create(
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider 
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
-   final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
-   final ResultPartitionType consumedPartitionType = 
checkNotNull(igdd.getConsumedPartitionType());
-
-   final int consumedSubpartitionIndex = 
igdd.getConsumedSubpartitionIndex();
-   checkArgument(consumedSubpartitionIndex >= 0);
-
-   final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+   SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory(
+   networkBufferPool,
+   isCreditBased,
+   networkBuffersPerChannel,
+   floatingNetworkBuffersPerGate,
+   igdd.getInputChannelDescriptors().length,
+   igdd.getConsumedPartitionType());
 
-   final SingleInputGate inputGate = new SingleInputGate(
+   SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
-   consumedResultId,
-   consumedPartitionType,
-   consumedSubpartitionIndex,
-   icdd.length,
+   igdd.getConsumedResultId(),
+   igdd.getConsumedPartitionType(),
+   igdd.getConsumedSubpartitionIndex(),
+   igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
isCreditBased,
-   createBufferPoolFactory(icdd.length, 
consumedPartitionType));
+   bufferPoolFactory);
+
+   createInputChannels(owningTaskName, igdd, inputGate, metrics);
+   return inputGate;
+   }
+
+   private void createInputChannels(
+   String owningTaskName,
+   InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor,
+   SingleInputGate inputGate,
+   InputChannelMetrics metrics) {
+   ShuffleDescriptor[] inputChannelDescriptors =
+   
inputGateDeploymentDescriptor.getInputChannelDescriptors();
 
// Create the input channels. There is one input channel for 
each consumed partition.
-   final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
+   InputChannel[] inputChannels = new 
InputChannel[inputChannelDescriptors.length];
 
-   int numLocalChannels = 0;
-   int numRemoteChannels = 0;
-   int numUnknownChannels = 0;
+   ChannelStatistics channelStatistics = new ChannelStatistics();
 
for (int i = 0; i < inputChannels.length; i++) {
-   final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
-   final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
-
-   if (partitionLocation.isLocal()) {
-   inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-   partitionManager,
-   taskEventPublisher,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics);
-
-   numLocalChannels++;
-   }
-   else if (partitionLocation.isRemote()) {
-   inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-   partitionLocation.getConnectionId(),
-   connectionManager,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics,
-   networkBufferPool);
-
-   numRemoteChannels++;
-   }
-   else if (partitionLocation.isUnknown()) {
-   inputChannels[i] = new 
UnknownInputChannel(inputGate, i, 
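
The truncated hunk above replaces the three per-type counters (numLocalChannels,
numRemoteChannels, numUnknownChannels) with a single ChannelStatistics aggregate.
A minimal sketch of such a helper, with the field names and summary format assumed
rather than copied from the PR:

    // Sketch only: assumed shape of the statistics aggregate used by createInputChannels.
    static final class ChannelStatistics {
        int numLocalChannels;   // one per LocalInputChannel
        int numRemoteChannels;  // one per RemoteInputChannel
        int numUnknownChannels; // one per UnknownInputChannel

        @Override
        public String toString() {
            return String.format(
                "local: %s, remote: %s, unknown: %s",
                numLocalChannels,
                numRemoteChannels,
                numUnknownChannels);
        }
    }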

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-08-07 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r311644940
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -618,11 +734,14 @@ public void deploy() throws JobException {
attemptNumber, 
getAssignedResourceLocation()));
}
 
-   final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
-   attemptId,
-   slot,
-   taskRestore,
-   attemptNumber);
+   final TaskDeploymentDescriptor deployment = 
TaskDeploymentDescriptorFactory
 
 Review comment:
   https://issues.apache.org/jira/browse/FLINK-13640


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-08-07 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r311636751
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionShuffleDescriptor.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Partition descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDeploymentDescriptor}.
+ */
+public class PartitionShuffleDescriptor implements Serializable {
+
+   private static final long serialVersionUID = 6343547936086963705L;
+
+   /** The ID of the result this partition belongs to. */
+   @Nonnull
+   private final IntermediateDataSetID resultId;
+
+   /** The ID of the partition. */
+   @Nonnull
+   private final IntermediateResultPartitionID partitionId;
 
 Review comment:
   https://issues.apache.org/jira/browse/FLINK-13639




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290387680
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 ##
 @@ -19,47 +19,107 @@
 package org.apache.flink.runtime.deployment;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the {@link ResultPartitionDeploymentDescriptor}.
  */
 public class ResultPartitionDeploymentDescriptorTest {
+   private static final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
+
+   private static final IntermediateResultPartitionID partitionId = new 
IntermediateResultPartitionID();
+   private static final ExecutionAttemptID producerExecutionId = new 
ExecutionAttemptID();
+
+   private static final ResultPartitionType partitionType = 
ResultPartitionType.PIPELINED;
+   private static final int numberOfSubpartitions = 24;
+   private static final int connectionIndex = 10;
+
+   private static final PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
+   resultId,
+   partitionId,
+   partitionType,
+   numberOfSubpartitions,
+   connectionIndex);
+
+   private static final ResultPartitionID resultPartitionID = new 
ResultPartitionID(partitionId, producerExecutionId);
+
+   private static final ResourceID producerLocation = new 
ResourceID("producerLocation");
+   private static final InetSocketAddress address = new 
InetSocketAddress("localhost", 1);
+   private static final ConnectionID connectionID = new 
ConnectionID(address, connectionIndex);
+
+   /**
+* Tests simple de/serialization with {@link UnknownShuffleDescriptor}.
+*/
+   @Test
+   public void testSerializationWithUnknownShuffleDescriptor() throws 
Exception {
+   ShuffleDescriptor shuffleDescriptor = new 
UnknownShuffleDescriptor(resultPartitionID);
+
+   ResultPartitionDeploymentDescriptor copy =
+   
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
+
+   assertThat(copy.getShuffleDescriptor(), 
instanceOf(UnknownShuffleDescriptor.class));
+   UnknownShuffleDescriptor copySdd = (UnknownShuffleDescriptor) 
copy.getShuffleDescriptor();
 
 Review comment:
   leftover, will remove




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290385492
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 ##
 @@ -99,110 +96,161 @@ public SingleInputGate create(
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider 
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
-   final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
-   final ResultPartitionType consumedPartitionType = 
checkNotNull(igdd.getConsumedPartitionType());
-
-   final int consumedSubpartitionIndex = 
igdd.getConsumedSubpartitionIndex();
-   checkArgument(consumedSubpartitionIndex >= 0);
-
-   final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+   SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory(
+   networkBufferPool,
+   isCreditBased,
+   networkBuffersPerChannel,
+   floatingNetworkBuffersPerGate,
+   igdd.getInputChannelDescriptors().length,
+   igdd.getConsumedPartitionType());
 
-   final SingleInputGate inputGate = new SingleInputGate(
+   SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
-   consumedResultId,
-   consumedPartitionType,
-   consumedSubpartitionIndex,
-   icdd.length,
+   igdd.getConsumedResultId(),
+   igdd.getConsumedPartitionType(),
+   igdd.getConsumedSubpartitionIndex(),
+   igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
isCreditBased,
-   createBufferPoolFactory(icdd.length, 
consumedPartitionType));
+   bufferPoolFactory);
+
+   createInputChannels(owningTaskName, igdd, inputGate, metrics);
+   return inputGate;
+   }
+
+   private void createInputChannels(
+   String owningTaskName,
+   InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor,
+   SingleInputGate inputGate,
+   InputChannelMetrics metrics) {
+   ShuffleDescriptor[] inputChannelDescriptors =
+   
inputGateDeploymentDescriptor.getInputChannelDescriptors();
 
// Create the input channels. There is one input channel for 
each consumed partition.
-   final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
+   InputChannel[] inputChannels = new 
InputChannel[inputChannelDescriptors.length];
 
-   int numLocalChannels = 0;
-   int numRemoteChannels = 0;
-   int numUnknownChannels = 0;
+   ChannelStatistics channelStatistics = new ChannelStatistics();
 
for (int i = 0; i < inputChannels.length; i++) {
-   final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
-   final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
-
-   if (partitionLocation.isLocal()) {
-   inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-   partitionManager,
-   taskEventPublisher,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics);
-
-   numLocalChannels++;
-   }
-   else if (partitionLocation.isRemote()) {
-   inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-   partitionLocation.getConnectionId(),
-   connectionManager,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics,
-   networkBufferPool);
-
-   numRemoteChannels++;
-   }
-   else if (partitionLocation.isUnknown()) {
-   inputChannels[i] = new 
UnknownInputChannel(inputGate, i, 

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290385179
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 ##
 @@ -99,110 +96,161 @@ public SingleInputGate create(
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider 
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
-   final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
-   final ResultPartitionType consumedPartitionType = 
checkNotNull(igdd.getConsumedPartitionType());
-
-   final int consumedSubpartitionIndex = 
igdd.getConsumedSubpartitionIndex();
-   checkArgument(consumedSubpartitionIndex >= 0);
-
-   final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+   SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory(
+   networkBufferPool,
+   isCreditBased,
+   networkBuffersPerChannel,
+   floatingNetworkBuffersPerGate,
+   igdd.getInputChannelDescriptors().length,
+   igdd.getConsumedPartitionType());
 
-   final SingleInputGate inputGate = new SingleInputGate(
+   SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
-   consumedResultId,
-   consumedPartitionType,
-   consumedSubpartitionIndex,
-   icdd.length,
+   igdd.getConsumedResultId(),
+   igdd.getConsumedPartitionType(),
+   igdd.getConsumedSubpartitionIndex(),
+   igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
isCreditBased,
-   createBufferPoolFactory(icdd.length, 
consumedPartitionType));
+   bufferPoolFactory);
+
+   createInputChannels(owningTaskName, igdd, inputGate, metrics);
+   return inputGate;
+   }
+
+   private void createInputChannels(
+   String owningTaskName,
+   InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor,
+   SingleInputGate inputGate,
+   InputChannelMetrics metrics) {
+   ShuffleDescriptor[] inputChannelDescriptors =
+   
inputGateDeploymentDescriptor.getInputChannelDescriptors();
 
// Create the input channels. There is one input channel for 
each consumed partition.
-   final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
+   InputChannel[] inputChannels = new 
InputChannel[inputChannelDescriptors.length];
 
-   int numLocalChannels = 0;
-   int numRemoteChannels = 0;
-   int numUnknownChannels = 0;
+   ChannelStatistics channelStatistics = new ChannelStatistics();
 
for (int i = 0; i < inputChannels.length; i++) {
-   final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
-   final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
-
-   if (partitionLocation.isLocal()) {
-   inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-   partitionManager,
-   taskEventPublisher,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics);
-
-   numLocalChannels++;
-   }
-   else if (partitionLocation.isRemote()) {
-   inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-   partitionLocation.getConnectionId(),
-   connectionManager,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics,
-   networkBufferPool);
-
-   numRemoteChannels++;
-   }
-   else if (partitionLocation.isUnknown()) {
-   inputChannels[i] = new 
UnknownInputChannel(inputGate, i, 

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290383005
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 ##
 @@ -99,110 +96,161 @@ public SingleInputGate create(
@Nonnull InputGateDeploymentDescriptor igdd,
@Nonnull PartitionProducerStateProvider 
partitionProducerStateProvider,
@Nonnull InputChannelMetrics metrics) {
-   final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
-   final ResultPartitionType consumedPartitionType = 
checkNotNull(igdd.getConsumedPartitionType());
-
-   final int consumedSubpartitionIndex = 
igdd.getConsumedSubpartitionIndex();
-   checkArgument(consumedSubpartitionIndex >= 0);
-
-   final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
+   SupplierWithException<BufferPool, IOException> bufferPoolFactory = createBufferPoolFactory(
+   networkBufferPool,
+   isCreditBased,
+   networkBuffersPerChannel,
+   floatingNetworkBuffersPerGate,
+   igdd.getInputChannelDescriptors().length,
+   igdd.getConsumedPartitionType());
 
-   final SingleInputGate inputGate = new SingleInputGate(
+   SingleInputGate inputGate = new SingleInputGate(
owningTaskName,
-   consumedResultId,
-   consumedPartitionType,
-   consumedSubpartitionIndex,
-   icdd.length,
+   igdd.getConsumedResultId(),
+   igdd.getConsumedPartitionType(),
+   igdd.getConsumedSubpartitionIndex(),
+   igdd.getInputChannelDescriptors().length,
partitionProducerStateProvider,
isCreditBased,
-   createBufferPoolFactory(icdd.length, 
consumedPartitionType));
+   bufferPoolFactory);
+
+   createInputChannels(owningTaskName, igdd, inputGate, metrics);
+   return inputGate;
+   }
+
+   private void createInputChannels(
+   String owningTaskName,
+   InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor,
+   SingleInputGate inputGate,
+   InputChannelMetrics metrics) {
+   ShuffleDescriptor[] inputChannelDescriptors =
+   
inputGateDeploymentDescriptor.getInputChannelDescriptors();
 
// Create the input channels. There is one input channel for 
each consumed partition.
-   final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
+   InputChannel[] inputChannels = new 
InputChannel[inputChannelDescriptors.length];
 
-   int numLocalChannels = 0;
-   int numRemoteChannels = 0;
-   int numUnknownChannels = 0;
+   ChannelStatistics channelStatistics = new ChannelStatistics();
 
for (int i = 0; i < inputChannels.length; i++) {
-   final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
-   final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
-
-   if (partitionLocation.isLocal()) {
-   inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-   partitionManager,
-   taskEventPublisher,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics);
-
-   numLocalChannels++;
-   }
-   else if (partitionLocation.isRemote()) {
-   inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-   partitionLocation.getConnectionId(),
-   connectionManager,
-   partitionRequestInitialBackoff,
-   partitionRequestMaxBackoff,
-   metrics,
-   networkBufferPool);
-
-   numRemoteChannels++;
-   }
-   else if (partitionLocation.isUnknown()) {
-   inputChannels[i] = new 
UnknownInputChannel(inputGate, i, 

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290360264
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -618,11 +734,14 @@ public void deploy() throws JobException {
attemptNumber, 
getAssignedResourceLocation()));
}
 
-   final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
-   attemptId,
-   slot,
-   taskRestore,
-   attemptNumber);
+   final TaskDeploymentDescriptor deployment = 
TaskDeploymentDescriptorFactory
 
 Review comment:
   This was the original idea, but some state is initialised only after `Execution` 
construction, so it needs more investigation as a follow-up.
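
For context, a hedged sketch of the deploy-time call that this hunk introduces; the
fromExecutionVertex factory method name and the exact argument list are assumptions
based on the quoted factory class, not a verbatim copy of the PR:

    // Sketch only (assumed API): the factory is built inside deploy() because the
    // assigned slot and the registered produced partitions are not yet available
    // when the Execution object itself is constructed.
    final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
        .fromExecutionVertex(vertex, attemptNumber)        // assumed helper
        .createDeploymentDescriptor(
            slot.getTaskManagerLocation().getResourceID(),
            slot.getAllocationId(),
            slot.getPhysicalSlotNumber(),
            taskRestore,
            producedPartitions.values());                  // assumed map of ResultPartitionDeploymentDescriptor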




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290358896
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -268,6 +280,14 @@ public LogicalSlot getAssignedResource() {
return assignedResource;
}
 
+   public Optional<ResultPartitionDeploymentDescriptor> getProducedPartition(IntermediateResultPartitionID id) {
+   if (producedPartitions != null) {
 
 Review comment:
   true, leftover




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290356202
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded<JobInformation> serializedJobInformation;
+   private final MaybeOffloaded<TaskInformation> taskInfo;
+   private final JobID jobID;
+   private final boolean allowLazyDeployment;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded<JobInformation> serializedJobInformation,
+   MaybeOffloaded<TaskInformation> taskInfo,
+   JobID jobID,
+   boolean allowLazyDeployment,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.allowLazyDeployment = allowLazyDeployment;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   ResourceID location,
+   AllocationID allocationID,
+   int targetSlotNumber,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
+   return new TaskDeploymentDescriptor(
+   jobID,
+   serializedJobInformation,
+   taskInfo,
+   executionId,
+   allocationID,
+   subtaskIndex,
+ 

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-04 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r290354332
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded<JobInformation> serializedJobInformation;
+   private final MaybeOffloaded<TaskInformation> taskInfo;
+   private final JobID jobID;
+   private final boolean allowLazyDeployment;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded<JobInformation> serializedJobInformation,
+   MaybeOffloaded<TaskInformation> taskInfo,
+   JobID jobID,
+   boolean allowLazyDeployment,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.allowLazyDeployment = allowLazyDeployment;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   ResourceID location,
+   AllocationID allocationID,
+   int targetSlotNumber,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection<ResultPartitionDeploymentDescriptor> producedPartitions) {
+   return new TaskDeploymentDescriptor(
+   jobID,
+   serializedJobInformation,
+   taskInfo,
+   executionId,
+   allocationID,
+   subtaskIndex,
+ 

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289818110
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 ##
 @@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Intermediate result partition registry to use in {@link 
org.apache.flink.runtime.jobmaster.JobMaster}.
+ */
+public interface ShuffleMaster<T extends ShuffleDescriptor> {
+   CompletableFuture<T> registerPartitionWithProducer(
 
 Review comment:
   I will add a description of the class type parameter to the class-level javadoc; I am 
not sure a method comment would add much value. The method name already says basically 
what is happening, unless you think some specific detail should be mentioned.
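
For illustration, a sketch of what such a class-level description could look like; the
type parameter bound shown here is inferred from the quoted interface and is not the
final javadoc text:

    /**
     * Intermediate result partition registry to use in
     * {@link org.apache.flink.runtime.jobmaster.JobMaster}.
     *
     * @param <T> shuffle descriptor type produced for a registered partition
     *            (assumed bound: T extends ShuffleDescriptor)
     */
    public interface ShuffleMaster<T extends ShuffleDescriptor> {

        /** Asynchronously registers a produced partition together with its producer. */
        CompletableFuture<T> registerPartitionWithProducer(
            PartitionDescriptor partitionDescriptor,
            ProducerDescriptor producerDescriptor);
    }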




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289811563
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ##
 @@ -390,10 +390,16 @@ public InputDependencyConstraint 
getInputDependencyConstraint() {
jobVertex.getInvokableClassName(),
jobVertex.getConfiguration());
 
-   taskInformationOrBlobKey = 
BlobWriter.serializeAndTryOffload(
-   taskInformation,
-   getJobId(),
-   blobWriter);
+   try {
+   taskInformationOrBlobKey = 
BlobWriter.serializeAndTryOffload(
+   taskInformation,
+   getJobId(),
+   blobWriter);
+   } catch (IOException e) {
+   throw new ExecutionGraphException(
 
 Review comment:
   True, just some leftover from another refactoring attempt.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289810369
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
 ##
 @@ -18,48 +18,56 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.io.Serializable;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Contains information where to find a partition. The partition is defined by 
the
- * {@link IntermediateDataSetID} and the partition location is specified by
- * {@link InputChannelDeploymentDescriptor}.
+ * {@link IntermediateDataSetID} and the partition is specified by
+ * {@link ShuffleDescriptor}.
  */
 public class PartitionInfo implements Serializable {
 
private static final long serialVersionUID = 1724490660830968430L;
 
private final IntermediateDataSetID intermediateDataSetID;
-   private final InputChannelDeploymentDescriptor 
inputChannelDeploymentDescriptor;
 
-   public PartitionInfo(IntermediateDataSetID 
intermediateResultPartitionID, InputChannelDeploymentDescriptor 
inputChannelDeploymentDescriptor) {
-   this.intermediateDataSetID = 
Preconditions.checkNotNull(intermediateResultPartitionID);
-   this.inputChannelDeploymentDescriptor = 
Preconditions.checkNotNull(inputChannelDeploymentDescriptor);
+   private final ShuffleDescriptor shuffleDescriptor;
+
+   public PartitionInfo(
+   IntermediateDataSetID intermediateResultPartitionID,
+   ShuffleDescriptor shuffleDescriptor) {
+   this.intermediateDataSetID = intermediateResultPartitionID;
+   this.shuffleDescriptor = shuffleDescriptor;
}
 
public IntermediateDataSetID getIntermediateDataSetID() {
return intermediateDataSetID;
}
 
-   public InputChannelDeploymentDescriptor 
getInputChannelDeploymentDescriptor() {
-   return inputChannelDeploymentDescriptor;
+   public ShuffleDescriptor getShuffleDescriptor() {
+   return shuffleDescriptor;
}
 
-   // ------------------------------------------------------------------------
-
static PartitionInfo fromEdge(ExecutionEdge executionEdge) {
-   final InputChannelDeploymentDescriptor 
inputChannelDeploymentDescriptor = 
InputChannelDeploymentDescriptor.fromEdge(executionEdge);
-
-   Preconditions.checkState(
-   
!inputChannelDeploymentDescriptor.getConsumedPartitionLocation().isUnknown(),
-   "PartitionInfo contains an unknown partition 
location.");
+   IntermediateDataSetID intermediateDataSetID = 
executionEdge.getSource().getIntermediateResult().getId();
+   ShuffleDescriptor shuffleDescriptor = 
getKnownConsumedPartitionShuffleDescriptor(executionEdge);
+   return new PartitionInfo(intermediateDataSetID, 
shuffleDescriptor);
+   }
 
-   return new PartitionInfo(
-   
executionEdge.getSource().getIntermediateResult().getId(),
-   inputChannelDeploymentDescriptor);
+   private static ShuffleDescriptor 
getKnownConsumedPartitionShuffleDescriptor(ExecutionEdge edge) {
 
 Review comment:
   I think we can reuse it just by passing `allowLazyDeployment=true` to 
`getConsumedPartitionShuffleDescriptor`.
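
A minimal sketch of that reuse in PartitionInfo.fromEdge, assuming the shared helper
lives in TaskDeploymentDescriptorFactory and takes the edge plus the allowLazyDeployment
flag (the exact signature is not shown in this thread):

    // Sketch only: delegate to the shared helper instead of keeping a private
    // "known descriptor" variant in PartitionInfo.
    static PartitionInfo fromEdge(ExecutionEdge executionEdge) {
        IntermediateDataSetID intermediateDataSetID =
            executionEdge.getSource().getIntermediateResult().getId();
        ShuffleDescriptor shuffleDescriptor =
            TaskDeploymentDescriptorFactory.getConsumedPartitionShuffleDescriptor(
                executionEdge, true /* allowLazyDeployment, assumed parameter */);
        return new PartitionInfo(intermediateDataSetID, shuffleDescriptor);
    }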




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289791224
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##
 @@ -774,6 +687,51 @@ public void testRequestStackTraceSample() throws 
Exception {
}
}
 
+   private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor 
sdd) throws IOException {
+   return createSender(sdd, 
TestingAbstractInvokables.Sender.class);
+   }
+
+   private TaskDeploymentDescriptor createSender(
+   NettyShuffleDescriptor shuffleDeploymentDescriptor,
+   Class<? extends AbstractInvokable> abstractInvokable) throws IOException {
+   PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
+   new IntermediateDataSetID(),
+   
shuffleDeploymentDescriptor.getResultPartitionID().getPartitionId(),
+   ResultPartitionType.PIPELINED,
+   1,
+   0);
+   ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor(
+   partitionDescriptor,
+   shuffleDeploymentDescriptor,
+   1,
+   true);
+   return createTestTaskDeploymentDescriptor(
+   "Sender",
+   
shuffleDeploymentDescriptor.getResultPartitionID().getProducerId(),
+   abstractInvokable,
+   1,
+   
Collections.singletonList(resultPartitionDeploymentDescriptor),
+   Collections.emptyList());
+   }
+
+   private TaskDeploymentDescriptor createReceiver(
+   NettyShuffleDescriptor shuffleDescriptor,
+   ResourceID location) throws IOException {
 
 Review comment:
   Some tests rely on it being the same as in some other components outside of this method.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289787348
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -235,9 +242,8 @@ public void testUpdateChannelBeforeRequest() throws 
Exception {
.buildUnknownAndSetToGate(inputGate);
 
// Update to a local channel and verify that no request is 
triggered
-   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(
-   unknown.partitionId,
-   ResultPartitionLocation.createLocal()));
+   ResourceID location = ResourceID.generate();
+   inputGate.updateInputChannel(location, 
createPartitionInfo(unknown.getPartitionId(), location));
 
 Review comment:
   I think the random one provides a better check for tests.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289787297
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -212,7 +217,9 @@ public void testBackwardsEventWithUninitializedChannel() 
throws Exception {
verify(taskEventDispatcher, 
times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 
// After the update, the pending event should be send to local 
channel
-   inputGate.updateInputChannel(new 
InputChannelDeploymentDescriptor(new 
ResultPartitionID(unknownPartitionId.getPartitionId(), 
unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
+
+   ResourceID location = ResourceID.generate();
 
 Review comment:
   I think the random one provides a better check for tests.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289759171
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -324,39 +326,39 @@ public void 
setInputChannel(IntermediateResultPartitionID partitionId, InputChan
}
}
 
-   public void updateInputChannel(InputChannelDeploymentDescriptor icdd) 
throws IOException, InterruptedException {
+   public void updateInputChannel(
+   ResourceID localLocation,
+   PartitionInfo partitionInfo) throws IOException, 
InterruptedException {
synchronized (requestLock) {
if (closeFuture.isDone()) {
// There was a race with a task failure/cancel
return;
}
 
-   final IntermediateResultPartitionID partitionId = 
icdd.getConsumedPartitionId().getPartitionId();
+   ShuffleDescriptor shuffleDescriptor = 
partitionInfo.getShuffleDescriptor();
+   checkArgument(shuffleDescriptor instanceof 
NettyShuffleDescriptor,
 
 Review comment:
   True, I will put it under the if condition, but `isLocalTo` is a detail of the netty 
implementation, while `PartitionInfo` belongs to the general shuffle API.
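
A hedged sketch of the guarded check in SingleInputGate#updateInputChannel; the lookup
of the existing channel and the error message are assumptions, only the
instanceof NettyShuffleDescriptor precondition comes from the quoted code:

    // Sketch only: run the netty-specific type check just before an unknown channel
    // is actually converted, instead of unconditionally at method entry.
    ShuffleDescriptor shuffleDescriptor = partitionInfo.getShuffleDescriptor();
    InputChannel current = inputChannels.get(
        shuffleDescriptor.getResultPartitionID().getPartitionId());   // assumed lookup

    if (current instanceof UnknownInputChannel) {
        checkArgument(
            shuffleDescriptor instanceof NettyShuffleDescriptor,
            "Tried to update an unknown channel with an incompatible shuffle descriptor: %s.",
            shuffleDescriptor.getClass().getName());
        // ... convert the unknown channel into a local or remote channel here
    }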




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289754707
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded<JobInformation> serializedJobInformation;
+   private final MaybeOffloaded<TaskInformation> taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded<JobInformation> serializedJobInformation,
+   MaybeOffloaded<TaskInformation> taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   ResourceID location,
+   AllocationID allocationID,
+   int targetSlotNumber,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection<ResultPartitionDeploymentDescriptor> producedPartitions) throws Exception {
+   return new TaskDeploymentDescriptor(
+   jobID,
+   serializedJobInformation,
+   taskInfo,
+   executionId,
+   allocationID,
+  

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-03 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289753675
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
+   ProducerDescriptor producerDescriptor,
+   int connectionIndex) {
+   return producerDescriptor.getDataPort() >= 0 ?
 
 Review comment:
   I agree that naming is confusing. It should be 
`PartitionConnectionInfo/NetworkPartitionConnectionInfo/LocalExecutionPartitionConnectionInfo`
 because it is not about local or remote channels.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-06-02 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289631850
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+   private final MaybeOffloaded<JobInformation> serializedJobInformation;
+   private final MaybeOffloaded<TaskInformation> taskInfo;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   MaybeOffloaded<JobInformation> serializedJobInformation,
+   MaybeOffloaded<TaskInformation> taskInfo,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfo = taskInfo;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public TaskDeploymentDescriptor createDeploymentDescriptor(
+   LogicalSlot targetSlot,
+   @Nullable JobManagerTaskRestore taskRestore,
+   Collection<ResultPartitionDeploymentDescriptor> producedPartitions) throws Exception {
+   return createDeploymentDescriptor(
+   targetSlot.getTaskManagerLocation().getResourceID(),
+   targetSlot.getAllocationId(),
+   

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289427441
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -566,6 +619,67 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
}
}
 
+   @VisibleForTesting
+   CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) {
+   assertRunningInJobMasterMainThread();
+
+   return FutureUtils.thenApplyAsyncIfNotDone(
+   registerProducedPartitions(vertex, location, attemptId),
+   
vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
+   producedPartitionsCache -> {
+   producedPartitions = producedPartitionsCache;
+   return this;
+   });
+   }
+
+   @VisibleForTesting
+   static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>> registerProducedPartitions(
 
 Review comment:
   Why not? It is also easier to test without mocking `Execution`.
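   To illustrate the point with a generic, purely hypothetical example (not the Flink types): once the registration logic is a static function of its inputs, a test can call it directly and assert on the returned future, without constructing or mocking an `Execution`-like object.

```java
import java.util.concurrent.CompletableFuture;

final class RegistrationExample {

	// Pure function of its inputs; no enclosing object has to exist.
	static CompletableFuture<String> registerPartition(String producerId, int partitionIndex) {
		return CompletableFuture.completedFuture(producerId + "#" + partitionIndex);
	}

	public static void main(String[] args) {
		// "Test" without any mocking: invoke the static method and check the result.
		String descriptor = registerPartition("producer-1", 0).join();
		if (!"producer-1#0".equals(descriptor)) {
			throw new AssertionError("unexpected descriptor: " + descriptor);
		}
	}
}
```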




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289424609
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
+   ProducerDescriptor producerDescriptor,
+   int connectionIndex) {
+   return producerDescriptor.getDataPort() >= 0 ?
+   
NettyShuffleDescriptor.RemotePartitionLocation.fromProducerDescriptor(producerDescriptor,
 connectionIndex) :
 
 Review comment:
   I think the factory method is more readable: the remote connection info is created only from the `ProducerDescriptor`. I refactored it a bit.
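   Roughly this shape (a simplified, hypothetical sketch; `RemoteConnectionInfo` and `ProducerInfo` below are stand-ins, not the real `RemotePartitionLocation`/`ProducerDescriptor` classes):

```java
import java.net.InetSocketAddress;

final class RemoteConnectionInfo {
	private final InetSocketAddress address;
	private final int connectionIndex;

	private RemoteConnectionInfo(InetSocketAddress address, int connectionIndex) {
		this.address = address;
		this.connectionIndex = connectionIndex;
	}

	// The factory makes the dependency explicit: remote connection info is
	// derived only from the producer-side descriptor plus a connection index.
	static RemoteConnectionInfo fromProducerInfo(ProducerInfo producer, int connectionIndex) {
		return new RemoteConnectionInfo(
			new InetSocketAddress(producer.getAddress(), producer.getDataPort()),
			connectionIndex);
	}
}

// Stand-in for the producer-side descriptor used above.
final class ProducerInfo {
	private final String address;
	private final int dataPort;

	ProducerInfo(String address, int dataPort) {
		this.address = address;
		this.dataPort = dataPort;
	}

	String getAddress() {
		return address;
	}

	int getDataPort() {
		return dataPort;
	}
}
```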




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289420919
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
 
 Review comment:
   The method does not need any enum instance fields.
   Why not make it static?
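   Just to illustrate the general point (not Flink code): a helper that reads no instance state can be static even inside an enum singleton.

```java
public enum Greeter {
	INSTANCE;

	public String greet(String name) {
		// The instance method delegates to the static helper.
		return decorate(name);
	}

	// Uses no enum fields, so it can be static.
	private static String decorate(String name) {
		return "Hello, " + name;
	}
}
```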




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-31 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289415869
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 ##
 @@ -54,24 +56,31 @@
 * The index of the consumed subpartition of each consumed partition. 
This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing 
and consuming task.
 */
+   @Nonnegative
private final int consumedSubpartitionIndex;
 
/** An input channel for each consumed subpartition. */
-   private final InputChannelDeploymentDescriptor[] inputChannels;
+   private final ShuffleDescriptor[] inputChannels;
+
+   /**
+* {@link ResourceID} of the partition consumer to identify its location.
+*
+* It can be used e.g. to compare with partition producer {@link 
ResourceID} in
+* {@link ProducerDescriptor} to determine producer/consumer 
co-location.
+*/
+   private final ResourceID consumerLocation;
 
 Review comment:
   Actually, I think it is better to rename it in `ProducerDescriptor` and `NettyShuffleDescriptor` as well.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-27 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287799750
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java
 ##
 @@ -18,48 +18,64 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor;
 
+import javax.annotation.Nonnull;
 import java.io.Serializable;
 
 /**
  * Contains information where to find a partition. The partition is defined by 
the
- * {@link IntermediateDataSetID} and the partition location is specified by
- * {@link InputChannelDeploymentDescriptor}.
+ * {@link IntermediateDataSetID} and the partition is specified by
+ * {@link org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor}.
  */
 public class PartitionInfo implements Serializable {
 
private static final long serialVersionUID = 1724490660830968430L;
 
+   @Nonnull
private final IntermediateDataSetID intermediateDataSetID;
 
 Review comment:
   At the moment, `IntermediateDataSetID` is kept once in 
`InputGateDeploymentDescriptor` for all channels/partitions as before. I 
suggest we keep it this way in this PR. Later we can consider one step further 
refactoring where `IntermediateDataSetID` is part of the full partition id.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287611055
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -272,4 +272,18 @@ private InputGate createInputGate(
return gates[0];
}
}
+
+   private static ShuffleDeploymentDescriptor 
createLocalSdd(ResultPartitionID resultPartitionID, ResourceID location) {
 
 Review comment:
   we can also use `NettyShuffleDescriptorBuilder` here.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609955
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -300,6 +302,9 @@
// -- Fields that are only relevant for archived execution graphs 

private String jsonPlan;
 
+   /** Shuffle master to register partitions for task deployment. */
+   private final ShuffleMaster shuffleMaster = 
DefaultShuffleMaster.getInstance();
 
 Review comment:
   At the moment the default implementation is hardcoded anyway. I suggest we consider it when we introduce configuration and proper creation of the shuffle components as one of the final steps.
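   For context, a generic sketch of what "configuration and proper creation" could look like (purely illustrative; the loader below is made up and is not the Flink API — it only shows the reflective-instantiation idea):

```java
// Instantiate a shuffle component factory by class name taken from configuration.
final class ShuffleComponentLoader {

	static <T> T instantiate(String factoryClassName, Class<T> factoryType) throws Exception {
		Class<?> clazz = Class.forName(factoryClassName);
		return factoryType.cast(clazz.getDeclaredConstructor().newInstance());
	}
}
```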




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-26 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287609890
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -566,6 +615,62 @@ public void setInitialState(@Nullable 
JobManagerTaskRestore taskRestore) {
}
}
 
+   @VisibleForTesting
+   CompletableFuture<Execution> registerProducedPartitions(TaskManagerLocation location) {
+   assertRunningInJobMasterMainThread();
+
+   return registerProducedPartitions(vertex, location, attemptId)
+   .thenApplyAsync(producedPartitionsCache -> {
+   producedPartitions = producedPartitionsCache;
+   return this;
+   }, 
vertex.getExecutionGraph().getJobMasterMainThreadExecutor());
+   }
+
+   @VisibleForTesting
+   static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
+   registerProducedPartitions(
+   ExecutionVertex vertex,
+   TaskManagerLocation location,
+   ExecutionAttemptID attemptId) {
+
+   ProducerShuffleDescriptor producerShuffleDescriptor = 
ProducerShuffleDescriptor.create(
+   location, attemptId);
+
+   boolean lazyScheduling = 
vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment();
+
+   Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
+   Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =
+   new ArrayList<>(partitions.size());
+
+   for (IntermediateResultPartition partition : partitions) {
+   PartitionShuffleDescriptor partitionShuffleDescriptor = 
PartitionShuffleDescriptor.from(
+   partition, 
getPartitionMaxParallelism(partition));
+   
partitionRegistrations.add(vertex.getExecutionGraph().getShuffleMaster()
+   
.registerPartitionWithProducer(partitionShuffleDescriptor, 
producerShuffleDescriptor)
+   .thenApply(shuffleDescriptor -> new 
ResultPartitionDeploymentDescriptor(
+   partitionShuffleDescriptor, 
shuffleDescriptor, lazyScheduling)));
+   }
+
+   return 
FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
+   Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions =
+   new LinkedHashMap<>(partitions.size());
+   rpdds.forEach(rpdd -> 
producedPartitions.put(rpdd.getPartitionId(), rpdd));
+   return producedPartitions;
+   });
+   }
+
+   private static int 
getPartitionMaxParallelism(IntermediateResultPartition partition) {
+   // TODO consumers.isEmpty() only exists for test, currently 
there has to be exactly one consumer in real jobs!
 
 Review comment:
   This TODO existed before the PR; I suggest we tackle it separately.
   I created an issue for it: https://issues.apache.org/jira/browse/FLINK-12628.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287368246
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##
 @@ -504,26 +441,11 @@ public void testUpdateTaskInputPartitionsFailure() 
throws Exception {
 */
@Test(timeout = 1L)
public void testLocalPartitionNotFound() throws Exception {
-   final ExecutionAttemptID eid = new ExecutionAttemptID();
-
-   final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
-   final ResultPartitionID partitionId = new ResultPartitionID();
-
-   final ResultPartitionLocation loc = 
ResultPartitionLocation.createLocal();
-
-   final InputChannelDeploymentDescriptor[] 
inputChannelDeploymentDescriptors =
-   new InputChannelDeploymentDescriptor[] {
-   new 
InputChannelDeploymentDescriptor(partitionId, loc)};
-
-   final InputGateDeploymentDescriptor 
inputGateDeploymentDescriptor =
-   new InputGateDeploymentDescriptor(resultId, 
ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors);
-
-   final TaskDeploymentDescriptor tdd =
-   createTestTaskDeploymentDescriptor("Receiver",
-   eid,
-   Tasks.AgnosticReceiver.class,
-   1, Collections.emptyList(),
-   
Collections.singletonList(inputGateDeploymentDescriptor));
+   ResourceID producerLocation = new ResourceID("local");
+   DefaultShuffleDeploymentDescriptor sdd =
+   createSddWithLocalConnection(new 
IntermediateResultPartitionID(), producerLocation, 1);
+   TaskDeploymentDescriptor tdd = createReceiver(sdd, 
producerLocation);
 
 Review comment:
   It should result in the consumer side identifying, in `SingleInputGateFactory`, that the partition is located locally, by comparing the producer and consumer resource IDs.
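   Conceptually something like this (illustrative sketch only; the names below are stand-ins, not the actual factory code):

```java
enum ChannelKind { LOCAL, REMOTE, UNKNOWN }

final class ChannelClassifier {

	// Decide the channel type purely from resource IDs, assuming the shuffle
	// descriptor carries the producer's resource ID (or none if still unknown).
	static ChannelKind classify(String producerResourceId, String consumerResourceId) {
		if (producerResourceId == null) {
			return ChannelKind.UNKNOWN; // producer location not resolved yet
		}
		return producerResourceId.equals(consumerResourceId)
			? ChannelKind.LOCAL   // same TaskExecutor -> local input channel
			: ChannelKind.REMOTE; // different TaskExecutor -> network connection
	}
}
```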




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287362965
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 ##
 @@ -483,12 +419,13 @@ public void testUpdateTaskInputPartitionsFailure() 
throws Exception {
tmGateway.submitTask(tdd, env.getJobMasterId(), 
timeout).get();
taskRunningFuture.get();
 
+   ResourceID producerLocation = new ResourceID("local");
 
 Review comment:
   I think it would be better to use a random `ResourceID` via `ResourceID.generate()`.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287362924
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -517,21 +514,22 @@ public void testUpdateUnknownInputChannel() throws 
Exception {

assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()),
   is(instanceOf(UnknownInputChannel.class)));
 
+   ResourceID localLocation = new ResourceID("local");
 
 Review comment:
   I think it would be better to use a random `ResourceID` via `ResourceID.generate()`.




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287362596
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 ##
 @@ -322,29 +325,24 @@ public void run() {
 */
@Test
public void testRequestBackoffConfiguration() throws Exception {
-   ResultPartitionID[] partitionIds = new ResultPartitionID[] {
-   new ResultPartitionID(),
-   new ResultPartitionID(),
-   new ResultPartitionID()
+   IntermediateResultPartitionID[] partitionIds = new 
IntermediateResultPartitionID[] {
+   new IntermediateResultPartitionID(),
+   new IntermediateResultPartitionID(),
+   new IntermediateResultPartitionID()
};
 
-   InputChannelDeploymentDescriptor[] channelDescs = new 
InputChannelDeploymentDescriptor[]{
+   ResourceID localLocation = new ResourceID("local");
+   ShuffleDeploymentDescriptor[] channelDescs = new 
ShuffleDeploymentDescriptor[]{
// Local
-   new InputChannelDeploymentDescriptor(
-   partitionIds[0],
-   ResultPartitionLocation.createLocal()),
+   createSddWithLocalConnection(partitionIds[0], 
localLocation, 1),
// Remote
-   new InputChannelDeploymentDescriptor(
-   partitionIds[1],
-   ResultPartitionLocation.createRemote(new 
ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+   createSddWithLocalConnection(partitionIds[1], new 
ResourceID("remote"), 5000),
 
 Review comment:
   refactored to `ShuffleDeploymentDescriptorBuilder`




[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-24 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r287359019
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
 ##
 @@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor;
+import org.apache.flink.runtime.shuffle.UnknownShuffleDeploymentDescriptor;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.CheckedSupplier;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory of {@link TaskDeploymentDescriptor} to deploy {@link Execution}.
+ */
+public class TaskDeploymentDescriptorFactory {
+   private final ExecutionAttemptID executionId;
+   private final int attemptNumber;
+
+   private final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
+   private final CheckedSupplier<Either<SerializedValue<TaskInformation>, PermanentBlobKey>> taskInfoSupplier;
+   private final JobID jobID;
+   private final boolean lazyScheduling;
+   private final int subtaskIndex;
+   private final ExecutionEdge[][] inputEdges;
+
+   private TaskDeploymentDescriptorFactory(
+   ExecutionAttemptID executionId,
+   int attemptNumber,
+   TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation,
+   CheckedSupplier<Either<SerializedValue<TaskInformation>, PermanentBlobKey>> taskInfoSupplier,
+   JobID jobID,
+   boolean lazyScheduling,
+   int subtaskIndex,
+   ExecutionEdge[][] inputEdges) {
+
+   this.executionId = executionId;
+   this.attemptNumber = attemptNumber;
+   this.serializedJobInformation = serializedJobInformation;
+   this.taskInfoSupplier = taskInfoSupplier;
+   this.jobID = jobID;
+   this.lazyScheduling = lazyScheduling;
+   this.subtaskIndex = subtaskIndex;
+   this.inputEdges = inputEdges;
+   }
+
+   public static TaskDeploymentDescriptorFactory fromExecutionVertex(
+   ExecutionVertex executionVertex, ExecutionAttemptID 
executionId, int attemptNumber) {
+
+   ExecutionGraph executionGraph = 
executionVertex.getExecutionGraph();
+
+   return new TaskDeploymentDescriptorFactory(
+   executionId,
+   attemptNumber,
+   

[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-14 Thread GitBox
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r283747682
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionShuffleDescriptor.java
 ##
 @@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Partition descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDeploymentDescriptor}. */
+public class PartitionShuffleDescriptor implements Serializable {
+
+   private static final long serialVersionUID = 6343547936086963705L;
+
+   /** The ID of the result this partition belongs to. */
+   @Nonnull
+   private final IntermediateDataSetID resultId;
+
+   /** The ID of the partition. */
+   @Nonnull
+   private final IntermediateResultPartitionID partitionId;
+
+   /** The type of the partition. */
+   @Nonnull
+   private final ResultPartitionType partitionType;
+
+   /** The number of subpartitions. */
+   private final int numberOfSubpartitions;
+
+   /** The maximum parallelism. */
+   private final int maxParallelism;
+
+   /** Connection index to identify this partition of intermediate result. 
*/
+   private final int connectionIndex;
+
+   public PartitionShuffleDescriptor(
+   @Nonnull IntermediateDataSetID resultId,
+   @Nonnull IntermediateResultPartitionID partitionId,
+   @Nonnull ResultPartitionType partitionType,
+   int numberOfSubpartitions,
+   int maxParallelism,
+   int connectionIndex) {
+
+   this.resultId = resultId;
+   this.partitionId = partitionId;
+   this.partitionType = partitionType;
+   this.connectionIndex = connectionIndex;
+
+   
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
+   checkArgument(numberOfSubpartitions >= 1);
+   this.numberOfSubpartitions = numberOfSubpartitions;
+   this.maxParallelism = maxParallelism;
+   }
+
+   @Nonnull
+   public IntermediateDataSetID getResultId() {
+   return resultId;
+   }
+
+   @Nonnull
+   public IntermediateResultPartitionID getPartitionId() {
+   return partitionId;
+   }
+
+   @Nonnull
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   public int getNumberOfSubpartitions() {
+   return numberOfSubpartitions;
+   }
+
+   public int getMaxParallelism() {
+   return maxParallelism;
+   }
+
+   public int getConnectionIndex() {
 
 Review comment:
   It could potentially be used outside of this package in other shuffle implementations. I would leave the methods public here, because they are then basically part of the shuffle API.

