azagrebin commented on a change in pull request #7856: [FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers URL: https://github.com/apache/flink/pull/7856#discussion_r268177944
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ########## @@ -818,71 +800,46 @@ else if (numConsumers == 0) { consumerVertex.checkInputDependencyConstraints()) { scheduleConsumer(consumerVertex); } - - // double check to resolve race conditions - if (consumerVertex.getExecutionState() == RUNNING) { - consumerVertex.sendPartitionInfos(); - } } // ---------------------------------------------------------------- // Consumer is running => send update message now + // Consumer is deploying => cache the partition info which would be + // sent after switching to running // ---------------------------------------------------------------- - else { - if (consumerState == RUNNING) { - final LogicalSlot consumerSlot = consumer.getAssignedResource(); - - if (consumerSlot == null) { - // The consumer has been reset concurrently - continue; - } - - final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer() - .getCurrentAssignedResource().getTaskManagerLocation(); - final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); - - final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID(); - - final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId); - - final ResultPartitionLocation partitionLocation; + else if (consumerState == DEPLOYING || consumerState == RUNNING) { + final LogicalSlot consumerSlot = consumer.getAssignedResource(); + if (consumerSlot == null) { + // The consumer has been reset concurrently + continue; + } - if (consumerTaskManager.equals(partitionTaskManager)) { - // Consuming task is deployed to the same instance as the partition => local - partitionLocation = ResultPartitionLocation.createLocal(); - } - else { - // Different instances => remote - final ConnectionID connectionId = new ConnectionID( - partitionTaskManagerLocation, - partition.getIntermediateResult().getConnectionIndex()); + final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer() + .getCurrentAssignedResource().getTaskManagerLocation(); + final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID(); + final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID(); - partitionLocation = ResultPartitionLocation.createRemote(connectionId); - } + final ResultPartitionLocation partitionLocation; + if (consumerTaskManager.equals(partitionTaskManager)) { + // Consuming task is deployed to the same instance as the partition => local + partitionLocation = ResultPartitionLocation.createLocal(); + } else { + // Different instances => remote + final ConnectionID connectionId = new ConnectionID( + partitionTaskManagerLocation, + partition.getIntermediateResult().getConnectionIndex()); + partitionLocation = ResultPartitionLocation.createRemote(connectionId); + } - final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor( - partitionId, partitionLocation); + final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId); + final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor( + partitionId, partitionLocation); + final PartitionInfo partitionInfo = new PartitionInfo(partition.getIntermediateResult().getId(), descriptor); Review comment: all code, needed to create `PartitionInfo`, could be packed into e.g. `PartitionInfo .create(ExecutionEdge)` to simplify `scheduleOrUpdateConsumers`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services