This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2542d86861cd5dd94feb9fcc92ef8e06398c8241
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Tue Aug 4 11:35:59 2020 +0800

    [FLINK-18690][runtime] Introduce ExecutionSlotSharingGroup and SlotSharingStrategy interface
---
 .../scheduler/CoLocationConstraintDesc.java        | 58 ++++++++++++++++++
 .../jobmanager/scheduler/CoLocationGroupDesc.java  | 69 ++++++++++++++++++++++
 .../scheduler/ExecutionSlotSharingGroup.java       | 45 ++++++++++++++
 .../runtime/scheduler/SlotSharingStrategy.java     | 45 ++++++++++++++
 4 files changed, 217 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
new file mode 100644
index 0000000..461ee3c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only, lightweight descriptor of a {@link CoLocationConstraint}, identified by group id and constraint index.
+ */
+public class CoLocationConstraintDesc {
+       /** Id of the owning co-location group; passed through checkNotNull in the constructor. */
+       private final AbstractID coLocationGroupId;
+       /** Index distinguishing this constraint from others that carry the same group id. */
+       private final int constraintIndex;
+       /** Package-private constructor; the group id is validated with checkNotNull, the index is stored as given. */
+       CoLocationConstraintDesc(final AbstractID coLocationGroupId, final int 
constraintIndex) {
+               this.coLocationGroupId = checkNotNull(coLocationGroupId);
+               this.constraintIndex = constraintIndex;
+       }
+       /** Equal only to an instance of exactly this class with the same group id and constraint index. */
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == getClass()) {
+                       CoLocationConstraintDesc that = 
(CoLocationConstraintDesc) obj;
+                       return Objects.equals(that.coLocationGroupId, 
this.coLocationGroupId) &&
+                               that.constraintIndex == this.constraintIndex;
+               } else {
+                       return false;
+               }
+       }
+       /** Combines the group id hash with the constraint index, consistent with {@link #equals(Object)}. */
+       @Override
+       public int hashCode() {
+               return 31 * coLocationGroupId.hashCode() + constraintIndex;
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
new file mode 100644
index 0000000..8f3c124
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.AbstractID;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A read-only, lightweight descriptor of a {@link CoLocationGroup}: its id and the ids of its job vertices.
+ */
+public class CoLocationGroupDesc {
+       /** Id of the described group; copied from the group, or newly created by the test-only factory. */
+       private final AbstractID id;
+       /** Ids of the job vertices in the group; exposed only through an unmodifiable view. */
+       private final List<JobVertexID> vertices;
+       /** Private: instances are obtained through the static {@code from} factories; both arguments go through checkNotNull. */
+       private CoLocationGroupDesc(final AbstractID id, final 
List<JobVertexID> vertices) {
+               this.id = checkNotNull(id);
+               this.vertices = checkNotNull(vertices);
+       }
+       /** Returns the id of this group. */
+       public AbstractID getId() {
+               return id;
+       }
+       /** Returns an unmodifiable view of the vertex ids; the list cannot be modified through it. */
+       public List<JobVertexID> getVertices() {
+               return Collections.unmodifiableList(vertices);
+       }
+       /** Creates a new constraint descriptor for this group id and the given index; the index is not range-checked here. */
+       public CoLocationConstraintDesc getLocationConstraint(final int index) {
+               return new CoLocationConstraintDesc(id, index);
+       }
+       /** Builds a descriptor from a {@link CoLocationGroup}, taking its id and collecting its vertices' ids into a new list. */
+       public static CoLocationGroupDesc from(final CoLocationGroup group) {
+               return new CoLocationGroupDesc(
+                       group.getId(),
+                       
group.getVertices().stream().map(JobVertex::getID).collect(Collectors.toList()));
+       }
+       /** Test-only factory: wraps the given vertex ids under a newly created {@link AbstractID}. */
+       @VisibleForTesting
+       public static CoLocationGroupDesc from(final JobVertexID ...ids) {
+               return new CoLocationGroupDesc(new AbstractID(), 
Arrays.asList(ids));
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
new file mode 100644
index 0000000..45fb652
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Represents execution vertices that will run in the same shared slot.
+ */
+class ExecutionSlotSharingGroup {
+       /** Ids of the execution vertices in this group; modified only through {@link #addVertex}. */
+       private final Set<ExecutionVertexID> executionVertexIds;
+       /** Creates an empty group. */
+       ExecutionSlotSharingGroup() {
+               this.executionVertexIds = new HashSet<>();
+       }
+       /** Adds the given execution vertex id to this group; an id already present is not added twice (set semantics). */
+       void addVertex(final ExecutionVertexID executionVertexId) {
+               executionVertexIds.add(executionVertexId);
+       }
+       /** Returns an unmodifiable view of the ids; the view reflects later {@link #addVertex} calls. */
+       Set<ExecutionVertexID> getExecutionVertexIds() {
+               return Collections.unmodifiableSet(executionVertexIds);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
new file mode 100644
index 0000000..dc1815e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import java.util.Set;
+
+/**
+ * Strategy which determines the {@link ExecutionSlotSharingGroup} for each execution vertex.
+ */
+interface SlotSharingStrategy {
+       /** Returns the {@link ExecutionSlotSharingGroup} determined for the given execution vertex id. */
+       ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+               ExecutionVertexID executionVertexId);
+       /** Returns the set of {@link ExecutionSlotSharingGroup}s of this strategy. */
+       Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups();
+       /** Factory creating a {@link SlotSharingStrategy} from a topology, logical slot sharing groups and co-location groups. */
+       @FunctionalInterface
+       interface Factory {
+               SlotSharingStrategy create(
+                       SchedulingTopology topology,
+                       Set<SlotSharingGroup> logicalSlotSharingGroups,
+                       Set<CoLocationGroupDesc> coLocationGroups);
+       }
+}

Reply via email to