This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a4c3dc54b462c44987b36bb6af6f7a570d829a7 Author: shuai-xu <shua...@foxmail.com> AuthorDate: Mon Apr 22 11:32:39 2019 +0800 [FLINK-12227][runtime] Introduce SchedulingStrategy interface This closes #8233. --- .../flink/runtime/scheduler/DeploymentOption.java | 35 ++++++++++++ .../scheduler/ExecutionVertexDeploymentOption.java | 36 +++++++++++++ .../runtime/scheduler/SchedulerOperations.java | 36 +++++++++++++ .../scheduler/strategy/SchedulingStrategy.java | 62 ++++++++++++++++++++++ .../strategy/SchedulingStrategyFactory.java | 33 ++++++++++++ 5 files changed, 202 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java new file mode 100644 index 0000000..9fb9ace --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +/** + * Deployment option which indicates whether the task should send scheduleOrUpdateConsumer message to master. + */ +public class DeploymentOption { + + private final boolean sendScheduleOrUpdateConsumerMessage; + + public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) { + this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage; + } + + public boolean sendScheduleOrUpdateConsumerMessage() { + return sendScheduleOrUpdateConsumerMessage; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java new file mode 100644 index 0000000..829f6ba --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +/** + * Component that stores the task need to be scheduled and the option for deployment. + */ +public class ExecutionVertexDeploymentOption { + + private final ExecutionVertexID executionVertexId; + + private final DeploymentOption deploymentOption; + + public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) { + this.executionVertexId = executionVertexId; + this.deploymentOption = deploymentOption; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java new file mode 100644 index 0000000..50d3f87 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; + +import java.util.Collection; + +/** + * Component which is used by {@link SchedulingStrategy} to commit scheduling decisions. + */ +public interface SchedulerOperations { + + /** + * Allocate slots and deploy the vertex when slots are returned. + * + * @param executionVertexDeploymentOptions The tasks to be deployed and deployment options + */ + void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java new file mode 100644 index 0000000..4522a35 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; + +import java.util.Set; + +/** + * Component which encapsulates the scheduling logic. + * It can react to execution state changes and partition consumable events. + * Moreover, it is responsible for resolving task failures. + */ +public interface SchedulingStrategy { + + /** + * Called when the scheduling is started (initial scheduling operation). + */ + void startScheduling(); + + /** + * Called whenever vertices need to be restarted (due to task failure). + * + * @param verticesNeedingRestart The tasks need to be restarted + */ + void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart); + + /** + * Called whenever an {@link Execution} changes its state. + * + * @param executionVertexId The id of the task + * @param executionState The new state of the execution + */ + void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState); + + /** + * Called whenever an {@link IntermediateResultPartition} becomes consumable. + * + * @param executionVertexId The id of the producer + * @param resultPartitionId The id of the result partition + */ + void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java new file mode 100644 index 0000000..28b8257 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +/** + * Factory interface for {@link SchedulingStrategy}. + */ +public interface SchedulingStrategyFactory { + + SchedulingStrategy createInstance( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph); +}