This is an automated email from the ASF dual-hosted git repository. weizhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ae962a8770558d2fd3de9f6c029ec727716ccfbf Author: Roc Marshal <[email protected]> AuthorDate: Tue Dec 3 00:03:57 2024 +0800 [FLINK-33389][runtime] Add the co-location related methods for related abstractions of adaptive scheduler. --- .../scheduler/adaptive/JobGraphJobInformation.java | 14 +++++ .../adaptive/allocator/JobInformation.java | 16 ++++++ .../adaptive/allocator/TestJobInformation.java | 22 +++++++- .../adaptive/allocator/TestVertexInformation.java | 64 +++++++++++++++++++++- 4 files changed, 111 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java index 10642ee0535..27fef933758 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; @@ -30,6 +31,8 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables; +import javax.annotation.Nullable; + import java.util.Collection; /** {@link JobInformation} created from a {@link JobGraph}. 
*/ @@ -53,6 +56,11 @@ public class JobGraphJobInformation implements JobInformation { return jobGraph.getSlotSharingGroups(); } + @Override + public Collection<CoLocationGroup> getCoLocationGroups() { + return jobGraph.getCoLocationGroups(); + } + @Override public JobInformation.VertexInformation getVertexInformation(JobVertexID jobVertexId) { return new JobVertexInformation( @@ -123,5 +131,11 @@ public class JobGraphJobInformation implements JobInformation { public SlotSharingGroup getSlotSharingGroup() { return jobVertex.getSlotSharingGroup(); } + + @Nullable + @Override + public CoLocationGroup getCoLocationGroup() { + return jobVertex.getCoLocationGroup(); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java index 83a761deda5..e5be2502b72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import javax.annotation.Nullable; + import java.util.Collection; /** Information about the job. */ @@ -34,6 +37,16 @@ public interface JobInformation { */ Collection<SlotSharingGroup> getSlotSharingGroups(); + /** + * Returns all co-location groups of the job. + * + * <p>Attention: The returned co-location groups should never be modified (they are indeed + * mutable)! 
+ * + * @return all co-location groups of the job + */ + Collection<CoLocationGroup> getCoLocationGroups(); + VertexInformation getVertexInformation(JobVertexID jobVertexId); Iterable<VertexInformation> getVertices(); @@ -49,5 +62,8 @@ public interface JobInformation { int getMaxParallelism(); SlotSharingGroup getSlotSharingGroup(); + + @Nullable + CoLocationGroup getCoLocationGroup(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java index b9a3671d3c4..7989e50eca8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java @@ -18,19 +18,24 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import java.util.Collection; +import java.util.Collections; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -class TestJobInformation implements JobInformation { +public class TestJobInformation implements JobInformation { private final Map<JobVertexID, VertexInformation> vertexIdToInformation; - private final Collection<SlotSharingGroup> slotSharingGroups; + private final Set<SlotSharingGroup> slotSharingGroups; + private final Set<CoLocationGroup> coLocationGroups; - TestJobInformation(Collection<? extends VertexInformation> vertexIdToInformation) { + public TestJobInformation(Collection<? 
extends VertexInformation> vertexIdToInformation) { this.vertexIdToInformation = vertexIdToInformation.stream() .collect( @@ -40,6 +45,12 @@ class TestJobInformation implements JobInformation { vertexIdToInformation.stream() .map(VertexInformation::getSlotSharingGroup) .collect(Collectors.toSet()); + this.coLocationGroups = + Collections.unmodifiableSet( + vertexIdToInformation.stream() + .map(VertexInformation::getCoLocationGroup) + .filter(Objects::nonNull) + .collect(Collectors.toSet())); } @Override @@ -47,6 +58,11 @@ class TestJobInformation implements JobInformation { return slotSharingGroups; } + @Override + public Collection<CoLocationGroup> getCoLocationGroups() { + return coLocationGroups; + } + @Override public VertexInformation getVertexInformation(JobVertexID jobVertexId) { return vertexIdToInformation.get(jobVertexId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java index 02a9044a72b..bc42c9422fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java @@ -18,14 +18,34 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -class TestVertexInformation implements JobInformation.VertexInformation { +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class TestVertexInformation implements JobInformation.VertexInformation { private final JobVertexID jobVertexId; private final int 
minParallelism; private final int parallelism; private final SlotSharingGroup slotSharingGroup; + @Nullable private final TestingCoLocationGroup coLocationGroup; + + public TestVertexInformation(int parallelism, SlotSharingGroup slotSharingGroup) { + this(new JobVertexID(), 1, parallelism, slotSharingGroup); + } + + public TestVertexInformation( + int parallelism, + SlotSharingGroup slotSharingGroup, + TestingCoLocationGroup coLocationGroup) { + this(new JobVertexID(), 1, parallelism, slotSharingGroup, coLocationGroup); + } TestVertexInformation( JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) { @@ -37,11 +57,24 @@ class TestVertexInformation implements JobInformation.VertexInformation { int minParallelism, int parallelism, SlotSharingGroup slotSharingGroup) { + this(jobVertexId, minParallelism, parallelism, slotSharingGroup, null); + } + + TestVertexInformation( + JobVertexID jobVertexId, + int minParallelism, + int parallelism, + SlotSharingGroup slotSharingGroup, + @Nullable TestingCoLocationGroup coLocationGroup) { this.jobVertexId = jobVertexId; this.minParallelism = minParallelism; this.parallelism = parallelism; this.slotSharingGroup = slotSharingGroup; - slotSharingGroup.addVertexToGroup(jobVertexId); + this.slotSharingGroup.addVertexToGroup(jobVertexId); + this.coLocationGroup = coLocationGroup; + if (this.coLocationGroup != null) { + this.coLocationGroup.addVertex(jobVertexId); + } } @Override @@ -68,4 +101,31 @@ class TestVertexInformation implements JobInformation.VertexInformation { public SlotSharingGroup getSlotSharingGroup() { return slotSharingGroup; } + + @Nullable + @Override + public TestingCoLocationGroup getCoLocationGroup() { + return coLocationGroup; + } + + /** Testing util class. */ + public static class TestingCoLocationGroup extends CoLocationGroupImpl { + private final List<JobVertexID> verticesIDs = new ArrayList<>(); + + public TestingCoLocationGroup(JobVertexID... 
verticesIDs) { + super(); + if (verticesIDs != null && verticesIDs.length > 0) { + this.verticesIDs.addAll(Arrays.stream(verticesIDs).collect(Collectors.toList())); + } + } + + public void addVertex(JobVertexID vertexID) { + this.verticesIDs.add(vertexID); + } + + @Override + public List<JobVertexID> getVertexIds() { + return verticesIDs; + } + } }
