This is an automated email from the ASF dual-hosted git repository.

weizhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae962a8770558d2fd3de9f6c029ec727716ccfbf
Author: Roc Marshal <[email protected]>
AuthorDate: Tue Dec 3 00:03:57 2024 +0800

    [FLINK-33389][runtime] Add the co-location related methods for related 
abstractions of adaptive scheduler.
---
 .../scheduler/adaptive/JobGraphJobInformation.java | 14 +++++
 .../adaptive/allocator/JobInformation.java         | 16 ++++++
 .../adaptive/allocator/TestJobInformation.java     | 22 +++++++-
 .../adaptive/allocator/TestVertexInformation.java  | 64 +++++++++++++++++++++-
 4 files changed, 111 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
index 10642ee0535..27fef933758 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
@@ -30,6 +31,8 @@ import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 
 /** {@link JobInformation} created from a {@link JobGraph}. */
@@ -53,6 +56,11 @@ public class JobGraphJobInformation implements 
JobInformation {
         return jobGraph.getSlotSharingGroups();
     }
 
+    @Override
+    public Collection<CoLocationGroup> getCoLocationGroups() {
+        return jobGraph.getCoLocationGroups();
+    }
+
     @Override
     public JobInformation.VertexInformation getVertexInformation(JobVertexID 
jobVertexId) {
         return new JobVertexInformation(
@@ -123,5 +131,11 @@ public class JobGraphJobInformation implements 
JobInformation {
         public SlotSharingGroup getSlotSharingGroup() {
             return jobVertex.getSlotSharingGroup();
         }
+
+        @Nullable
+        @Override
+        public CoLocationGroup getCoLocationGroup() {
+            return jobVertex.getCoLocationGroup();
+        }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
index 83a761deda5..e5be2502b72 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
@@ -18,8 +18,11 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 
 /** Information about the job. */
@@ -34,6 +37,16 @@ public interface JobInformation {
      */
     Collection<SlotSharingGroup> getSlotSharingGroups();
 
+    /**
+     * Returns all co-location groups of the job.
+     *
+     * <p>Attention: The returned co-location groups should never be modified (they are indeed
+     * mutable)!
+     *
+     * @return all co-location groups of the job
+     */
+    Collection<CoLocationGroup> getCoLocationGroups();
+
     VertexInformation getVertexInformation(JobVertexID jobVertexId);
 
     Iterable<VertexInformation> getVertices();
@@ -49,5 +62,8 @@ public interface JobInformation {
         int getMaxParallelism();
 
         SlotSharingGroup getSlotSharingGroup();
+
+        @Nullable
+        CoLocationGroup getCoLocationGroup();
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
index b9a3671d3c4..7989e50eca8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
@@ -18,19 +18,24 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-class TestJobInformation implements JobInformation {
+public class TestJobInformation implements JobInformation {
 
     private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
-    private final Collection<SlotSharingGroup> slotSharingGroups;
+    private final Set<SlotSharingGroup> slotSharingGroups;
+    private final Set<CoLocationGroup> coLocationGroups;
 
-    TestJobInformation(Collection<? extends VertexInformation> 
vertexIdToInformation) {
+    public TestJobInformation(Collection<? extends VertexInformation> 
vertexIdToInformation) {
         this.vertexIdToInformation =
                 vertexIdToInformation.stream()
                         .collect(
@@ -40,6 +45,12 @@ class TestJobInformation implements JobInformation {
                 vertexIdToInformation.stream()
                         .map(VertexInformation::getSlotSharingGroup)
                         .collect(Collectors.toSet());
+        this.coLocationGroups =
+                Collections.unmodifiableSet(
+                        vertexIdToInformation.stream()
+                                .map(VertexInformation::getCoLocationGroup)
+                                .filter(Objects::nonNull)
+                                .collect(Collectors.toSet()));
     }
 
     @Override
@@ -47,6 +58,11 @@ class TestJobInformation implements JobInformation {
         return slotSharingGroups;
     }
 
+    @Override
+    public Collection<CoLocationGroup> getCoLocationGroups() {
+        return coLocationGroups;
+    }
+
     @Override
     public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
         return vertexIdToInformation.get(jobVertexId);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
index 02a9044a72b..bc42c9422fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
@@ -18,14 +18,34 @@
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
-class TestVertexInformation implements JobInformation.VertexInformation {
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestVertexInformation implements JobInformation.VertexInformation 
{
 
     private final JobVertexID jobVertexId;
     private final int minParallelism;
     private final int parallelism;
     private final SlotSharingGroup slotSharingGroup;
+    @Nullable private final TestingCoLocationGroup coLocationGroup;
+
+    public TestVertexInformation(int parallelism, SlotSharingGroup 
slotSharingGroup) {
+        this(new JobVertexID(), 1, parallelism, slotSharingGroup);
+    }
+
+    public TestVertexInformation(
+            int parallelism,
+            SlotSharingGroup slotSharingGroup,
+            TestingCoLocationGroup coLocationGroup) {
+        this(new JobVertexID(), 1, parallelism, slotSharingGroup, 
coLocationGroup);
+    }
 
     TestVertexInformation(
             JobVertexID jobVertexId, int parallelism, SlotSharingGroup 
slotSharingGroup) {
@@ -37,11 +57,24 @@ class TestVertexInformation implements 
JobInformation.VertexInformation {
             int minParallelism,
             int parallelism,
             SlotSharingGroup slotSharingGroup) {
+        this(jobVertexId, minParallelism, parallelism, slotSharingGroup, null);
+    }
+
+    TestVertexInformation(
+            JobVertexID jobVertexId,
+            int minParallelism,
+            int parallelism,
+            SlotSharingGroup slotSharingGroup,
+            @Nullable TestingCoLocationGroup coLocationGroup) {
         this.jobVertexId = jobVertexId;
         this.minParallelism = minParallelism;
         this.parallelism = parallelism;
         this.slotSharingGroup = slotSharingGroup;
-        slotSharingGroup.addVertexToGroup(jobVertexId);
+        this.slotSharingGroup.addVertexToGroup(jobVertexId);
+        this.coLocationGroup = coLocationGroup;
+        if (this.coLocationGroup != null) {
+            this.coLocationGroup.addVertex(jobVertexId);
+        }
     }
 
     @Override
@@ -68,4 +101,31 @@ class TestVertexInformation implements 
JobInformation.VertexInformation {
     public SlotSharingGroup getSlotSharingGroup() {
         return slotSharingGroup;
     }
+
+    @Nullable
+    @Override
+    public TestingCoLocationGroup getCoLocationGroup() {
+        return coLocationGroup;
+    }
+
+    /** Test-only co-location group that keeps its vertex IDs in a locally managed list. */
+    public static class TestingCoLocationGroup extends CoLocationGroupImpl {
+        private final List<JobVertexID> verticesIDs = new ArrayList<>();
+
+        public TestingCoLocationGroup(JobVertexID... verticesIDs) {
+            super();
+            if (verticesIDs != null && verticesIDs.length > 0) {
+                
this.verticesIDs.addAll(Arrays.stream(verticesIDs).collect(Collectors.toList()));
+            }
+        }
+
+        public void addVertex(JobVertexID vertexID) {
+            this.verticesIDs.add(vertexID);
+        }
+
+        @Override
+        public List<JobVertexID> getVertexIds() {
+            return verticesIDs;
+        }
+    }
 }

Reply via email to