[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r311648033 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ## @@ -99,110 +96,161 @@ public SingleInputGate create( @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { - final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); - final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); - - final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex(); - checkArgument(consumedSubpartitionIndex >= 0); - - final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); + SupplierWithException bufferPoolFactory = createBufferPoolFactory( + networkBufferPool, + isCreditBased, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + igdd.getInputChannelDescriptors().length, + igdd.getConsumedPartitionType()); - final SingleInputGate inputGate = new SingleInputGate( + SingleInputGate inputGate = new SingleInputGate( owningTaskName, - consumedResultId, - consumedPartitionType, - consumedSubpartitionIndex, - icdd.length, + igdd.getConsumedResultId(), + igdd.getConsumedPartitionType(), + igdd.getConsumedSubpartitionIndex(), + igdd.getInputChannelDescriptors().length, partitionProducerStateProvider, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + bufferPoolFactory); + + createInputChannels(owningTaskName, igdd, inputGate, metrics); + return inputGate; + } + + private void createInputChannels( + String owningTaskName, + InputGateDeploymentDescriptor inputGateDeploymentDescriptor, + SingleInputGate inputGate, + InputChannelMetrics metrics) { + ShuffleDescriptor[] 
inputChannelDescriptors = + inputGateDeploymentDescriptor.getInputChannelDescriptors(); // Create the input channels. There is one input channel for each consumed partition. - final InputChannel[] inputChannels = new InputChannel[icdd.length]; + InputChannel[] inputChannels = new InputChannel[inputChannelDescriptors.length]; - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + ChannelStatistics channelStatistics = new ChannelStatistics(); for (int i = 0; i < inputChannels.length; i++) { - final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId(); - final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); - - if (partitionLocation.isLocal()) { - inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics); - - numLocalChannels++; - } - else if (partitionLocation.isRemote()) { - inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, - partitionLocation.getConnectionId(), - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numRemoteChannels++; - } - else if (partitionLocation.isUnknown()) { - inputChannels[i] = new UnknownInputChannel(inputGate, i,
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r311644940 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -618,11 +734,14 @@ public void deploy() throws JobException { attemptNumber, getAssignedResourceLocation())); } - final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( - attemptId, - slot, - taskRestore, - attemptNumber); + final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory Review comment: https://issues.apache.org/jira/browse/FLINK-13640 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r311636751 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionShuffleDescriptor.java ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Partition descriptor for {@link ShuffleMaster} to obtain {@link ShuffleDeploymentDescriptor}. 
+ */ +public class PartitionShuffleDescriptor implements Serializable { + + private static final long serialVersionUID = 6343547936086963705L; + + /** The ID of the result this partition belongs to. */ + @Nonnull + private final IntermediateDataSetID resultId; + + /** The ID of the partition. */ + @Nonnull + private final IntermediateResultPartitionID partitionId; Review comment: https://issues.apache.org/jira/browse/FLINK-13639 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290387680 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java ## @@ -19,47 +19,107 @@ package org.apache.flink.runtime.deployment; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.ConnectionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; /** * Tests for the {@link ResultPartitionDeploymentDescriptor}. 
*/ public class ResultPartitionDeploymentDescriptorTest { + private static final IntermediateDataSetID resultId = new IntermediateDataSetID(); + + private static final IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID(); + private static final ExecutionAttemptID producerExecutionId = new ExecutionAttemptID(); + + private static final ResultPartitionType partitionType = ResultPartitionType.PIPELINED; + private static final int numberOfSubpartitions = 24; + private static final int connectionIndex = 10; + + private static final PartitionDescriptor partitionDescriptor = new PartitionDescriptor( + resultId, + partitionId, + partitionType, + numberOfSubpartitions, + connectionIndex); + + private static final ResultPartitionID resultPartitionID = new ResultPartitionID(partitionId, producerExecutionId); + + private static final ResourceID producerLocation = new ResourceID("producerLocation"); + private static final InetSocketAddress address = new InetSocketAddress("localhost", 1); + private static final ConnectionID connectionID = new ConnectionID(address, connectionIndex); + + /** +* Tests simple de/serialization with {@link UnknownShuffleDescriptor}. +*/ + @Test + public void testSerializationWithUnknownShuffleDescriptor() throws Exception { + ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID); + + ResultPartitionDeploymentDescriptor copy = + createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor); + + assertThat(copy.getShuffleDescriptor(), instanceOf(UnknownShuffleDescriptor.class)); + UnknownShuffleDescriptor copySdd = (UnknownShuffleDescriptor) copy.getShuffleDescriptor(); Review comment: leftover, will remove This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290385492 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ## @@ -99,110 +96,161 @@ public SingleInputGate create( @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { - final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); - final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); - - final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex(); - checkArgument(consumedSubpartitionIndex >= 0); - - final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); + SupplierWithException bufferPoolFactory = createBufferPoolFactory( + networkBufferPool, + isCreditBased, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + igdd.getInputChannelDescriptors().length, + igdd.getConsumedPartitionType()); - final SingleInputGate inputGate = new SingleInputGate( + SingleInputGate inputGate = new SingleInputGate( owningTaskName, - consumedResultId, - consumedPartitionType, - consumedSubpartitionIndex, - icdd.length, + igdd.getConsumedResultId(), + igdd.getConsumedPartitionType(), + igdd.getConsumedSubpartitionIndex(), + igdd.getInputChannelDescriptors().length, partitionProducerStateProvider, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + bufferPoolFactory); + + createInputChannels(owningTaskName, igdd, inputGate, metrics); + return inputGate; + } + + private void createInputChannels( + String owningTaskName, + InputGateDeploymentDescriptor inputGateDeploymentDescriptor, + SingleInputGate inputGate, + InputChannelMetrics metrics) { + ShuffleDescriptor[] 
inputChannelDescriptors = + inputGateDeploymentDescriptor.getInputChannelDescriptors(); // Create the input channels. There is one input channel for each consumed partition. - final InputChannel[] inputChannels = new InputChannel[icdd.length]; + InputChannel[] inputChannels = new InputChannel[inputChannelDescriptors.length]; - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + ChannelStatistics channelStatistics = new ChannelStatistics(); for (int i = 0; i < inputChannels.length; i++) { - final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId(); - final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); - - if (partitionLocation.isLocal()) { - inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics); - - numLocalChannels++; - } - else if (partitionLocation.isRemote()) { - inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, - partitionLocation.getConnectionId(), - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numRemoteChannels++; - } - else if (partitionLocation.isUnknown()) { - inputChannels[i] = new UnknownInputChannel(inputGate, i,
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290385179 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ## @@ -99,110 +96,161 @@ public SingleInputGate create( @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { - final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); - final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); - - final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex(); - checkArgument(consumedSubpartitionIndex >= 0); - - final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); + SupplierWithException bufferPoolFactory = createBufferPoolFactory( + networkBufferPool, + isCreditBased, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + igdd.getInputChannelDescriptors().length, + igdd.getConsumedPartitionType()); - final SingleInputGate inputGate = new SingleInputGate( + SingleInputGate inputGate = new SingleInputGate( owningTaskName, - consumedResultId, - consumedPartitionType, - consumedSubpartitionIndex, - icdd.length, + igdd.getConsumedResultId(), + igdd.getConsumedPartitionType(), + igdd.getConsumedSubpartitionIndex(), + igdd.getInputChannelDescriptors().length, partitionProducerStateProvider, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + bufferPoolFactory); + + createInputChannels(owningTaskName, igdd, inputGate, metrics); + return inputGate; + } + + private void createInputChannels( + String owningTaskName, + InputGateDeploymentDescriptor inputGateDeploymentDescriptor, + SingleInputGate inputGate, + InputChannelMetrics metrics) { + ShuffleDescriptor[] 
inputChannelDescriptors = + inputGateDeploymentDescriptor.getInputChannelDescriptors(); // Create the input channels. There is one input channel for each consumed partition. - final InputChannel[] inputChannels = new InputChannel[icdd.length]; + InputChannel[] inputChannels = new InputChannel[inputChannelDescriptors.length]; - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + ChannelStatistics channelStatistics = new ChannelStatistics(); for (int i = 0; i < inputChannels.length; i++) { - final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId(); - final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); - - if (partitionLocation.isLocal()) { - inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics); - - numLocalChannels++; - } - else if (partitionLocation.isRemote()) { - inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, - partitionLocation.getConnectionId(), - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numRemoteChannels++; - } - else if (partitionLocation.isUnknown()) { - inputChannels[i] = new UnknownInputChannel(inputGate, i,
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290383005 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java ## @@ -99,110 +96,161 @@ public SingleInputGate create( @Nonnull InputGateDeploymentDescriptor igdd, @Nonnull PartitionProducerStateProvider partitionProducerStateProvider, @Nonnull InputChannelMetrics metrics) { - final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId()); - final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType()); - - final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex(); - checkArgument(consumedSubpartitionIndex >= 0); - - final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors()); + SupplierWithException bufferPoolFactory = createBufferPoolFactory( + networkBufferPool, + isCreditBased, + networkBuffersPerChannel, + floatingNetworkBuffersPerGate, + igdd.getInputChannelDescriptors().length, + igdd.getConsumedPartitionType()); - final SingleInputGate inputGate = new SingleInputGate( + SingleInputGate inputGate = new SingleInputGate( owningTaskName, - consumedResultId, - consumedPartitionType, - consumedSubpartitionIndex, - icdd.length, + igdd.getConsumedResultId(), + igdd.getConsumedPartitionType(), + igdd.getConsumedSubpartitionIndex(), + igdd.getInputChannelDescriptors().length, partitionProducerStateProvider, isCreditBased, - createBufferPoolFactory(icdd.length, consumedPartitionType)); + bufferPoolFactory); + + createInputChannels(owningTaskName, igdd, inputGate, metrics); + return inputGate; + } + + private void createInputChannels( + String owningTaskName, + InputGateDeploymentDescriptor inputGateDeploymentDescriptor, + SingleInputGate inputGate, + InputChannelMetrics metrics) { + ShuffleDescriptor[] 
inputChannelDescriptors = + inputGateDeploymentDescriptor.getInputChannelDescriptors(); // Create the input channels. There is one input channel for each consumed partition. - final InputChannel[] inputChannels = new InputChannel[icdd.length]; + InputChannel[] inputChannels = new InputChannel[inputChannelDescriptors.length]; - int numLocalChannels = 0; - int numRemoteChannels = 0; - int numUnknownChannels = 0; + ChannelStatistics channelStatistics = new ChannelStatistics(); for (int i = 0; i < inputChannels.length; i++) { - final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId(); - final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation(); - - if (partitionLocation.isLocal()) { - inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, - partitionManager, - taskEventPublisher, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics); - - numLocalChannels++; - } - else if (partitionLocation.isRemote()) { - inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, - partitionLocation.getConnectionId(), - connectionManager, - partitionRequestInitialBackoff, - partitionRequestMaxBackoff, - metrics, - networkBufferPool); - - numRemoteChannels++; - } - else if (partitionLocation.isUnknown()) { - inputChannels[i] = new UnknownInputChannel(inputGate, i,
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290360264 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -618,11 +734,14 @@ public void deploy() throws JobException { attemptNumber, getAssignedResourceLocation())); } - final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( - attemptId, - slot, - taskRestore, - attemptNumber); + final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory Review comment: This was the original idea, but some things are initialised only after `Execution` construction, so it needs more investigation as a follow-up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290358896 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -268,6 +280,14 @@ public LogicalSlot getAssignedResource() { return assignedResource; } + public Optional getProducedPartition(IntermediateResultPartitionID id) { + if (producedPartitions != null) { Review comment: True, this is a leftover. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290356202 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +/** + * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}. 
+ */ +public class TaskDeploymentDescriptorFactory { + private final ExecutionAttemptID executionId; + private final int attemptNumber; + private final MaybeOffloaded serializedJobInformation; + private final MaybeOffloaded taskInfo; + private final JobID jobID; + private final boolean allowLazyDeployment; + private final int subtaskIndex; + private final ExecutionEdge[][] inputEdges; + + private TaskDeploymentDescriptorFactory( + ExecutionAttemptID executionId, + int attemptNumber, + MaybeOffloaded serializedJobInformation, + MaybeOffloaded taskInfo, + JobID jobID, + boolean allowLazyDeployment, + int subtaskIndex, + ExecutionEdge[][] inputEdges) { + this.executionId = executionId; + this.attemptNumber = attemptNumber; + this.serializedJobInformation = serializedJobInformation; + this.taskInfo = taskInfo; + this.jobID = jobID; + this.allowLazyDeployment = allowLazyDeployment; + this.subtaskIndex = subtaskIndex; + this.inputEdges = inputEdges; + } + + public TaskDeploymentDescriptor createDeploymentDescriptor( + ResourceID location, + AllocationID allocationID, + int targetSlotNumber, + @Nullable JobManagerTaskRestore taskRestore, + Collection producedPartitions) { + return new TaskDeploymentDescriptor( + jobID, + serializedJobInformation, + taskInfo, + executionId, + allocationID, + subtaskIndex, +
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r290354332 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +/** + * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}. 
+ */ +public class TaskDeploymentDescriptorFactory { + private final ExecutionAttemptID executionId; + private final int attemptNumber; + private final MaybeOffloaded serializedJobInformation; + private final MaybeOffloaded taskInfo; + private final JobID jobID; + private final boolean allowLazyDeployment; + private final int subtaskIndex; + private final ExecutionEdge[][] inputEdges; + + private TaskDeploymentDescriptorFactory( + ExecutionAttemptID executionId, + int attemptNumber, + MaybeOffloaded serializedJobInformation, + MaybeOffloaded taskInfo, + JobID jobID, + boolean allowLazyDeployment, + int subtaskIndex, + ExecutionEdge[][] inputEdges) { + this.executionId = executionId; + this.attemptNumber = attemptNumber; + this.serializedJobInformation = serializedJobInformation; + this.taskInfo = taskInfo; + this.jobID = jobID; + this.allowLazyDeployment = allowLazyDeployment; + this.subtaskIndex = subtaskIndex; + this.inputEdges = inputEdges; + } + + public TaskDeploymentDescriptor createDeploymentDescriptor( + ResourceID location, + AllocationID allocationID, + int targetSlotNumber, + @Nullable JobManagerTaskRestore taskRestore, + Collection producedPartitions) { + return new TaskDeploymentDescriptor( + jobID, + serializedJobInformation, + taskInfo, + executionId, + allocationID, + subtaskIndex, +
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289818110 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import java.util.concurrent.CompletableFuture; + +/** + * Intermediate result partition registry to use in {@link org.apache.flink.runtime.jobmaster.JobMaster}. + */ +public interface ShuffleMaster { + CompletableFuture registerPartitionWithProducer( Review comment: I will add class parameter description to class level javadoc, not sure method comment will add too much value to it. The method name already says basically what is happening, unless you think that some specific detail should be mentioned. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289811563 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ## @@ -390,10 +390,16 @@ public InputDependencyConstraint getInputDependencyConstraint() { jobVertex.getInvokableClassName(), jobVertex.getConfiguration()); - taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload( - taskInformation, - getJobId(), - blobWriter); + try { + taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload( + taskInformation, + getJobId(), + blobWriter); + } catch (IOException e) { + throw new ExecutionGraphException( Review comment: True, just some leftover from another refactoring attempt. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289810369 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java ## @@ -18,48 +18,56 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.util.Preconditions; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.io.Serializable; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; /** * Contains information where to find a partition. The partition is defined by the - * {@link IntermediateDataSetID} and the partition location is specified by - * {@link InputChannelDeploymentDescriptor}. + * {@link IntermediateDataSetID} and the partition is specified by + * {@link ShuffleDescriptor}. 
*/ public class PartitionInfo implements Serializable { private static final long serialVersionUID = 1724490660830968430L; private final IntermediateDataSetID intermediateDataSetID; - private final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor; - public PartitionInfo(IntermediateDataSetID intermediateResultPartitionID, InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor) { - this.intermediateDataSetID = Preconditions.checkNotNull(intermediateResultPartitionID); - this.inputChannelDeploymentDescriptor = Preconditions.checkNotNull(inputChannelDeploymentDescriptor); + private final ShuffleDescriptor shuffleDescriptor; + + public PartitionInfo( + IntermediateDataSetID intermediateResultPartitionID, + ShuffleDescriptor shuffleDescriptor) { + this.intermediateDataSetID = intermediateResultPartitionID; + this.shuffleDescriptor = shuffleDescriptor; } public IntermediateDataSetID getIntermediateDataSetID() { return intermediateDataSetID; } - public InputChannelDeploymentDescriptor getInputChannelDeploymentDescriptor() { - return inputChannelDeploymentDescriptor; + public ShuffleDescriptor getShuffleDescriptor() { + return shuffleDescriptor; } - // - static PartitionInfo fromEdge(ExecutionEdge executionEdge) { - final InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor = InputChannelDeploymentDescriptor.fromEdge(executionEdge); - - Preconditions.checkState( - !inputChannelDeploymentDescriptor.getConsumedPartitionLocation().isUnknown(), - "PartitionInfo contains an unknown partition location."); + IntermediateDataSetID intermediateDataSetID = executionEdge.getSource().getIntermediateResult().getId(); + ShuffleDescriptor shuffleDescriptor = getKnownConsumedPartitionShuffleDescriptor(executionEdge); + return new PartitionInfo(intermediateDataSetID, shuffleDescriptor); + } - return new PartitionInfo( - executionEdge.getSource().getIntermediateResult().getId(), - inputChannelDeploymentDescriptor); + private static ShuffleDescriptor 
getKnownConsumedPartitionShuffleDescriptor(ExecutionEdge edge) { Review comment: I think we can reuse it just by passing `allowLazyDeployment=true` to `getConsumedPartitionShuffleDescriptor`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289791224 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java ## @@ -774,6 +687,51 @@ public void testRequestStackTraceSample() throws Exception { } } + private TaskDeploymentDescriptor createSender(NettyShuffleDescriptor sdd) throws IOException { + return createSender(sdd, TestingAbstractInvokables.Sender.class); + } + + private TaskDeploymentDescriptor createSender( + NettyShuffleDescriptor shuffleDeploymentDescriptor, + Class abstractInvokable) throws IOException { + PartitionDescriptor partitionDescriptor = new PartitionDescriptor( + new IntermediateDataSetID(), + shuffleDeploymentDescriptor.getResultPartitionID().getPartitionId(), + ResultPartitionType.PIPELINED, + 1, + 0); + ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor = new ResultPartitionDeploymentDescriptor( + partitionDescriptor, + shuffleDeploymentDescriptor, + 1, + true); + return createTestTaskDeploymentDescriptor( + "Sender", + shuffleDeploymentDescriptor.getResultPartitionID().getProducerId(), + abstractInvokable, + 1, + Collections.singletonList(resultPartitionDeploymentDescriptor), + Collections.emptyList()); + } + + private TaskDeploymentDescriptor createReceiver( + NettyShuffleDescriptor shuffleDescriptor, + ResourceID location) throws IOException { Review comment: some tests rely on it to be the same as in some other components outside of this method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289787348 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ## @@ -235,9 +242,8 @@ public void testUpdateChannelBeforeRequest() throws Exception { .buildUnknownAndSetToGate(inputGate); // Update to a local channel and verify that no request is triggered - inputGate.updateInputChannel(new InputChannelDeploymentDescriptor( - unknown.partitionId, - ResultPartitionLocation.createLocal())); + ResourceID location = ResourceID.generate(); + inputGate.updateInputChannel(location, createPartitionInfo(unknown.getPartitionId(), location)); Review comment: I think the random one provides a better check for tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289787297 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ## @@ -212,7 +217,9 @@ public void testBackwardsEventWithUninitializedChannel() throws Exception { verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class)); // After the update, the pending event should be send to local channel - inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal())); + + ResourceID location = ResourceID.generate(); Review comment: I think the random one provides a better check for tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289759171 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -324,39 +326,39 @@ public void setInputChannel(IntermediateResultPartitionID partitionId, InputChan } } - public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOException, InterruptedException { + public void updateInputChannel( + ResourceID localLocation, + PartitionInfo partitionInfo) throws IOException, InterruptedException { synchronized (requestLock) { if (closeFuture.isDone()) { // There was a race with a task failure/cancel return; } - final IntermediateResultPartitionID partitionId = icdd.getConsumedPartitionId().getPartitionId(); + ShuffleDescriptor shuffleDescriptor = partitionInfo.getShuffleDescriptor(); + checkArgument(shuffleDescriptor instanceof NettyShuffleDescriptor, Review comment: True, I will put it under if condition, but `isLocalTo` is a detail of netty implementation and `PartitionInfo` belongs to general shuffle API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289754707 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +/** + * Factory of {@link TaskDeploymentDescriptor} to deploy {@link org.apache.flink.runtime.taskmanager.Task} from {@link Execution}. 
+ */ +public class TaskDeploymentDescriptorFactory { + private final ExecutionAttemptID executionId; + private final int attemptNumber; + private final MaybeOffloaded serializedJobInformation; + private final MaybeOffloaded taskInfo; + private final JobID jobID; + private final boolean lazyScheduling; + private final int subtaskIndex; + private final ExecutionEdge[][] inputEdges; + + private TaskDeploymentDescriptorFactory( + ExecutionAttemptID executionId, + int attemptNumber, + MaybeOffloaded serializedJobInformation, + MaybeOffloaded taskInfo, + JobID jobID, + boolean lazyScheduling, + int subtaskIndex, + ExecutionEdge[][] inputEdges) { + this.executionId = executionId; + this.attemptNumber = attemptNumber; + this.serializedJobInformation = serializedJobInformation; + this.taskInfo = taskInfo; + this.jobID = jobID; + this.lazyScheduling = lazyScheduling; + this.subtaskIndex = subtaskIndex; + this.inputEdges = inputEdges; + } + + public TaskDeploymentDescriptor createDeploymentDescriptor( + ResourceID location, + AllocationID allocationID, + int targetSlotNumber, + @Nullable JobManagerTaskRestore taskRestore, + Collection producedPartitions) throws Exception { + return new TaskDeploymentDescriptor( + jobID, + serializedJobInformation, + taskInfo, + executionId, + allocationID, +
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289753675 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.concurrent.CompletableFuture; + +/** + * Default {@link ShuffleMaster} for netty and local file based shuffle implementation. 
+ */ +public enum NettyShuffleMaster implements ShuffleMaster { + INSTANCE; + + @Override + public CompletableFuture registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + ResultPartitionID resultPartitionID = new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( + producerDescriptor.getProducerResourceId(), + createProducerLocation(producerDescriptor, partitionDescriptor.getConnectionIndex()), + resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); + } + + private static NettyShuffleDescriptor.PartitionLocation createProducerLocation( + ProducerDescriptor producerDescriptor, + int connectionIndex) { + return producerDescriptor.getDataPort() >= 0 ? Review comment: I agree that naming is confusing. It should be `PartitionConnectionInfo/NetworkPartitionConnectionInfo/LocalExecutionPartitionConnectionInfo` because it is not about local or remote channels. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289631850 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor.MaybeOffloaded; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Factory of {@link TaskDeploymentDescriptor} to deploy {@link 
org.apache.flink.runtime.taskmanager.Task} from {@link Execution}. + */ +public class TaskDeploymentDescriptorFactory { + private final ExecutionAttemptID executionId; + private final int attemptNumber; + private final MaybeOffloaded serializedJobInformation; + private final MaybeOffloaded taskInfo; + private final JobID jobID; + private final boolean lazyScheduling; + private final int subtaskIndex; + private final ExecutionEdge[][] inputEdges; + + private TaskDeploymentDescriptorFactory( + ExecutionAttemptID executionId, + int attemptNumber, + MaybeOffloaded serializedJobInformation, + MaybeOffloaded taskInfo, + JobID jobID, + boolean lazyScheduling, + int subtaskIndex, + ExecutionEdge[][] inputEdges) { + this.executionId = executionId; + this.attemptNumber = attemptNumber; + this.serializedJobInformation = serializedJobInformation; + this.taskInfo = taskInfo; + this.jobID = jobID; + this.lazyScheduling = lazyScheduling; + this.subtaskIndex = subtaskIndex; + this.inputEdges = inputEdges; + } + + public TaskDeploymentDescriptor createDeploymentDescriptor( + LogicalSlot targetSlot, + @Nullable JobManagerTaskRestore taskRestore, + Collection producedPartitions) throws Exception { + return createDeploymentDescriptor( + targetSlot.getTaskManagerLocation().getResourceID(), + targetSlot.getAllocationId(), +
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289427441 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -566,6 +619,67 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { } } + @VisibleForTesting + CompletableFuture registerProducedPartitions(TaskManagerLocation location) { + assertRunningInJobMasterMainThread(); + + return FutureUtils.thenApplyAsyncIfNotDone( + registerProducedPartitions(vertex, location, attemptId), + vertex.getExecutionGraph().getJobMasterMainThreadExecutor(), + producedPartitionsCache -> { + producedPartitions = producedPartitionsCache; + return this; + }); + } + + @VisibleForTesting + static CompletableFuture> registerProducedPartitions( Review comment: why not? it is also easier to test without Execution mocking. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289424609 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.concurrent.CompletableFuture; + +/** + * Default {@link ShuffleMaster} for netty and local file based shuffle implementation. 
+ */ +public enum NettyShuffleMaster implements ShuffleMaster { + INSTANCE; + + @Override + public CompletableFuture registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + ResultPartitionID resultPartitionID = new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( + producerDescriptor.getProducerResourceId(), + createProducerLocation(producerDescriptor, partitionDescriptor.getConnectionIndex()), + resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); + } + + private static NettyShuffleDescriptor.PartitionLocation createProducerLocation( + ProducerDescriptor producerDescriptor, + int connectionIndex) { + return producerDescriptor.getDataPort() >= 0 ? + NettyShuffleDescriptor.RemotePartitionLocation.fromProducerDescriptor(producerDescriptor, connectionIndex) : Review comment: I think factory is more readable, the remote connection info is created only from the `ProducerDescriptor`. I refactored it a bit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289424609 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.concurrent.CompletableFuture; + +/** + * Default {@link ShuffleMaster} for netty and local file based shuffle implementation. 
+ */ +public enum NettyShuffleMaster implements ShuffleMaster { + INSTANCE; + + @Override + public CompletableFuture registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + ResultPartitionID resultPartitionID = new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( + producerDescriptor.getProducerResourceId(), + createProducerLocation(producerDescriptor, partitionDescriptor.getConnectionIndex()), + resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); + } + + private static NettyShuffleDescriptor.PartitionLocation createProducerLocation( + ProducerDescriptor producerDescriptor, + int connectionIndex) { + return producerDescriptor.getDataPort() >= 0 ? + NettyShuffleDescriptor.RemotePartitionLocation.fromProducerDescriptor(producerDescriptor, connectionIndex) : Review comment: I think factory is more readable, the remote connection info is created only from the `ProducerDescriptor`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289420919 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.concurrent.CompletableFuture; + +/** + * Default {@link ShuffleMaster} for netty and local file based shuffle implementation. 
+ */ +public enum NettyShuffleMaster implements ShuffleMaster { + INSTANCE; + + @Override + public CompletableFuture registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + ResultPartitionID resultPartitionID = new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( + producerDescriptor.getProducerResourceId(), + createProducerLocation(producerDescriptor, partitionDescriptor.getConnectionIndex()), + resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); + } + + private static NettyShuffleDescriptor.PartitionLocation createProducerLocation( Review comment: the method does not need any enum object fields. Why not make it static? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289420919 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.concurrent.CompletableFuture; + +/** + * Default {@link ShuffleMaster} for netty and local file based shuffle implementation. 
+ */ +public enum NettyShuffleMaster implements ShuffleMaster { + INSTANCE; + + @Override + public CompletableFuture registerPartitionWithProducer( + PartitionDescriptor partitionDescriptor, + ProducerDescriptor producerDescriptor) { + ResultPartitionID resultPartitionID = new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( + producerDescriptor.getProducerResourceId(), + createProducerLocation(producerDescriptor, partitionDescriptor.getConnectionIndex()), + resultPartitionID); + return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); + } + + private static NettyShuffleDescriptor.PartitionLocation createProducerLocation( Review comment: Why not use static? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r289415869 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java ## @@ -54,24 +56,31 @@ * The index of the consumed subpartition of each consumed partition. This index depends on the * {@link DistributionPattern} and the subtask indices of the producing and consuming task. */ + @Nonnegative private final int consumedSubpartitionIndex; /** An input channel for each consumed subpartition. */ - private final InputChannelDeploymentDescriptor[] inputChannels; + private final ShuffleDescriptor[] inputChannels; + + /** +* {@link ResourceID} of partition consume to identify its location. +* +* It can be used e.g. to compare with partition producer {@link ResourceID} in +* {@link ProducerDescriptor} to determine producer/consumer co-location. +*/ + private final ResourceID consumerLocation; Review comment: actually I think it is better to rename it in `ProducerDescriptor` & `NettyShuffleDescriptor` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287799750 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/PartitionInfo.java ## @@ -18,48 +18,64 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.util.Preconditions; +import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor; +import javax.annotation.Nonnull; import java.io.Serializable; /** * Contains information where to find a partition. The partition is defined by the - * {@link IntermediateDataSetID} and the partition location is specified by - * {@link InputChannelDeploymentDescriptor}. + * {@link IntermediateDataSetID} and the partition is specified by + * {@link org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor}. */ public class PartitionInfo implements Serializable { private static final long serialVersionUID = 1724490660830968430L; + @Nonnull private final IntermediateDataSetID intermediateDataSetID; Review comment: At the moment, `IntermediateDataSetID` is kept once in `InputGateDeploymentDescriptor` for all channels/partitions as before. I suggest we keep it this way in this PR. Later we can consider one step further refactoring where `IntermediateDataSetID` is part of the full partition id. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287611055 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java ## @@ -272,4 +272,18 @@ private InputGate createInputGate( return gates[0]; } } + + private static ShuffleDeploymentDescriptor createLocalSdd(ResultPartitionID resultPartitionID, ResourceID location) { Review comment: we can also use `NettyShuffleDescriptorBuilder` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287609955 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -300,6 +302,9 @@ // -- Fields that are only relevant for archived execution graphs private String jsonPlan; + /** Shuffle master to register partitions for task deployment. */ + private final ShuffleMaster shuffleMaster = DefaultShuffleMaster.getInstance(); Review comment: At the moment the default implementation is hardcoded anyways. I suggest we consider it when we introduce configuration and proper creation of shuffle components as one of final steps. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287609890 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -566,6 +615,62 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { } } + @VisibleForTesting + CompletableFuture registerProducedPartitions(TaskManagerLocation location) { + assertRunningInJobMasterMainThread(); + + return registerProducedPartitions(vertex, location, attemptId) + .thenApplyAsync(producedPartitionsCache -> { + producedPartitions = producedPartitionsCache; + return this; + }, vertex.getExecutionGraph().getJobMasterMainThreadExecutor()); + } + + @VisibleForTesting + static CompletableFuture> + registerProducedPartitions( + ExecutionVertex vertex, + TaskManagerLocation location, + ExecutionAttemptID attemptId) { + + ProducerShuffleDescriptor producerShuffleDescriptor = ProducerShuffleDescriptor.create( + location, attemptId); + + boolean lazyScheduling = vertex.getExecutionGraph().getScheduleMode().allowLazyDeployment(); + + Collection partitions = vertex.getProducedPartitions().values(); + Collection> partitionRegistrations = + new ArrayList<>(partitions.size()); + + for (IntermediateResultPartition partition : partitions) { + PartitionShuffleDescriptor partitionShuffleDescriptor = PartitionShuffleDescriptor.from( + partition, getPartitionMaxParallelism(partition)); + partitionRegistrations.add(vertex.getExecutionGraph().getShuffleMaster() + .registerPartitionWithProducer(partitionShuffleDescriptor, producerShuffleDescriptor) + .thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor( + partitionShuffleDescriptor, shuffleDescriptor, lazyScheduling))); + } + + return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> { + Map producedPartitions = + new LinkedHashMap<>(partitions.size()); + rpdds.forEach(rpdd -> 
producedPartitions.put(rpdd.getPartitionId(), rpdd)); + return producedPartitions; + }); + } + + private static int getPartitionMaxParallelism(IntermediateResultPartition partition) { + // TODO consumers.isEmpty() only exists for test, currently there has to be exactly one consumer in real jobs! Review comment: This TODO existed before the PR, I suggest we tackle it separately. I created an issue for this https://issues.apache.org/jira/browse/FLINK-12628. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287368246 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java ## @@ -504,26 +441,11 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { */ @Test(timeout = 1L) public void testLocalPartitionNotFound() throws Exception { - final ExecutionAttemptID eid = new ExecutionAttemptID(); - - final IntermediateDataSetID resultId = new IntermediateDataSetID(); - final ResultPartitionID partitionId = new ResultPartitionID(); - - final ResultPartitionLocation loc = ResultPartitionLocation.createLocal(); - - final InputChannelDeploymentDescriptor[] inputChannelDeploymentDescriptors = - new InputChannelDeploymentDescriptor[] { - new InputChannelDeploymentDescriptor(partitionId, loc)}; - - final InputGateDeploymentDescriptor inputGateDeploymentDescriptor = - new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, inputChannelDeploymentDescriptors); - - final TaskDeploymentDescriptor tdd = - createTestTaskDeploymentDescriptor("Receiver", - eid, - Tasks.AgnosticReceiver.class, - 1, Collections.emptyList(), - Collections.singletonList(inputGateDeploymentDescriptor)); + ResourceID producerLocation = new ResourceID("local"); + DefaultShuffleDeploymentDescriptor sdd = + createSddWithLocalConnection(new IntermediateResultPartitionID(), producerLocation, 1); + TaskDeploymentDescriptor tdd = createReceiver(sdd, producerLocation); Review comment: it should result in identifying that the partition is located locally in SingleInputGateFactory on the consumer side, by comparing producer and consumer resource ids. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287362965 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java ## @@ -483,12 +419,13 @@ public void testUpdateTaskInputPartitionsFailure() throws Exception { tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get(); taskRunningFuture.get(); + ResourceID producerLocation = new ResourceID("local"); Review comment: I think it would be better to use random `ResourceID.generate()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287362924 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ## @@ -517,21 +514,22 @@ public void testUpdateUnknownInputChannel() throws Exception { assertThat(inputGate.getInputChannels().get(localResultPartitionId.getPartitionId()), is(instanceOf((UnknownInputChannel.class; + ResourceID localLocation = new ResourceID("local"); Review comment: I think it would be better to use random `ResourceID.generate()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287362596 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ## @@ -322,29 +325,24 @@ public void run() { */ @Test public void testRequestBackoffConfiguration() throws Exception { - ResultPartitionID[] partitionIds = new ResultPartitionID[] { - new ResultPartitionID(), - new ResultPartitionID(), - new ResultPartitionID() + IntermediateResultPartitionID[] partitionIds = new IntermediateResultPartitionID[] { + new IntermediateResultPartitionID(), + new IntermediateResultPartitionID(), + new IntermediateResultPartitionID() }; - InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{ + ResourceID localLocation = new ResourceID("local"); + ShuffleDeploymentDescriptor[] channelDescs = new ShuffleDeploymentDescriptor[]{ // Local - new InputChannelDeploymentDescriptor( - partitionIds[0], - ResultPartitionLocation.createLocal()), + createSddWithLocalConnection(partitionIds[0], localLocation, 1), // Remote - new InputChannelDeploymentDescriptor( - partitionIds[1], - ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), + createSddWithLocalConnection(partitionIds[1], new ResourceID("remote"), 5000), Review comment: refactored to `ShuffleDeploymentDescriptorBuilder` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r287359019 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java ## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.deployment; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.shuffle.ShuffleDeploymentDescriptor; +import org.apache.flink.runtime.shuffle.UnknownShuffleDeploymentDescriptor; +import org.apache.flink.types.Either; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.function.CheckedSupplier; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * Factory of {@link 
TaskDeploymentDescriptor} to deploy {@link Execution}. + */ +public class TaskDeploymentDescriptorFactory { + private final ExecutionAttemptID executionId; + private final int attemptNumber; + + private final TaskDeploymentDescriptor.MaybeOffloaded serializedJobInformation; + private final CheckedSupplier, PermanentBlobKey>> taskInfoSupplier; + private final JobID jobID; + private final boolean lazyScheduling; + private final int subtaskIndex; + private final ExecutionEdge[][] inputEdges; + + private TaskDeploymentDescriptorFactory( + ExecutionAttemptID executionId, + int attemptNumber, + TaskDeploymentDescriptor.MaybeOffloaded serializedJobInformation, + CheckedSupplier, PermanentBlobKey>> taskInfoSupplier, + JobID jobID, + boolean lazyScheduling, + int subtaskIndex, + ExecutionEdge[][] inputEdges) { + + this.executionId = executionId; + this.attemptNumber = attemptNumber; + this.serializedJobInformation = serializedJobInformation; + this.taskInfoSupplier = taskInfoSupplier; + this.jobID = jobID; + this.lazyScheduling = lazyScheduling; + this.subtaskIndex = subtaskIndex; + this.inputEdges = inputEdges; + } + + public static TaskDeploymentDescriptorFactory fromExecutionVertex( + ExecutionVertex executionVertex, ExecutionAttemptID executionId, int attemptNumber) { + + ExecutionGraph executionGraph = executionVertex.getExecutionGraph(); + + return new TaskDeploymentDescriptorFactory( + executionId, + attemptNumber, +
[GitHub] [flink] azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface
azagrebin commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface URL: https://github.com/apache/flink/pull/8362#discussion_r283747682 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionShuffleDescriptor.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnull; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Partition descriptor for {@link ShuffleMaster} to obtain {@link ShuffleDeploymentDescriptor}. */ +public class PartitionShuffleDescriptor implements Serializable { + + private static final long serialVersionUID = 6343547936086963705L; + + /** The ID of the result this partition belongs to. 
*/ + @Nonnull + private final IntermediateDataSetID resultId; + + /** The ID of the partition. */ + @Nonnull + private final IntermediateResultPartitionID partitionId; + + /** The type of the partition. */ + @Nonnull + private final ResultPartitionType partitionType; + + /** The number of subpartitions. */ + private final int numberOfSubpartitions; + + /** The maximum parallelism. */ + private final int maxParallelism; + + /** Connection index to identify this partition of intermediate result. */ + private final int connectionIndex; + + public PartitionShuffleDescriptor( + @Nonnull IntermediateDataSetID resultId, + @Nonnull IntermediateResultPartitionID partitionId, + @Nonnull ResultPartitionType partitionType, + int numberOfSubpartitions, + int maxParallelism, + int connectionIndex) { + + this.resultId = resultId; + this.partitionId = partitionId; + this.partitionType = partitionType; + this.connectionIndex = connectionIndex; + + KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism); + checkArgument(numberOfSubpartitions >= 1); + this.numberOfSubpartitions = numberOfSubpartitions; + this.maxParallelism = maxParallelism; + } + + @Nonnull + public IntermediateDataSetID getResultId() { + return resultId; + } + + @Nonnull + public IntermediateResultPartitionID getPartitionId() { + return partitionId; + } + + @Nonnull + public ResultPartitionType getPartitionType() { + return partitionType; + } + + public int getNumberOfSubpartitions() { + return numberOfSubpartitions; + } + + public int getMaxParallelism() { + return maxParallelism; + } + + public int getConnectionIndex() { Review comment: it could be potentially used outside of this package in other shuffle implementation. I would leave methods public here because it is basically part of shuffle API then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services