This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2542d86861cd5dd94feb9fcc92ef8e06398c8241 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Tue Aug 4 11:35:59 2020 +0800 [FLINK-18690][runtime] Introduce ExecutionSlotSharingGroup and SlotSharingStrategy interface --- .../scheduler/CoLocationConstraintDesc.java | 58 ++++++++++++++++++ .../jobmanager/scheduler/CoLocationGroupDesc.java | 69 ++++++++++++++++++++++ .../scheduler/ExecutionSlotSharingGroup.java | 45 ++++++++++++++ .../runtime/scheduler/SlotSharingStrategy.java | 45 ++++++++++++++ 4 files changed, 217 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java new file mode 100644 index 0000000..461ee3c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintDesc.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.util.AbstractID; + +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A read-only and light weight version of {@link CoLocationConstraint}. + */ +public class CoLocationConstraintDesc { + + private final AbstractID coLocationGroupId; + + private final int constraintIndex; + + CoLocationConstraintDesc(final AbstractID coLocationGroupId, final int constraintIndex) { + this.coLocationGroupId = checkNotNull(coLocationGroupId); + this.constraintIndex = constraintIndex; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj != null && obj.getClass() == getClass()) { + CoLocationConstraintDesc that = (CoLocationConstraintDesc) obj; + return Objects.equals(that.coLocationGroupId, this.coLocationGroupId) && + that.constraintIndex == this.constraintIndex; + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * coLocationGroupId.hashCode() + constraintIndex; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java new file mode 100644 index 0000000..8f3c124 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroupDesc.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.AbstractID; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A read-only and light weight version of {@link CoLocationGroup}. + */ +public class CoLocationGroupDesc { + + private final AbstractID id; + + private final List<JobVertexID> vertices; + + private CoLocationGroupDesc(final AbstractID id, final List<JobVertexID> vertices) { + this.id = checkNotNull(id); + this.vertices = checkNotNull(vertices); + } + + public AbstractID getId() { + return id; + } + + public List<JobVertexID> getVertices() { + return Collections.unmodifiableList(vertices); + } + + public CoLocationConstraintDesc getLocationConstraint(final int index) { + return new CoLocationConstraintDesc(id, index); + } + + public static CoLocationGroupDesc from(final CoLocationGroup group) { + return new CoLocationGroupDesc( + group.getId(), + group.getVertices().stream().map(JobVertex::getID).collect(Collectors.toList())); + } + + @VisibleForTesting + public static CoLocationGroupDesc from(final JobVertexID ...ids) { + return new CoLocationGroupDesc(new AbstractID(), Arrays.asList(ids)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java new file mode 100644 index 0000000..45fb652 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Represents execution vertices that will run the same shared slot. + */ +class ExecutionSlotSharingGroup { + + private final Set<ExecutionVertexID> executionVertexIds; + + ExecutionSlotSharingGroup() { + this.executionVertexIds = new HashSet<>(); + } + + void addVertex(final ExecutionVertexID executionVertexId) { + executionVertexIds.add(executionVertexId); + } + + Set<ExecutionVertexID> getExecutionVertexIds() { + return Collections.unmodifiableSet(executionVertexIds); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java new file mode 100644 index 0000000..dc1815e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingStrategy.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupDesc; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import java.util.Set; + +/** + * Strategy which determines {@link ExecutionSlotSharingGroup} for each execution vertex. + */ +interface SlotSharingStrategy { + + ExecutionSlotSharingGroup getExecutionSlotSharingGroup( + ExecutionVertexID executionVertexId); + + Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups(); + + @FunctionalInterface + interface Factory { + SlotSharingStrategy create( + SchedulingTopology topology, + Set<SlotSharingGroup> logicalSlotSharingGroups, + Set<CoLocationGroupDesc> coLocationGroups); + } +}