zhuzhurk commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r686783647
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -159,6 +184,12 @@ private ExecutionSlotSharingGroupBuilder(
return executionSlotSharingGroupMap;
}
+ /**
+ * The vertices are ordered since {@link
+ * org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology#getVertices} are
+ * ordered, i.e., {@link DefaultExecutionGraph#getAllExecutionVertices} are topologically
Review comment:
The `i.e.` statement is confusing because it is unrelated, and I think it
can be removed.
I would also prefer changing `ordered` to `topologically sorted`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +245,87 @@ private ExecutionSlotSharingGroup tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId = executionVertex.getId();
- for (SchedulingResultPartition partition : executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId = partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId, executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup, executionVertexId)) {
- return producerGroup;
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Iterator<ExecutionSlotSharingGroup> availableGroupIterator =
+ getAvailableGroupsForConsumedPartitionGroup(
+ executionVertexId.getJobVertexId(), consumedPartitionGroup)
+ .iterator();
+ if (availableGroupIterator.hasNext()) {
+ ExecutionSlotSharingGroup nextAvailableGroup = availableGroupIterator.next();
+ availableGroupIterator.remove();
+ return nextAvailableGroup;
}
}
return null;
}
private boolean inSameLogicalSlotSharingGroup(
- final ExecutionVertexID executionVertexId1,
- final ExecutionVertexID executionVertexId2) {
+ final JobVertexID jobVertexId1, final JobVertexID jobVertexId2) {
return Objects.equals(
- getSlotSharingGroup(executionVertexId1).getSlotSharingGroupId(),
- getSlotSharingGroup(executionVertexId2).getSlotSharingGroupId());
+ checkNotNull(slotSharingGroupMap.get(jobVertexId1)).getSlotSharingGroupId(),
+ checkNotNull(slotSharingGroupMap.get(jobVertexId2)).getSlotSharingGroupId());
Review comment:
I would prefer reworking `getSlotSharingGroup(ExecutionVertexID)` into
`getSlotSharingGroup(JobVertexID)`. That way it can be reused here.
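To illustrate the suggestion, here is a minimal sketch of a single lookup
helper keyed by job vertex ID that the comparison reuses. It is not the Flink
code: the class name `SlotSharingLookupSketch`, the `register` method, and the
use of plain strings in place of `JobVertexID` and `SlotSharingGroupId` are
all assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in; strings replace Flink's ID types.
public class SlotSharingLookupSketch {
    // Assumed shape: job vertex ID -> slot sharing group ID.
    private final Map<String, String> slotSharingGroupMap = new HashMap<>();

    public void register(String jobVertexId, String slotSharingGroupId) {
        slotSharingGroupMap.put(jobVertexId, slotSharingGroupId);
    }

    // Single place that performs the lookup and the null check.
    public String getSlotSharingGroup(String jobVertexId) {
        return Objects.requireNonNull(
                slotSharingGroupMap.get(jobVertexId),
                "no slot sharing group registered for " + jobVertexId);
    }

    // The comparison reuses the helper instead of repeating the lookup.
    public boolean inSameLogicalSlotSharingGroup(String jobVertexId1, String jobVertexId2) {
        return Objects.equals(
                getSlotSharingGroup(jobVertexId1), getSlotSharingGroup(jobVertexId2));
    }
}
```

With this shape the null check lives in one method, and any other caller that
needs the group of a job vertex can go through the same helper.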
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +245,87 @@ private ExecutionSlotSharingGroup tryFindAvailableProducerExecutionSlotSharingGr
final ExecutionVertexID executionVertexId = executionVertex.getId();
- for (SchedulingResultPartition partition : executionVertex.getConsumedResults()) {
- final ExecutionVertexID producerVertexId = partition.getProducer().getId();
- if (!inSameLogicalSlotSharingGroup(producerVertexId, executionVertexId)) {
- continue;
- }
-
- final ExecutionSlotSharingGroup producerGroup =
- executionSlotSharingGroupMap.get(producerVertexId);
-
- checkState(producerGroup != null);
- if (isGroupAvailableForVertex(producerGroup, executionVertexId)) {
- return producerGroup;
+ for (ConsumedPartitionGroup consumedPartitionGroup :
+ executionVertex.getConsumedPartitionGroups()) {
+
+ Iterator<ExecutionSlotSharingGroup> availableGroupIterator =
+ getAvailableGroupsForConsumedPartitionGroup(
Review comment:
I think mutating a collection returned by a get method is hard to
track and understand, and it introduces potential dangers.
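One possible alternative, sketched below under stated assumptions, is to make
the mutation explicit through a dedicated method and have the getter return a
read-only view. The class `AvailableGroupsSketch`, the method names
`addAvailableGroup`, `pollAvailableGroup`, and `getAvailableGroups`, and the
use of strings for keys and groups are hypothetical and not taken from the PR.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in; strings replace Flink's group and key types.
public class AvailableGroupsSketch {
    private final Map<String, Deque<String>> availableGroups = new HashMap<>();

    public void addAvailableGroup(String key, String group) {
        availableGroups.computeIfAbsent(key, k -> new ArrayDeque<>()).add(group);
    }

    // The removal is visible in the method name, so callers cannot mutate by accident.
    public Optional<String> pollAvailableGroup(String key) {
        Deque<String> groups = availableGroups.get(key);
        return groups == null ? Optional.empty() : Optional.ofNullable(groups.poll());
    }

    // The getter only exposes a read-only view; remove() on it or its iterator throws.
    public Collection<String> getAvailableGroups(String key) {
        Deque<String> groups = availableGroups.get(key);
        return groups == null
                ? Collections.emptyList()
                : Collections.unmodifiableCollection(groups);
    }
}
```

The caller would then take the next group with `pollAvailableGroup(...)`
instead of calling `iterator().remove()` on the result of a getter.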