zhuzhurk commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r687479198
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -312,5 +344,58 @@ private void updateConstraintToExecutionSlotSharingGroupMap(
             }
         }
     }
+
+    private Set<ExecutionSlotSharingGroup> getAvailableGroupsForJobVertex(
+            JobVertexID jobVertexId) {
+        return availableGroupsForJobVertex.computeIfAbsent(
+                jobVertexId, ignore -> new LinkedHashSet<>());
+    }
+
+    private Set<ExecutionSlotSharingGroup> getAvailableGroupsForConsumedPartitionGroup(
+            JobVertexID consumerJobVertexId, ConsumedPartitionGroup consumedPartitionGroup) {
+        return availableProducerGroupsForConsumedPartitionGroup.computeIfAbsent(
+                consumedPartitionGroup,
+                group ->
+                        computeAllAvailableGroupsForConsumedPartitionGroup(
+                                consumerJobVertexId, group));
+    }
+
+    private Set<ExecutionSlotSharingGroup> computeAllAvailableGroupsForConsumedPartitionGroup(
+            JobVertexID consumerJobVertexId, ConsumedPartitionGroup consumedPartitionGroup) {
+
+        // We tend to reserve the order of ExecutionSlotSharingGroups as they are traversed
+        // topologically
+        final Set<ExecutionSlotSharingGroup> slotSharingGroups = new LinkedHashSet<>();
+
+        JobVertexID producerJobVertexId =
+                topology.getResultPartition(consumedPartitionGroup.getFirst())
+                        .getProducer()
+                        .getId()
+                        .getJobVertexId();
+
+        // Check if the producer vertex and the consumer vertex are in the same SlotSharingGroup
+        if (inSameLogicalSlotSharingGroup(producerJobVertexId, consumerJobVertexId)) {
+
+            // Iterate over the producers of all partitions in the ConsumedPartitionGroup
+            for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
+                ExecutionVertexID producerVertexId =
+                        topology.getResultPartition(consumedPartitionId).getProducer().getId();
+
+                ExecutionSlotSharingGroup executionSlotSharingGroup =
+                        executionSlotSharingGroupMap.get(producerVertexId);
+                checkNotNull(executionSlotSharingGroup);
+
+                // Check if the ExecutionSlotSharingGroup of the producer is available for the
+                // consumer
+                if (getAvailableGroupsForJobVertex(consumerJobVertexId)
Review comment:
In which case can the `executionSlotSharingGroup` be unavailable?
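
For context, here is a minimal sketch of one reading of the availability bookkeeping described in the Javadoc of `availableGroupsForJobVertex`. It is a simplified model, not the PR's implementation: the class `GroupAvailabilitySketch` and its methods are hypothetical, and plain strings stand in for `JobVertexID` and `ExecutionSlotSharingGroup`. Under this reading, a producer's group would stop being available to the consumer job vertex once another subtask of that consumer job vertex has been placed in it.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model; strings stand in for Flink's ID and group types. */
class GroupAvailabilitySketch {
    // Job vertex id -> groups that can still accept a subtask of that job vertex.
    private final Map<String, Set<String>> availableGroupsForJobVertex = new HashMap<>();

    private Set<String> availableGroups(String jobVertexId) {
        return availableGroupsForJobVertex.computeIfAbsent(
                jobVertexId, ignore -> new LinkedHashSet<>());
    }

    /** A newly created group starts out available to every job vertex sharing its slots. */
    void createGroup(String group, Iterable<String> jobVerticesInSlotSharingGroup) {
        for (String jobVertexId : jobVerticesInSlotSharingGroup) {
            availableGroups(jobVertexId).add(group);
        }
    }

    /** Placing a subtask of a job vertex in a group removes the group for that job vertex only. */
    void addVertexToGroup(String jobVertexId, String group) {
        availableGroups(jobVertexId).remove(group);
    }

    boolean isAvailable(String jobVertexId, String group) {
        return availableGroups(jobVertexId).contains(group);
    }
}
```

In this model, adding a producer subtask to a group leaves the group available for the consumer job vertex; only adding a consumer subtask makes it unavailable for further consumer subtasks.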
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -94,12 +98,34 @@ public LocalInputPreferredSlotSharingStrategy create(
     private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
-    final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+    private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
             constraintToExecutionSlotSharingGroupMap;
-    final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>> executionSlotSharingGroups;
+    private final Map<SlotSharingGroupId, List<ExecutionSlotSharingGroup>>
+            executionSlotSharingGroups;
-    private final Map<ExecutionSlotSharingGroup, Set<JobVertexID>> assignedJobVerticesForGroups;
+    /**
+     * A JobVertex only belongs to a {@link SlotSharingGroup}. A SlotSharingGroup is
+     * corresponding to a set of {@link ExecutionSlotSharingGroup}s. We can maintain available
+     * ExecutionSlotSharingGroups for each JobVertex.
+     *
+     * <p>Once an ExecutionSlotSharingGroup is created, it becomes available for all JobVertices
+     * in the corresponding SlotSharingGroup in the beginning.
+     *
+     * <p>Once a SchedulingExecutionVertex is added to the ExecutionSlotSharingGroup, the group
+     * is no longer available for other SchedulingExecutionVertices corresponding to the same
+     * JobVertex.
+     */
+    private final Map<JobVertexID, Set<ExecutionSlotSharingGroup>> availableGroupsForJobVertex;
+
+    /**
+     * Maintains the available {@link ExecutionSlotSharingGroup}s for every {@link
+     * ConsumedPartitionGroup}. The available groups are computed in {@link
+     * this#computeAllAvailableGroupsForConsumedPartitionGroup} when the ConsumedPartitionGroup
+     * is traversed for the first time.
+     */
+    private final Map<ConsumedPartitionGroup, Set<ExecutionSlotSharingGroup>>
+            availableProducerGroupsForConsumedPartitionGroup;
Review comment:
Maybe rename this to `availableSlotSharingGroupsForPartitionGroupConsumers`?
Also, please explain in the Javadoc what it means, including that one
`ConsumedPartitionGroup` corresponds to only one consumer job vertex.
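
To illustrate why the one-consumer assumption is worth documenting, here is a minimal sketch of a cache keyed only by the partition group, mirroring the `computeIfAbsent` call in `getAvailableGroupsForConsumedPartitionGroup`. The class `PartitionGroupCacheSketch` and its members are hypothetical, and strings stand in for the Flink types. Because the consumer id is not part of the key, a second lookup with a different consumer id would silently reuse the first consumer's result.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model; strings stand in for Flink's ID and group types. */
class PartitionGroupCacheSketch {
    // Keyed by the consumed partition group only; the consumer id is not part of the key.
    private final Map<String, Set<String>> cache = new HashMap<>();

    // Counts how often the (potentially expensive) computation actually runs.
    int computations = 0;

    Set<String> availableGroupsFor(String consumerJobVertexId, String consumedPartitionGroup) {
        return cache.computeIfAbsent(
                consumedPartitionGroup, group -> compute(consumerJobVertexId, group));
    }

    // Placeholder computation whose result depends on the consumer id.
    private Set<String> compute(String consumerJobVertexId, String group) {
        computations++;
        Set<String> result = new LinkedHashSet<>();
        result.add(group + "/available-for-" + consumerJobVertexId);
        return result;
    }
}
```

Calling `availableGroupsFor("consumerA", "cpg-1")` and then `availableGroupsFor("consumerB", "cpg-1")` returns the same cached set in this model, so the cache is only correct if a partition group never has more than one consumer job vertex.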