azagrebin commented on a change in pull request #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#discussion_r268177944
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##########
 @@ -818,71 +800,46 @@ else if (numConsumers == 0) {
                                                consumerVertex.checkInputDependencyConstraints()) {
                                        scheduleConsumer(consumerVertex);
                                }
-
-                               // double check to resolve race conditions
-                               if (consumerVertex.getExecutionState() == RUNNING) {
-                                       consumerVertex.sendPartitionInfos();
-                               }
                        }
                        // ----------------------------------------------------------------
                        // Consumer is running => send update message now
+                       // Consumer is deploying => cache the partition info which would be
+                       // sent after switching to running
                        // ----------------------------------------------------------------
-                       else {
-                               if (consumerState == RUNNING) {
-                                       final LogicalSlot consumerSlot = consumer.getAssignedResource();
-
-                                       if (consumerSlot == null) {
-                                               // The consumer has been reset concurrently
-                                               continue;
-                                       }
-
-                                       final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
-                                                       .getCurrentAssignedResource().getTaskManagerLocation();
-                                       final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
-                                       final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-
-                                       final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-                                       final ResultPartitionLocation partitionLocation;
+                       else if (consumerState == DEPLOYING || consumerState == RUNNING) {
+                               final LogicalSlot consumerSlot = consumer.getAssignedResource();
+                               if (consumerSlot == null) {
+                                       // The consumer has been reset concurrently
+                                       continue;
+                               }
 
-                                       if (consumerTaskManager.equals(partitionTaskManager)) {
-                                               // Consuming task is deployed to the same instance as the partition => local
-                                               partitionLocation = ResultPartitionLocation.createLocal();
-                                       }
-                                       else {
-                                               // Different instances => remote
-                                               final ConnectionID connectionId = new ConnectionID(
-                                                               partitionTaskManagerLocation,
-                                                               partition.getIntermediateResult().getConnectionIndex());
+                               final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
+                                       .getCurrentAssignedResource().getTaskManagerLocation();
+                               final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
+                               final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
 
-                                               partitionLocation = ResultPartitionLocation.createRemote(connectionId);
-                                       }
+                               final ResultPartitionLocation partitionLocation;
+                               if (consumerTaskManager.equals(partitionTaskManager)) {
+                                       // Consuming task is deployed to the same instance as the partition => local
+                                       partitionLocation = ResultPartitionLocation.createLocal();
+                               } else {
+                                       // Different instances => remote
+                                       final ConnectionID connectionId = new ConnectionID(
+                                               partitionTaskManagerLocation,
+                                               partition.getIntermediateResult().getConnectionIndex());
+                                       partitionLocation = ResultPartitionLocation.createRemote(connectionId);
+                               }
 
-                                       final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
-                                                       partitionId, partitionLocation);
+                               final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
+                               final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
+                                       partitionId, partitionLocation);
+                               final PartitionInfo partitionInfo = new PartitionInfo(partition.getIntermediateResult().getId(), descriptor);
 
 Review comment:
   All the code needed to create `PartitionInfo` could be packed into e.g. 
`PartitionInfo.create(ExecutionEdge)` to simplify `scheduleOrUpdateConsumers`.
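
   To illustrate the shape of this suggestion, here is a minimal sketch of such a factory method. It does not use the real Flink classes: `PartitionInfoSketch`, `Edge`, `Location` and all of their fields are hypothetical stand-ins invented for this example, and plain strings replace `ResourceID`, `ConnectionID` and `ResultPartitionID`. It only shows how the local/remote decision could move out of the caller.

```java
import java.util.Objects;

/** Illustration only: every type here is a simplified stand-in, not a Flink class. */
final class PartitionInfoSketch {

    /** Stand-in for ResultPartitionLocation: either local or remote with an address. */
    static final class Location {
        final boolean local;
        final String remoteAddress; // null when the partition is local

        private Location(boolean local, String remoteAddress) {
            this.local = local;
            this.remoteAddress = remoteAddress;
        }

        static Location createLocal() {
            return new Location(true, null);
        }

        static Location createRemote(String address) {
            return new Location(false, Objects.requireNonNull(address));
        }
    }

    /** Stand-in for ExecutionEdge: carries only what the factory needs (assumed fields). */
    static final class Edge {
        final String resultId;
        final String partitionId;
        final String producerTaskManager;
        final String producerAddress;
        final String consumerTaskManager;

        Edge(String resultId, String partitionId, String producerTaskManager,
                String producerAddress, String consumerTaskManager) {
            this.resultId = resultId;
            this.partitionId = partitionId;
            this.producerTaskManager = producerTaskManager;
            this.producerAddress = producerAddress;
            this.consumerTaskManager = consumerTaskManager;
        }
    }

    /** Stand-in for PartitionInfo with the proposed static factory. */
    static final class PartitionInfo {
        final String resultId;
        final String partitionId;
        final Location location;

        private PartitionInfo(String resultId, String partitionId, Location location) {
            this.resultId = resultId;
            this.partitionId = partitionId;
            this.location = location;
        }

        /** Same TaskManager on both sides => local, otherwise remote. */
        static PartitionInfo create(Edge edge) {
            final Location location = edge.consumerTaskManager.equals(edge.producerTaskManager)
                ? Location.createLocal()
                : Location.createRemote(edge.producerAddress);
            return new PartitionInfo(edge.resultId, edge.partitionId, location);
        }
    }

    public static void main(String[] args) {
        Edge edge = new Edge("result-1", "partition-1", "tm-a", "host-a:6121", "tm-b");
        PartitionInfo info = PartitionInfo.create(edge);
        System.out.println("local=" + info.location.local + ", remote=" + info.location.remoteAddress);
    }
}
```

   With something along these lines, the branch in `scheduleOrUpdateConsumers` would reduce to the null check on the consumer slot followed by a single `create` call. Whether that null check (the concurrent reset case) stays in the caller or moves into the factory is left open here.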

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
