zhuzhurk commented on a change in pull request #16687:
URL: https://github.com/apache/flink/pull/16687#discussion_r686783647



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -159,6 +184,12 @@ private ExecutionSlotSharingGroupBuilder(
             return executionSlotSharingGroupMap;
         }
 
+        /**
+         * The vertices are ordered since {@link
+         * 
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology#getVertices}
 are
+         * ordered, i.e., {@link 
DefaultExecutionGraph#getAllExecutionVertices} are topologically

Review comment:
       The `i.e.` statement is confusing because unrelated and I think it can 
be removed.
   I also prefer to change `ordered` to `topologically sorted`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +245,87 @@ private ExecutionSlotSharingGroup 
tryFindAvailableProducerExecutionSlotSharingGr
 
             final ExecutionVertexID executionVertexId = 
executionVertex.getId();
 
-            for (SchedulingResultPartition partition : 
executionVertex.getConsumedResults()) {
-                final ExecutionVertexID producerVertexId = 
partition.getProducer().getId();
-                if (!inSameLogicalSlotSharingGroup(producerVertexId, 
executionVertexId)) {
-                    continue;
-                }
-
-                final ExecutionSlotSharingGroup producerGroup =
-                        executionSlotSharingGroupMap.get(producerVertexId);
-
-                checkState(producerGroup != null);
-                if (isGroupAvailableForVertex(producerGroup, 
executionVertexId)) {
-                    return producerGroup;
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+
+                Iterator<ExecutionSlotSharingGroup> availableGroupIterator =
+                        getAvailableGroupsForConsumedPartitionGroup(
+                                        executionVertexId.getJobVertexId(), 
consumedPartitionGroup)
+                                .iterator();
+                if (availableGroupIterator.hasNext()) {
+                    ExecutionSlotSharingGroup nextAvailableGroup = 
availableGroupIterator.next();
+                    availableGroupIterator.remove();
+                    return nextAvailableGroup;
                 }
             }
 
             return null;
         }
 
         private boolean inSameLogicalSlotSharingGroup(
-                final ExecutionVertexID executionVertexId1,
-                final ExecutionVertexID executionVertexId2) {
+                final JobVertexID jobVertexId1, final JobVertexID 
jobVertexId2) {
 
             return Objects.equals(
-                    
getSlotSharingGroup(executionVertexId1).getSlotSharingGroupId(),
-                    
getSlotSharingGroup(executionVertexId2).getSlotSharingGroupId());
+                    
checkNotNull(slotSharingGroupMap.get(jobVertexId1)).getSlotSharingGroupId(),
+                    
checkNotNull(slotSharingGroupMap.get(jobVertexId2)).getSlotSharingGroupId());

Review comment:
       I prefer to rework `getSlotSharingGroup(ExecutionVertexID)` to 
`getSlotSharingGroup(JobVertexID)`. In this way it can be reused.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
##########
@@ -214,87 +245,87 @@ private ExecutionSlotSharingGroup 
tryFindAvailableProducerExecutionSlotSharingGr
 
             final ExecutionVertexID executionVertexId = 
executionVertex.getId();
 
-            for (SchedulingResultPartition partition : 
executionVertex.getConsumedResults()) {
-                final ExecutionVertexID producerVertexId = 
partition.getProducer().getId();
-                if (!inSameLogicalSlotSharingGroup(producerVertexId, 
executionVertexId)) {
-                    continue;
-                }
-
-                final ExecutionSlotSharingGroup producerGroup =
-                        executionSlotSharingGroupMap.get(producerVertexId);
-
-                checkState(producerGroup != null);
-                if (isGroupAvailableForVertex(producerGroup, 
executionVertexId)) {
-                    return producerGroup;
+            for (ConsumedPartitionGroup consumedPartitionGroup :
+                    executionVertex.getConsumedPartitionGroups()) {
+
+                Iterator<ExecutionSlotSharingGroup> availableGroupIterator =
+                        getAvailableGroupsForConsumedPartitionGroup(

Review comment:
       I think changing a collection returned by a get method is hard to 
track/understand and introduced potential dangers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to