[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r289770119 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.execution.ExecutionState.CREATED; +import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. + */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = 
nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + inputConstraintChecker.addSchedulingResultPartition(srp); + } + deploymentOptions.put(schedulingVertex.getId(), option); + } + + allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology()); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + verticesToRestart + .stream() + .map(this::getSchedulingVertex) + .flatMap(vertex -> vertex.getProducedResultPartitions().stream()) + .forEach(inputConstraintChecker::resetSchedulingResultPartition); + + allocateSlotsAndDeployExecutionVertexIds(verticesToRestart); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { + if (!FINISHED.equals(executionState)) { + return; + } + + final Set verticesToSchedule = getSchedulingVertex(executionVertexId) + .getProducedResultPartitions() + .stream() +
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r289761627 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A simple implementation of {@link SchedulingResultPartition} for testing. 
+ */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private SchedulingExecutionVertex producer; + + private Collection consumers; + + private ResultPartitionState state; + + TestingSchedulingResultPartition(IntermediateDataSetID dataSetID, ResultPartitionType type, ResultPartitionState state) { + this.intermediateDataSetID = dataSetID; + this.partitionType = type; + this.state = state; + this.intermediateResultPartitionID = new IntermediateResultPartitionID(); + this.consumers = new ArrayList<>(); + } + + @Override + public IntermediateResultPartitionID getId() { + return intermediateResultPartitionID; + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateDataSetID; + } + + @Override + public ResultPartitionType getPartitionType() { + return partitionType; + } + + @Override + public ResultPartitionState getState() { + return state; + } + + @Override + public SchedulingExecutionVertex getProducer() { + return producer; + } + + @Override + public Collection getConsumers() { + return Collections.unmodifiableCollection(consumers); + } + + void addConsumer(SchedulingExecutionVertex consumer) { + this.consumers.add(consumer); + } + + void setProducer(TestingSchedulingExecutionVertex producer) { + this.producer = checkNotNull(producer); + } + + /** +* Builder for {@link TestingSchedulingResultPartition}. +*/ + public static final class Builder { + private IntermediateDataSetID intermediateDataSetId; + private ResultPartitionType resultPartitionType; + private ResultPartitionState resultPartitionState; Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r289760804 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; + +/** + * A wrapper class for {@link InputDependencyConstraint} checker. 
+ */ +public class InputDependencyConstraintChecker { + private final SchedulingIntermediateDataSetManager intermediateDataSetManager = + new SchedulingIntermediateDataSetManager(); + + public boolean check(final SchedulingExecutionVertex schedulingExecutionVertex) { + final InputDependencyConstraint inputConstraint = schedulingExecutionVertex.getInputDependencyConstraint(); + if (schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || ALL.equals(inputConstraint)) { + return checkAll(schedulingExecutionVertex); + } else if (ANY.equals(inputConstraint)) { + return checkAny(schedulingExecutionVertex); + } else { + throw new IllegalArgumentException(); + } + } + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + return intermediateDataSetManager.markSchedulingResultPartitionFinished(srp); + } + + void resetSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.resetSchedulingResultPartition(srp); + } + + void addSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.addSchedulingResultPartition(srp); + } + + private boolean checkAll(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .allMatch(this::partitionConsumable); + } + + private boolean checkAny(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .anyMatch(this::partitionConsumable); + } + + private boolean partitionConsumable(SchedulingResultPartition partition) { + if (BLOCKING.equals(partition.getPartitionType())) { + return intermediateDataSetManager.allPartitionsFinished(partition); + } else { + SchedulingResultPartition.ResultPartitionState state = partition.getState(); + return PRODUCING.equals(state) || DONE.equals(state); + } + } + + private static class SchedulingIntermediateDataSetManager { + + private final Map 
intermediateDataSets = new HashMap<>(); + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + SchedulingIntermediateDataSet intermediateDataSet =
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r289759874 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; + +/** + * A wrapper class for {@link InputDependencyConstraint} checker. 
+ */ +public class InputDependencyConstraintChecker { + private final SchedulingIntermediateDataSetManager intermediateDataSetManager = + new SchedulingIntermediateDataSetManager(); + + public boolean check(final SchedulingExecutionVertex schedulingExecutionVertex) { + final InputDependencyConstraint inputConstraint = schedulingExecutionVertex.getInputDependencyConstraint(); + if (schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || ALL.equals(inputConstraint)) { + return checkAll(schedulingExecutionVertex); + } else if (ANY.equals(inputConstraint)) { + return checkAny(schedulingExecutionVertex); + } else { + throw new IllegalArgumentException(); + } + } + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + return intermediateDataSetManager.markSchedulingResultPartitionFinished(srp); + } + + void resetSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.resetSchedulingResultPartition(srp); + } + + void addSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.addSchedulingResultPartition(srp); + } + + private boolean checkAll(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .allMatch(this::partitionConsumable); + } + + private boolean checkAny(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .anyMatch(this::partitionConsumable); + } + + private boolean partitionConsumable(SchedulingResultPartition partition) { + if (BLOCKING.equals(partition.getPartitionType())) { + return intermediateDataSetManager.allPartitionsFinished(partition); + } else { + SchedulingResultPartition.ResultPartitionState state = partition.getState(); + return PRODUCING.equals(state) || DONE.equals(state); + } + } + + private static class SchedulingIntermediateDataSetManager { + + private final Map 
intermediateDataSets = new HashMap<>(); + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + SchedulingIntermediateDataSet intermediateDataSet =
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r289757255 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; + +/** + * A wrapper class for {@link InputDependencyConstraint} checker. 
+ */ +public class InputDependencyConstraintChecker { + private final SchedulingIntermediateDataSetManager intermediateDataSetManager = + new SchedulingIntermediateDataSetManager(); + + public boolean check(final SchedulingExecutionVertex schedulingExecutionVertex) { + final InputDependencyConstraint inputConstraint = schedulingExecutionVertex.getInputDependencyConstraint(); + if (schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || ALL.equals(inputConstraint)) { + return checkAll(schedulingExecutionVertex); + } else if (ANY.equals(inputConstraint)) { + return checkAny(schedulingExecutionVertex); + } else { + throw new IllegalArgumentException(); + } + } + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + return intermediateDataSetManager.markSchedulingResultPartitionFinished(srp); + } + + void resetSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.resetSchedulingResultPartition(srp); + } + + void addSchedulingResultPartition(SchedulingResultPartition srp) { + intermediateDataSetManager.addSchedulingResultPartition(srp); + } + + private boolean checkAll(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .allMatch(this::partitionConsumable); + } + + private boolean checkAny(final SchedulingExecutionVertex schedulingExecutionVertex) { + return schedulingExecutionVertex.getConsumedResultPartitions() + .stream() + .anyMatch(this::partitionConsumable); + } + + private boolean partitionConsumable(SchedulingResultPartition partition) { + if (BLOCKING.equals(partition.getPartitionType())) { + return intermediateDataSetManager.allPartitionsFinished(partition); + } else { + SchedulingResultPartition.ResultPartitionState state = partition.getState(); + return PRODUCING.equals(state) || DONE.equals(state); + } + } + + private static class SchedulingIntermediateDataSetManager { + + private final Map 
intermediateDataSets = new HashMap<>(); + + List markSchedulingResultPartitionFinished(SchedulingResultPartition srp) { + SchedulingIntermediateDataSet intermediateDataSet =
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r287190944 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { + inputConstraintChecker.addSchedulingResultPartition(srp); + } else { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID executionVertexId : verticesToRestart) { + final SchedulingExecutionVertex schedulingVertex = schedulingTopology.getVertex(executionVertexId) Review 
comment: OK. [FLINK-12608](https://issues.apache.org/jira/browse/FLINK-12608), I will do it as soon as this PR is merged. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r287190944 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { + inputConstraintChecker.addSchedulingResultPartition(srp); + } else { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID executionVertexId : verticesToRestart) { + final SchedulingExecutionVertex schedulingVertex = schedulingTopology.getVertex(executionVertexId) Review 
comment: OK. [FLINK-12608](https://issues.apache.org/jira/browse/FLINK-12608) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285842097 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { + inputConstraintChecker.addSchedulingResultPartition(srp); + } else { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID executionVertexId : verticesToRestart) { + final SchedulingExecutionVertex schedulingVertex = schedulingTopology.getVertex(executionVertexId) + 
.orElseThrow(() -> new IllegalStateException("can not find scheduling vertex for " + executionVertexId)); + + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { Review
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285841871 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java ## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.List; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link InputDependencyConstraintChecker}. 
+ */ +public class InputDependencyConstraintCheckerTest extends TestLogger { + + @Test + public void testCheckInputVertex() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List<TestingSchedulingExecutionVertex> producers = testingSchedulingTopology.addExecutionVertices().finish(); + + final InputDependencyConstraintChecker inputChecker = testingSchedulingTopology Review comment: OK, involving `TestingSchedulingTopology` in the `InputDependencyConstraintChecker` tests is not a good choice. Since I would like to test various combinations of `ResultPartitionType`, `ResultPartitionState` and `InputDependencyConstraint`, I added simple `TestingSchedulingExecutionVertexBuilder` and `TestingSchedulingResultPartitionBuilder` classes to `InputDependencyConstraintCheckerTest`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285575284 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { + inputConstraintChecker.addSchedulingResultPartition(srp); + } else { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285549690 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final InputDependencyConstraintChecker inputConstraintChecker; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.deploymentOptions = new HashMap<>(); + this.inputConstraintChecker = new InputDependencyConstraintChecker(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + final DeploymentOption updateOption = new DeploymentOption(true); + final DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + if (srp.getPartitionType().isBlocking()) { + inputConstraintChecker.addSchedulingResultPartition(srp); + } else { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID executionVertexId : verticesToRestart) { + final SchedulingExecutionVertex schedulingVertex = schedulingTopology.getVertex(executionVertexId) Review 
comment: Yes, I also think that returning an `Optional` here is somewhat redundant, since every caller immediately unwraps it with `orElseThrow`. Maybe we could optimize this in another JIRA ticket. This is an automated message from the Apache Git Service. To respond to the message, please log on to
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285010865 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + private TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. +*/ + @Test + public void testStartScheduling() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + + /** +* Tests that when restart tasks will only schedule input ready vertices in given ones. 
+*/ + @Test + public void testRestartTasks() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + LazyFromSourcesSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + + Set verticesToRestart = producers.stream().map(TestingSchedulingExecutionVertex::getId) + .collect(Collectors.toSet()); + verticesToRestart.addAll(consumers.stream().map( + TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet())); + + schedulingStrategy.restartTasks(verticesToRestart); + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + Review comment: `SchedulingIntermediateDataSetManager` is a good idea, I will wrap up all the `SchedulingIntermediateDataSet` functions into `SchedulingIntermediateDataSetManager`. ~~For `restartTasks` tests I expand `ProducerConsumerConnectionBuilder`, It seems easy to combine various `ResultPartitionType` and `ResultPartitionState` user cases. Thus I still add tests in `LazyFromSourcesSchedulingStrategyTest`, what do you think?~~ Edit: Misunderstood the class meaning, I will do like that.
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285011042 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285011133 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285010865 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + private TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. +*/ + @Test + public void testStartScheduling() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + + /** +* Tests that when restart tasks will only schedule input ready vertices in given ones. 
+*/ + @Test + public void testRestartTasks() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + LazyFromSourcesSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + + Set verticesToRestart = producers.stream().map(TestingSchedulingExecutionVertex::getId) + .collect(Collectors.toSet()); + verticesToRestart.addAll(consumers.stream().map( + TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet())); + + schedulingStrategy.restartTasks(verticesToRestart); + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + Review comment: `SchedulingIntermediateDataSetManager` is a good idea, I will wrap up all the `SchedulingIntermediateDataSet` functions into `SchedulingIntermediateDataSetManager`. For `restartTasks` tests I expand `ProducerConsumerConnectionBuilder`, It seems easy to combine various `ResultPartitionType` and `ResultPartitionState` user cases. Thus I still add tests in `LazyFromSourcesSchedulingStrategyTest`, what do you think? This is an automated message from the Apache Git
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285010823 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + private TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. +*/ + @Test + public void testStartScheduling() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + + /** +* Tests that when restart tasks will only schedule input ready vertices in given ones. 
+*/ + @Test + public void testRestartTasks() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + LazyFromSourcesSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + + Set verticesToRestart = producers.stream().map(TestingSchedulingExecutionVertex::getId) + .collect(Collectors.toSet()); + verticesToRestart.addAll(consumers.stream().map( + TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet())); + + schedulingStrategy.restartTasks(verticesToRestart); + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + + /** +* Tests that when partition consumable notified will start available {@link PIPELINED} downstream vertices. +*/ + @Test + public void testPipelinedPartitionConsumable() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices() + .withParallelism(2).finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices() +
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285011042 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r285010786 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.TestLogger; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ALL; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + private TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. +*/ + @Test + public void testStartScheduling() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); + } + + /** +* Tests that when restart tasks will only schedule input ready vertices in given ones. 
+*/ + @Test + public void testRestartTasks() { + final TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + LazyFromSourcesSchedulingStrategy schedulingStrategy = startScheduling(testingSchedulingTopology); + + assertThat(testingSchedulerOperation, hasScheduledVertices(producers)); Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283719863 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectConsumerVerticesToPartition; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + final int jobVertexCnt = 2; + final int taskCnt = 3; + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + TestingSchedulingExecutionVertex[][] vertices = new TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt]; + TestingSchedulingResultPartition[][] partitions = new TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt]; + + ResultPartitionType[] partitionTypes = new ResultPartitionType[jobVertexCnt - 1]; + InputDependencyConstraint[] inputDependencyConstraints = new InputDependencyConstraint[jobVertexCnt]; + partitionTypes[0] = BLOCKING; + inputDependencyConstraints[0] = ANY; + inputDependencyConstraints[1] = ANY; + + initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, vertices, partitions); + + for (int i = 0; i < taskCnt; i++) { + connectConsumerVerticesToPartition(vertices[1][i], partitions[0][i]); + } + + TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology); + + schedulingStrategy.startScheduling(); + + Set toBeScheduledVertices = Arrays.stream(vertices[0]) + .map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()); + Collection scheduledVertices = testingSchedulerOperation.getScheduledVertices().get(0); + + assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices), + containsInAnyOrder(toBeScheduledVertices.toArray())); + } + + /** +* Tests that when restart tasks will only schedule input ready vertices in given ones. +*/ + @Test +
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283709559 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java ## @@ -50,4 +52,11 @@ * @return Optional containing the respective scheduling result partition or none if the partition does not exist */ Optional getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId); + + /** +* Looks up the {@link InputDependencyConstraint} for the given {@link JobVertexID}. +* @param jobVertexId identifying the respective job vertex id +* @return Optional containing the respective input dependency constraint or none if the job vertex does not exist +*/ + Optional getInputDependencyConstraint(JobVertexID jobVertexId); Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283083182 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283083182 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283083182 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082959 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; + +import java.util.Collection; +import java.util.stream.Collectors; + +/** + * Strategy test utilities. 
+ */ +public class StrategyTestUtil { + + static void initVerticesAndPartitions( + TestingSchedulingTopology schedulingTopology, + ResultPartitionType[] partitionTypes, + InputDependencyConstraint[] inputDependencyConstraints, + TestingSchedulingExecutionVertex[][] vertices, + TestingSchedulingResultPartition[][] partitions) { + + ResultPartitionID[][] resultPartitionIds = new ResultPartitionID[vertices.length - 1][vertices[0].length]; + initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, + vertices, resultPartitionIds, partitions); + } + + static void initVerticesAndPartitions( + TestingSchedulingTopology schedulingTopology, + ResultPartitionType[] partitionTypes, + InputDependencyConstraint[] inputDependencyConstraints, + TestingSchedulingExecutionVertex[][] vertices, + ResultPartitionID[][] resultPartitionIds, + TestingSchedulingResultPartition[][] partitions) { + + final int jobVertexCnt = vertices.length; + final int taskCnt = vertices[0].length; + + JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt]; + IntermediateDataSetID[] dataSetIds = new IntermediateDataSetID[jobVertexCnt]; + for (int i = 0; i < jobVertexCnt; i++) { + jobVertexIds[i] = new JobVertexID(); + dataSetIds[i] = new IntermediateDataSetID(); + schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], inputDependencyConstraints[i]); + } + + for (int i = 0; i < jobVertexCnt; i++) { + for (int j = 0; j < taskCnt; j++) { + vertices[i][j] = new TestingSchedulingExecutionVertex(jobVertexIds[i], j); + schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]); + + if (i != jobVertexCnt - 1) { + partitions[i][j] = new TestingSchedulingResultPartition(dataSetIds[i], new IntermediateResultPartitionID(), + partitionTypes[i], vertices[i][j]); + resultPartitionIds[i][j] = new ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID()); + schedulingTopology.addResultPartition(partitions[i][j].getId(), partitions[i][j]); Review comment: OK 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082955 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.api.common.InputDependencyConstraint.ANY; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; +import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions; +import static org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + final int jobVertexCnt = 2; + final int taskCnt = 3; + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + TestingSchedulingExecutionVertex[][] vertices = new TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt]; + TestingSchedulingResultPartition[][] partitions = new TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt]; + + ResultPartitionType[] partitionTypes = new ResultPartitionType[jobVertexCnt - 1]; + InputDependencyConstraint[] inputDependencyConstraints = new InputDependencyConstraint[jobVertexCnt]; + partitionTypes[0] = BLOCKING; + inputDependencyConstraints[0] = ANY; + inputDependencyConstraints[1] = ANY; + + initVerticesAndPartitions(schedulingTopology, partitionTypes, inputDependencyConstraints, vertices, partitions); + + for (int i = 0; i < taskCnt; i++) { + for (int j = 0; j < taskCnt; j++) { + connectVerticesToPartition(vertices[0][i], vertices[1][i], partitions[0][j]); + } + } + + TestingSchedulerOperations testingSchedulerOperation = new TestingSchedulerOperations(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology); + + schedulingStrategy.startScheduling(); + + Set toBeScheduledVertices = Arrays.stream(vertices[0]) + .map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()); + Collection scheduledVertices = testingSchedulerOperation.getScheduledVertices().get(0); + + assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices), + containsInAnyOrder(toBeScheduledVertices.toArray())); + } + + /** +* Tests that when
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082635 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082502 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r283082509 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE; +import static org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final Map deploymentOptions; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.intermediateDataSets = new HashMap<>(); + this.deploymentOptions = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new ArrayList<>(); + DeploymentOption updateOption = new DeploymentOption(true); + DeploymentOption nonUpdateOption = new DeploymentOption(false); + + for (SchedulingExecutionVertex schedulingVertex : schedulingTopology.getVertices()) { + DeploymentOption option = nonUpdateOption; + for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.addSchedulingResultPartition(srp); + if (srp.getPartitionType().isPipelined()) { + option = updateOption; + } + } + deploymentOptions.put(schedulingVertex.getId(), option); + + if (schedulingVertex.getConsumedResultPartitions().isEmpty()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), option)); + } + } + + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void restartTasks(Set verticesToRestart) { + // increase counter of the dataset first + for (ExecutionVertexID
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281085847 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { Review comment: Thanks for the excellent example, it really simplifies `restartTasks` code. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service,
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281049742 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: @GJL @tillrohrmann What do you think of using both vertex state transition and `onPartitionConsumable` notification ? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281004053 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: Since using vertex state transitions to schedule vertices has the benefit of avoiding flood `onPartitionConsumable` notifications, while there may be idle-waiting-result of 
PIPELINED shuffle mode. So, I think we could keep this benefit by relying on both vertex state transitions and `onPartitionConsumable` notifications: 1. `DeploymentOption#sendScheduleOrUpdateConsumerMessage` is set to `true` if the vertex has a PIPELINED produced result partition, and set to `false` if all of its produced result partitions are BLOCKING. 2. Vertices with BLOCKING input result partitions are scheduled using vertex state transitions. 3.
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002357 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281004053 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: Since using vertex state transitions to schedule vertices has the benefit of avoiding flood `onPartitionConsumable` notifications, while there may be operators that take a very 
long time before they produce results in PIPELINED shuffle mode. So, I think we could keep this benefit by relying on both vertex state transitions and `onPartitionConsumable` notifications: 1. `DeploymentOption#sendScheduleOrUpdateConsumerMessage` is set to `true` only if the vertex has a PIPELINED produced result partition. 2. Vertices with BLOCKING input result partitions are scheduled using vertex state transitions. 3. Schedule vertices with
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002555 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + JobVertex[] jobVertices = new JobVertex[6]; + + for (int i = 0; i < 6; i++) { + jobVertices[i] = new JobVertex("vertex#" + i); + } + + jobVertices[3].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[3].connectNewDataSetAsInput(jobVertices[1], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[4].connectNewDataSetAsInput(jobVertices[3], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002374 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + JobVertex[] jobVertices = new JobVertex[6]; + + for (int i = 0; i < 6; i++) { + jobVertices[i] = new JobVertex("vertex#" + i); + } + + jobVertices[3].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[3].connectNewDataSetAsInput(jobVertices[1], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[4].connectNewDataSetAsInput(jobVertices[3], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002364 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002357 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002317 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002177 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002120 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002108 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. + */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private final SchedulingVertex producer; + + private Collection consumers; Review comment: OK This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002116 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002106 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. 
+ */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private final SchedulingVertex producer; + + private Collection consumers; + + TestingSchedulingResultPartition(IntermediateDataSetID dataSetID, + IntermediateResultPartitionID partitionID, ResultPartitionType type, SchedulingVertex producer, + Collection consumers) { + this.intermediateDataSetID = dataSetID; + this.intermediateResultPartitionID = partitionID; + this.partitionType = type; + this.producer = producer; + this.consumers = consumers; + } + + @Override + public IntermediateResultPartitionID getId() { + return intermediateResultPartitionID; + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateDataSetID; + } + + @Override + public ResultPartitionType getPartitionType() { + return partitionType; + } + + @Override + public ResultPartitionState getState() { + return ResultPartitionState.DONE; + } + + @Override + public SchedulingVertex getProducer() { + return producer; + } + + @Override + public Collection getConsumers() { + return consumers; + } + + public void setConsumers(Collection consumers) { + this.consumers = consumers; Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002110 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001965 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001882 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { Review comment: Yes, `schedulingVertex.getConsumedResultPartitions().isEmpty()` would work. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001867 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { Review comment: OK, I thought the `verticesNeedingRestart` of `restartTasks` are well chosen for directly scheduling vertices before. 
There is another small difference between `startScheduling` and `restartTasks` in the implementation: the former schedules, out of all vertices in the topology, those without input result partitions, and the
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001575 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r279343134 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.runtime.execution.ExecutionState.CREATED; +import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if 
(!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { + Optional schedulingVertex = schedulingTopology.getVertex(executionVertexId); + if (!schedulingVertex.isPresent()) { + throw new IllegalStateException("can not find scheduling vertex for " + executionVertexId); + } + + if (ExecutionState.RUNNING.equals(executionState)) { + List executionVertexDeploymentOptions = new ArrayList<>(); + for (SchedulingResultPartition srp : schedulingVertex.get().getProducedResultPartitions()) { + if
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r279343944 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.runtime.execution.ExecutionState.CREATED; +import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if 
(!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { + Optional schedulingVertex = schedulingTopology.getVertex(executionVertexId); + if (!schedulingVertex.isPresent()) { + throw new IllegalStateException("can not find scheduling vertex for " + executionVertexId); + } + + if (ExecutionState.RUNNING.equals(executionState)) { + List executionVertexDeploymentOptions = new ArrayList<>(); + for (SchedulingResultPartition srp : schedulingVertex.get().getProducedResultPartitions()) { + if
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r279340476 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption)); + } + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + + @Override + public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) { Review comment: Reacting to onPartitionConsumable is much more complicated to check whether ANY/ALL the input `dataset` are consumable which depends on all the `partitions` are finished of 
`BLOCKING` type, and on data already being produced for partitions of `PIPELINED` type. Reacting to `onExecutionStateChange` seems much simpler, and it handles both the `PIPELINED` and `BLOCKING` result types. As discussed in the Slack channel, relying on `onExecutionStateChange` should be the simpler and final approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r279331019 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingIntermediateDataSet.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.IntermediateResult; + +/** + * Representation of {@link IntermediateResult}. + */ +public interface SchedulingIntermediateDataSet { Review comment: In the original scheduler, `LazyFromSourceScheduler` schedules vertices when ANY/ALL of their input `dataset`s are consumable, and a `dataset` becomes consumable once all of its `partitions` are finished. Without `SchedulingIntermediateDataSet`, the input `SchedulingResultPartitions` may be checked O(n^2) times, whereas keeping a counter in `SchedulingIntermediateDataSet` needs only O(n) checks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services