zhuzhurk commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r687479198



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -312,5 +344,58 @@ private void updateConstraintToExecutionSlotSharingGroupMap(
                 }
             }
         }
+
+        private Set<ExecutionSlotSharingGroup> getAvailableGroupsForJobVertex(
+                JobVertexID jobVertexId) {
+            return availableGroupsForJobVertex.computeIfAbsent(
+                    jobVertexId, ignore -> new LinkedHashSet<>());
+        }
+
+        private Set<ExecutionSlotSharingGroup> getAvailableGroupsForConsumedPartitionGroup(
+                JobVertexID consumerJobVertexId, ConsumedPartitionGroup consumedPartitionGroup) {
+            return availableProducerGroupsForConsumedPartitionGroup.computeIfAbsent(
+                    consumedPartitionGroup,
+                    group ->
+                            computeAllAvailableGroupsForConsumedPartitionGroup(
+                                    consumerJobVertexId, group));
+        }
+
+        private Set<ExecutionSlotSharingGroup> computeAllAvailableGroupsForConsumedPartitionGroup(
+                JobVertexID consumerJobVertexId, ConsumedPartitionGroup consumedPartitionGroup) {
+
+            // We tend to reserve the order of ExecutionSlotSharingGroups as they are traversed
+            // topologically
+            final Set<ExecutionSlotSharingGroup> slotSharingGroups = new LinkedHashSet<>();
+
+            JobVertexID producerJobVertexId =
+                    topology.getResultPartition(consumedPartitionGroup.getFirst())
+                            .getProducer()
+                            .getId()
+                            .getJobVertexId();
+
+            // Check if the producer vertex and the consumer vertex are in the same SlotSharingGroup
+            if (inSameLogicalSlotSharingGroup(producerJobVertexId, consumerJobVertexId)) {
+
+                // Iterate over the producers of all partitions in the ConsumedPartitionGroup
+                for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
+                    ExecutionVertexID producerVertexId =
+                            topology.getResultPartition(consumedPartitionId).getProducer().getId();
+
+                    ExecutionSlotSharingGroup executionSlotSharingGroup =
+                            executionSlotSharingGroupMap.get(producerVertexId);
+                    checkNotNull(executionSlotSharingGroup);
+
+                    // Check if the ExecutionSlotSharingGroup of the producer is available for the
+                    // consumer
+                    if (getAvailableGroupsForJobVertex(consumerJobVertexId)

Review comment:
       In which case can the `executionSlotSharingGroup` be unavailable?
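
For context on the question above, here is a minimal sketch of the availability bookkeeping as the Javadoc in this PR describes it: a group starts out available for every job vertex of its slot sharing group and stops being available for a job vertex once one of that vertex's subtasks is added. This is not the Flink implementation. The class `AvailabilitySketch`, its methods, and the plain `String` ids are hypothetical stand-ins for `JobVertexID` and `ExecutionSlotSharingGroup`. Under those assumptions, a producer's group would be unavailable for the consumer whenever another subtask of the same consumer job vertex was placed into it earlier.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; Strings stand in for Flink's ID and group types.
public class AvailabilitySketch {

    // jobVertexId -> groups that do not yet contain a subtask of that job vertex
    private final Map<String, Set<String>> availableGroupsForJobVertex = new LinkedHashMap<>();

    /** A newly created group starts out available for every listed job vertex. */
    public void createGroup(String groupId, String... jobVertexIds) {
        for (String jobVertexId : jobVertexIds) {
            availableGroupsForJobVertex
                    .computeIfAbsent(jobVertexId, ignore -> new LinkedHashSet<>())
                    .add(groupId);
        }
    }

    /** Placing a subtask of the job vertex into the group makes it unavailable for that vertex. */
    public void addSubtask(String jobVertexId, String groupId) {
        availableGroupsForJobVertex
                .computeIfAbsent(jobVertexId, ignore -> new LinkedHashSet<>())
                .remove(groupId);
    }

    public boolean isAvailable(String jobVertexId, String groupId) {
        return availableGroupsForJobVertex
                .getOrDefault(jobVertexId, Collections.emptySet())
                .contains(groupId);
    }

    public static void main(String[] args) {
        AvailabilitySketch sketch = new AvailabilitySketch();
        sketch.createGroup("g0", "producer", "consumer");
        sketch.createGroup("g1", "producer", "consumer");

        // Producer subtasks 0 and 1 occupy g0 and g1.
        sketch.addSubtask("producer", "g0");
        sketch.addSubtask("producer", "g1");

        // Assumed scenario: consumer subtask 0 was already placed into g0.
        sketch.addSubtask("consumer", "g0");

        // For consumer subtask 1, the producer group g0 can no longer be used.
        System.out.println("g0 available for consumer: " + sketch.isAvailable("consumer", "g0"));
        System.out.println("g1 available for consumer: " + sketch.isAvailable("consumer", "g1"));
    }
}
```

Whether this scenario can actually occur at the point under review depends on the traversal order in the real strategy, which is what the question asks the author to clarify.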

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -94,12 +98,34 @@ public LocalInputPreferredSlotSharingStrategy create(
         private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
                 executionSlotSharingGroupMap;
 
-        final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
                 constraintToExecutionSlotSharingGroupMap;
 
-        final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> executionSlotSharingGroups;
+        private final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>>
+                executionSlotSharingGroups;
 
-        private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> assignedJobVerticesForGroups;
+        /**
+         * A JobVertex only belongs to a {@link SlotSharingGroup}. A SlotSharingGroup is
+         * corresponding to a set of {@link ExecutionSlotSharingGroup}s. We can maintain available
+         * ExecutionSlotSharingGroups for each JobVertex.
+         *
+         * <p>Once an ExecutionSlotSharingGroup is created, it becomes available for all JobVertices
+         * in the corresponding SlotSharingGroup in the beginning.
+         *
+         * <p>Once a SchedulingExecutionVertex is added to the ExecutionSlotSharingGroup, the group
+         * is no longer available for other SchedulingExecutionVertices corresponding to the same
+         * JobVertex.
+         */
+        private final Map<JobVertexID, Set<ExecutionSlotSharingGroup>> availableGroupsForJobVertex;
+
+        /**
+         * Maintains the available {@link ExecutionSlotSharingGroup}s for every {@link
+         * ConsumedPartitionGroup}. The available groups are computed in {@link
+         * this#computeAllAvailableGroupsForConsumedPartitionGroup} when the ConsumedPartitionGroup
+         * is traversed for the first time.
+         */
+        private final Map<ConsumedPartitionGroup, Set<ExecutionSlotSharingGroup>>
+                availableProducerGroupsForConsumedPartitionGroup;

Review comment:
       Maybe rename this to `availableSlotSharingGroupsForPartitionGroupConsumers`?
   Also, please explain in the Javadoc what it means, including that one
`ConsumedPartitionGroup` corresponds to only one consumer job vertex.
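
To illustrate why that last point matters, here is a minimal sketch of a `computeIfAbsent` cache keyed only by the consumed partition group, in the style of `getAvailableGroupsForConsumedPartitionGroup` in the diff. It is not the Flink code. The class `PartitionGroupCacheSketch`, the `computations` counter, and the `String` ids are hypothetical. The sketch shows that the consumer job vertex id only influences the first computation for a given key, so the cached value is only meaningful if each `ConsumedPartitionGroup` has exactly one consumer job vertex.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; Strings stand in for Flink's ID and group types.
public class PartitionGroupCacheSketch {

    // Keyed by consumed partition group only; the consumer job vertex is not part of the key.
    private final Map<String, Set<String>> availableSlotSharingGroupsForPartitionGroupConsumers =
            new HashMap<>();

    // Counts how often the (assumed expensive) computation actually runs.
    public int computations = 0;

    public Set<String> getAvailableGroups(String consumerJobVertexId, String partitionGroupId) {
        return availableSlotSharingGroupsForPartitionGroupConsumers.computeIfAbsent(
                partitionGroupId, group -> compute(consumerJobVertexId, group));
    }

    // Placeholder computation: records which consumer triggered it.
    private Set<String> compute(String consumerJobVertexId, String partitionGroupId) {
        computations++;
        Set<String> groups = new LinkedHashSet<>();
        groups.add("groups-computed-for-" + consumerJobVertexId + "-on-" + partitionGroupId);
        return groups;
    }

    public static void main(String[] args) {
        PartitionGroupCacheSketch sketch = new PartitionGroupCacheSketch();

        Set<String> first = sketch.getAvailableGroups("consumerA", "partitionGroup0");
        // A second lookup for the same partition group returns the cached set, even if a
        // different consumer id is passed in.
        Set<String> second = sketch.getAvailableGroups("consumerB", "partitionGroup0");

        System.out.println("same cached set: " + (first == second));
        System.out.println("computations: " + sketch.computations);
    }
}
```

Stating the one-consumer-per-group invariant in the Javadoc would make it clear why keying by the partition group alone is safe.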



