[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-06-03 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r289770119
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+   }
+
+   
allocateSlotsAndDeployExecutionVertexIds(getAllVerticesFromTopology());
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   verticesToRestart
+   .stream()
+   .map(this::getSchedulingVertex)
+   .flatMap(vertex -> 
vertex.getProducedResultPartitions().stream())
+   
.forEach(inputConstraintChecker::resetSchedulingResultPartition);
+
+   allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
+   if (!FINISHED.equals(executionState)) {
+   return;
+   }
+
+   final Set verticesToSchedule = 
getSchedulingVertex(executionVertexId)
+   .getProducedResultPartitions()
+   .stream()
+   

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-06-03 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r289761627
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private SchedulingExecutionVertex producer;
+
+   private Collection consumers;
+
+   private ResultPartitionState state;
+
+   TestingSchedulingResultPartition(IntermediateDataSetID dataSetID, 
ResultPartitionType type, ResultPartitionState state) {
+   this.intermediateDataSetID = dataSetID;
+   this.partitionType = type;
+   this.state = state;
+   this.intermediateResultPartitionID = new 
IntermediateResultPartitionID();
+   this.consumers = new ArrayList<>();
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return intermediateResultPartitionID;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetID;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   return state;
+   }
+
+   @Override
+   public SchedulingExecutionVertex getProducer() {
+   return producer;
+   }
+
+   @Override
+   public Collection getConsumers() {
+   return Collections.unmodifiableCollection(consumers);
+   }
+
+   void addConsumer(SchedulingExecutionVertex consumer) {
+   this.consumers.add(consumer);
+   }
+
+   void setProducer(TestingSchedulingExecutionVertex producer) {
+   this.producer = checkNotNull(producer);
+   }
+
+   /**
+* Builder for {@link TestingSchedulingResultPartition}.
+*/
+   public static final class Builder {
+   private IntermediateDataSetID intermediateDataSetId;
+   private ResultPartitionType resultPartitionType;
+   private ResultPartitionState resultPartitionState;
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-06-03 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r289760804
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+
+/**
+ * A wrapper class for {@link InputDependencyConstraint} checker.
+ */
+public class InputDependencyConstraintChecker {
+   private final SchedulingIntermediateDataSetManager 
intermediateDataSetManager =
+   new SchedulingIntermediateDataSetManager();
+
+   public boolean check(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   final InputDependencyConstraint inputConstraint = 
schedulingExecutionVertex.getInputDependencyConstraint();
+   if 
(schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || 
ALL.equals(inputConstraint)) {
+   return checkAll(schedulingExecutionVertex);
+   } else if (ANY.equals(inputConstraint)) {
+   return checkAny(schedulingExecutionVertex);
+   } else {
+   throw new IllegalArgumentException();
+   }
+   }
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   return 
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
+   }
+
+   void resetSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.resetSchedulingResultPartition(srp);
+   }
+
+   void addSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.addSchedulingResultPartition(srp);
+   }
+
+   private boolean checkAll(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .allMatch(this::partitionConsumable);
+   }
+
+   private boolean checkAny(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .anyMatch(this::partitionConsumable);
+   }
+
+   private boolean partitionConsumable(SchedulingResultPartition 
partition) {
+   if (BLOCKING.equals(partition.getPartitionType())) {
+   return 
intermediateDataSetManager.allPartitionsFinished(partition);
+   } else {
+   SchedulingResultPartition.ResultPartitionState state = 
partition.getState();
+   return PRODUCING.equals(state) || DONE.equals(state);
+   }
+   }
+
+   private static class SchedulingIntermediateDataSetManager {
+
+   private final Map intermediateDataSets = new HashMap<>();
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   SchedulingIntermediateDataSet intermediateDataSet = 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-06-03 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r289759874
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+
+/**
+ * A wrapper class for {@link InputDependencyConstraint} checker.
+ */
+public class InputDependencyConstraintChecker {
+   private final SchedulingIntermediateDataSetManager 
intermediateDataSetManager =
+   new SchedulingIntermediateDataSetManager();
+
+   public boolean check(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   final InputDependencyConstraint inputConstraint = 
schedulingExecutionVertex.getInputDependencyConstraint();
+   if 
(schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || 
ALL.equals(inputConstraint)) {
+   return checkAll(schedulingExecutionVertex);
+   } else if (ANY.equals(inputConstraint)) {
+   return checkAny(schedulingExecutionVertex);
+   } else {
+   throw new IllegalArgumentException();
+   }
+   }
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   return 
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
+   }
+
+   void resetSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.resetSchedulingResultPartition(srp);
+   }
+
+   void addSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.addSchedulingResultPartition(srp);
+   }
+
+   private boolean checkAll(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .allMatch(this::partitionConsumable);
+   }
+
+   private boolean checkAny(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .anyMatch(this::partitionConsumable);
+   }
+
+   private boolean partitionConsumable(SchedulingResultPartition 
partition) {
+   if (BLOCKING.equals(partition.getPartitionType())) {
+   return 
intermediateDataSetManager.allPartitionsFinished(partition);
+   } else {
+   SchedulingResultPartition.ResultPartitionState state = 
partition.getState();
+   return PRODUCING.equals(state) || DONE.equals(state);
+   }
+   }
+
+   private static class SchedulingIntermediateDataSetManager {
+
+   private final Map intermediateDataSets = new HashMap<>();
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   SchedulingIntermediateDataSet intermediateDataSet = 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-06-03 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r289757255
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintChecker.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+
+/**
+ * A wrapper class for {@link InputDependencyConstraint} checker.
+ */
+public class InputDependencyConstraintChecker {
+   private final SchedulingIntermediateDataSetManager 
intermediateDataSetManager =
+   new SchedulingIntermediateDataSetManager();
+
+   public boolean check(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   final InputDependencyConstraint inputConstraint = 
schedulingExecutionVertex.getInputDependencyConstraint();
+   if 
(schedulingExecutionVertex.getConsumedResultPartitions().isEmpty() || 
ALL.equals(inputConstraint)) {
+   return checkAll(schedulingExecutionVertex);
+   } else if (ANY.equals(inputConstraint)) {
+   return checkAny(schedulingExecutionVertex);
+   } else {
+   throw new IllegalArgumentException();
+   }
+   }
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   return 
intermediateDataSetManager.markSchedulingResultPartitionFinished(srp);
+   }
+
+   void resetSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.resetSchedulingResultPartition(srp);
+   }
+
+   void addSchedulingResultPartition(SchedulingResultPartition srp) {
+   intermediateDataSetManager.addSchedulingResultPartition(srp);
+   }
+
+   private boolean checkAll(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .allMatch(this::partitionConsumable);
+   }
+
+   private boolean checkAny(final SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   return schedulingExecutionVertex.getConsumedResultPartitions()
+   .stream()
+   .anyMatch(this::partitionConsumable);
+   }
+
+   private boolean partitionConsumable(SchedulingResultPartition 
partition) {
+   if (BLOCKING.equals(partition.getPartitionType())) {
+   return 
intermediateDataSetManager.allPartitionsFinished(partition);
+   } else {
+   SchedulingResultPartition.ResultPartitionState state = 
partition.getState();
+   return PRODUCING.equals(state) || DONE.equals(state);
+   }
+   }
+
+   private static class SchedulingIntermediateDataSetManager {
+
+   private final Map intermediateDataSets = new HashMap<>();
+
+   List 
markSchedulingResultPartitionFinished(SchedulingResultPartition srp) {
+   SchedulingIntermediateDataSet intermediateDataSet = 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-23 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r287190944
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   } else {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID executionVertexId : verticesToRestart) {
+   final SchedulingExecutionVertex schedulingVertex = 
schedulingTopology.getVertex(executionVertexId)
 
 Review comment:
   OK. [FLINK-12608](https://issues.apache.org/jira/browse/FLINK-12608), I will 
do it ASA this PR is merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-23 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r287190944
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   } else {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID executionVertexId : verticesToRestart) {
+   final SchedulingExecutionVertex schedulingVertex = 
schedulingTopology.getVertex(executionVertexId)
 
 Review comment:
   OK. [FLINK-12608](https://issues.apache.org/jira/browse/FLINK-12608)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-20 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285842097
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   } else {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID executionVertexId : verticesToRestart) {
+   final SchedulingExecutionVertex schedulingVertex = 
schedulingTopology.getVertex(executionVertexId)
+   .orElseThrow(() -> new 
IllegalStateException("can not find scheduling vertex for " + 
executionVertexId));
+
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
 
 Review 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-20 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285841871
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/InputDependencyConstraintCheckerTest.java
 ##
 @@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.EMPTY;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link InputDependencyConstraintChecker}.
+ */
+public class InputDependencyConstraintCheckerTest extends TestLogger {
+
+   @Test
+   public void testCheckInputVertex() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+
+   final InputDependencyConstraintChecker inputChecker = 
testingSchedulingTopology
 
 Review comment:
   OK, involving `TestingSchedulingTopology` into 
`InputDependencyConstraintChecker` is not a good choice.
   
   As I would like to test various combinations of `ResultPartitionType`, 
`ResultPartitionState` and `InputDependencyConstraint`, I add simple 
`TestingSchedulingExecutionVertexBuilder` and 
`TestingSchedulingResultPartitionBuilder` in 
`InputDependencyConstraintCheckerTest`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-20 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285575284
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   } else {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-20 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285549690
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final InputDependencyConstraintChecker inputConstraintChecker;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.deploymentOptions = new HashMap<>();
+   this.inputConstraintChecker = new 
InputDependencyConstraintChecker();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   final DeploymentOption updateOption = new 
DeploymentOption(true);
+   final DeploymentOption nonUpdateOption = new 
DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   if (srp.getPartitionType().isBlocking()) {
+   
inputConstraintChecker.addSchedulingResultPartition(srp);
+   } else {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID executionVertexId : verticesToRestart) {
+   final SchedulingExecutionVertex schedulingVertex = 
schedulingTopology.getVertex(executionVertexId)
 
 Review comment:
   Yes, I also think that `Optional` may be a little code redundant since 
`orElseThrow` is always checked. May be we could optimize this in another JIRA 
ticket.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285010865
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   private TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
+   /**
+* Tests that when restart tasks will only schedule input ready 
vertices in given ones.
+*/
+   @Test
+   public void testRestartTasks() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = 
startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+
+   Set verticesToRestart = 
producers.stream().map(TestingSchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+   verticesToRestart.addAll(consumers.stream().map(
+   
TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()));
+
+   schedulingStrategy.restartTasks(verticesToRestart);
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
 
 Review comment:
   `SchedulingIntermediateDataSetManager` is a good idea, I will wrap up all 
the  `SchedulingIntermediateDataSet` functions into 
`SchedulingIntermediateDataSetManager`.
   
   ~~For `restartTasks` tests I expand `ProducerConsumerConnectionBuilder`, It 
seems easy to combine various `ResultPartitionType` and `ResultPartitionState` 
user cases. Thus I still add tests in `LazyFromSourcesSchedulingStrategyTest`, 
what do you think?~~
   Edit: Misunderstood the class meaning, I will do like that.


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285011042
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285011133
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285010865
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   private TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
+   /**
+* Tests that when restart tasks will only schedule input ready 
vertices in given ones.
+*/
+   @Test
+   public void testRestartTasks() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = 
startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+
+   Set verticesToRestart = 
producers.stream().map(TestingSchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+   verticesToRestart.addAll(consumers.stream().map(
+   
TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()));
+
+   schedulingStrategy.restartTasks(verticesToRestart);
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
 
 Review comment:
   `SchedulingIntermediateDataSetManager` is a good idea, I will wrap up all 
the  `SchedulingIntermediateDataSet` functions into 
`SchedulingIntermediateDataSetManager`.
   
   For `restartTasks` tests I expand `ProducerConsumerConnectionBuilder`, It 
seems easy to combine various `ResultPartitionType` and `ResultPartitionState` 
user cases. Thus I still add tests in `LazyFromSourcesSchedulingStrategyTest`, 
what do you think?


This is an automated message from the Apache Git 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285010823
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   private TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
+   /**
+* Tests that when restart tasks will only schedule input ready 
vertices in given ones.
+*/
+   @Test
+   public void testRestartTasks() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = 
startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+
+   Set verticesToRestart = 
producers.stream().map(TestingSchedulingExecutionVertex::getId)
+   .collect(Collectors.toSet());
+   verticesToRestart.addAll(consumers.stream().map(
+   
TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet()));
+
+   schedulingStrategy.restartTasks(verticesToRestart);
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
+   /**
+* Tests that when partition consumable notified will start available 
{@link PIPELINED} downstream vertices.
+*/
+   @Test
+   public void testPipelinedPartitionConsumable() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices()
+   .withParallelism(2).finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices()
+   

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285011042
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-17 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r285010786
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ALL;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   private TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
+   }
+
+   /**
+* Tests that when restart tasks will only schedule input ready 
vertices in given ones.
+*/
+   @Test
+   public void testRestartTasks() {
+   final TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+
+   final List producers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   final List consumers = 
testingSchedulingTopology.addExecutionVertices().finish();
+   testingSchedulingTopology.connectAllToAll(producers, 
consumers).finish();
+
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = 
startScheduling(testingSchedulingTopology);
+
+   assertThat(testingSchedulerOperation, 
hasScheduledVertices(producers));
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-14 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283719863
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectConsumerVerticesToPartition;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final int jobVertexCnt = 2;
+   final int taskCnt = 3;
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   TestingSchedulingExecutionVertex[][] vertices = new 
TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt];
+   TestingSchedulingResultPartition[][] partitions = new 
TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt];
+
+   ResultPartitionType[] partitionTypes = new 
ResultPartitionType[jobVertexCnt - 1];
+   InputDependencyConstraint[] inputDependencyConstraints = new 
InputDependencyConstraint[jobVertexCnt];
+   partitionTypes[0] = BLOCKING;
+   inputDependencyConstraints[0] = ANY;
+   inputDependencyConstraints[1] = ANY;
+
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints, vertices, partitions);
+
+   for (int i = 0; i < taskCnt; i++) {
+   connectConsumerVerticesToPartition(vertices[1][i], 
partitions[0][i]);
+   }
+
+   TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology);
+
+   schedulingStrategy.startScheduling();
+
+   Set toBeScheduledVertices = 
Arrays.stream(vertices[0])
+   
.map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
+   Collection scheduledVertices = 
testingSchedulerOperation.getScheduledVertices().get(0);
+
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices),
+   containsInAnyOrder(toBeScheduledVertices.toArray()));
+   }
+
+   /**
+* Tests that when restart tasks will only schedule input ready 
vertices in given ones.
+*/
+   @Test
+

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-14 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283709559
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
 ##
 @@ -50,4 +52,11 @@
 * @return Optional containing the respective scheduling result 
partition or none if the partition does not exist
 */
Optional 
getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
+
+   /**
+* Looks up the {@link InputDependencyConstraint} for the given {@link 
JobVertexID}.
+* @param jobVertexId identifying the respective job vertex id
+* @return Optional containing the respective input dependency 
constraint or none if the job vertex does not exist
+*/
+   Optional 
getInputDependencyConstraint(JobVertexID jobVertexId);
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-11 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283083182
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-11 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283083182
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283083182
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082959
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/StrategyTestUtil.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/**
+ * Strategy test utilities.
+ */
+public class StrategyTestUtil {
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   ResultPartitionID[][] resultPartitionIds = new 
ResultPartitionID[vertices.length - 1][vertices[0].length];
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints,
+   vertices, resultPartitionIds, partitions);
+   }
+
+   static void initVerticesAndPartitions(
+   TestingSchedulingTopology schedulingTopology,
+   ResultPartitionType[] partitionTypes,
+   InputDependencyConstraint[] inputDependencyConstraints,
+   TestingSchedulingExecutionVertex[][] vertices,
+   ResultPartitionID[][] resultPartitionIds,
+   TestingSchedulingResultPartition[][] partitions) {
+
+   final int jobVertexCnt = vertices.length;
+   final int taskCnt = vertices[0].length;
+
+   JobVertexID[] jobVertexIds = new JobVertexID[jobVertexCnt];
+   IntermediateDataSetID[] dataSetIds = new 
IntermediateDataSetID[jobVertexCnt];
+   for (int i = 0; i < jobVertexCnt; i++) {
+   jobVertexIds[i] = new JobVertexID();
+   dataSetIds[i] = new IntermediateDataSetID();
+   
schedulingTopology.addInputDependencyConstraint(jobVertexIds[i], 
inputDependencyConstraints[i]);
+   }
+
+   for (int i = 0; i < jobVertexCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   vertices[i][j] = new 
TestingSchedulingExecutionVertex(jobVertexIds[i], j);
+   
schedulingTopology.addSchedulingExecutionVertex(vertices[i][j]);
+
+   if (i != jobVertexCnt - 1) {
+   partitions[i][j] = new 
TestingSchedulingResultPartition(dataSetIds[i], new 
IntermediateResultPartitionID(),
+   partitionTypes[i], 
vertices[i][j]);
+   resultPartitionIds[i][j] = new 
ResultPartitionID(partitions[i][j].getId(), new ExecutionAttemptID());
+   
schedulingTopology.addResultPartition(partitions[i][j].getId(), 
partitions[i][j]);
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082955
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.api.common.InputDependencyConstraint.ANY;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static 
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.connectVerticesToPartition;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.getExecutionVertexIdsFromDeployOptions;
+import static 
org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil.initVerticesAndPartitions;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   final int jobVertexCnt = 2;
+   final int taskCnt = 3;
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   TestingSchedulingExecutionVertex[][] vertices = new 
TestingSchedulingExecutionVertex[jobVertexCnt][taskCnt];
+   TestingSchedulingResultPartition[][] partitions = new 
TestingSchedulingResultPartition[jobVertexCnt - 1][taskCnt];
+
+   ResultPartitionType[] partitionTypes = new 
ResultPartitionType[jobVertexCnt - 1];
+   InputDependencyConstraint[] inputDependencyConstraints = new 
InputDependencyConstraint[jobVertexCnt];
+   partitionTypes[0] = BLOCKING;
+   inputDependencyConstraints[0] = ANY;
+   inputDependencyConstraints[1] = ANY;
+
+   initVerticesAndPartitions(schedulingTopology, partitionTypes, 
inputDependencyConstraints, vertices, partitions);
+
+   for (int i = 0; i < taskCnt; i++) {
+   for (int j = 0; j < taskCnt; j++) {
+   connectVerticesToPartition(vertices[0][i], 
vertices[1][i], partitions[0][j]);
+   }
+   }
+
+   TestingSchedulerOperations testingSchedulerOperation = new 
TestingSchedulerOperations();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology);
+
+   schedulingStrategy.startScheduling();
+
+   Set toBeScheduledVertices = 
Arrays.stream(vertices[0])
+   
.map(TestingSchedulingExecutionVertex::getId).collect(Collectors.toSet());
+   Collection scheduledVertices = 
testingSchedulerOperation.getScheduledVertices().get(0);
+
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices),
+   containsInAnyOrder(toBeScheduledVertices.toArray()));
+   }
+
+   /**
+* Tests that when 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082635
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082502
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-10 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r283082509
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.DONE;
+import static 
org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition.ResultPartitionState.PRODUCING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final Map 
deploymentOptions;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.intermediateDataSets = new HashMap<>();
+   this.deploymentOptions = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   DeploymentOption updateOption = new DeploymentOption(true);
+   DeploymentOption nonUpdateOption = new DeploymentOption(false);
+
+   for (SchedulingExecutionVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   DeploymentOption option = nonUpdateOption;
+   for (SchedulingResultPartition srp : 
schedulingVertex.getProducedResultPartitions()) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.addSchedulingResultPartition(srp);
+   if (srp.getPartitionType().isPipelined()) {
+   option = updateOption;
+   }
+   }
+   deploymentOptions.put(schedulingVertex.getId(), option);
+
+   if 
(schedulingVertex.getConsumedResultPartitions().isEmpty()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), option));
+   }
+   }
+
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void restartTasks(Set verticesToRestart) {
+   // increase counter of the dataset first
+   for (ExecutionVertexID 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-06 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281085847
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
 
 Review comment:
   Thanks for the excellent example, it really simplifies `restartTasks` code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-05 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281049742
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   @GJL @tillrohrmann  What do you think of using both vertex state transition 
and `onPartitionConsumable` notification ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-05 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281004053
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   Since using vertex state transitions to schedule vertices has the benefit of 
avoiding flood `onPartitionConsumable` notifications, while there may be 
idle-waiting-result of PIPELINED shuffle mode. So, I think we could keep the 
benefit by relying on both vertex state transitions and  
`onPartitionConsumable` notifications.
   1. `DeploymentOption#sendScheduleOrUpdateConsumerMessage` set to `true` if 
the vertex has PIPELINED produced result partition and set to `false` if all 
the  produced result partitions are BLOCKING
   2. Schedule vertices with BLOCKING input result partition using vertex state 
transition.
   3. 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-05 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002357
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281004053
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   Since using vertex state transitions to schedule vertices has the benefit of 
avoiding flood `onPartitionConsumable` notifications, while there may be 
operators that take a very long time until they produce a result of PIPELINED 
shuffle mode. So, I think we could keep both the benefit by relying on both 
vertex state transitions and  `onPartitionConsumable` notifications.
   1. DeploymentOption#sendScheduleOrUpdateConsumerMessage set to true only the 
vertex has PIPELINED produced result partition.
   2. Schedule vertices with BLOCKING input result partition using vertex state 
transition.
   3. Schedule vertices with 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002555
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002374
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002364
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002357
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002317
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002177
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002120
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002108
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection consumers;
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002116
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002106
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection consumers;
+
+   TestingSchedulingResultPartition(IntermediateDataSetID dataSetID,
+   IntermediateResultPartitionID partitionID, 
ResultPartitionType type, SchedulingVertex producer,
+   Collection consumers) {
+   this.intermediateDataSetID = dataSetID;
+   this.intermediateResultPartitionID = partitionID;
+   this.partitionType = type;
+   this.producer = producer;
+   this.consumers = consumers;
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return intermediateResultPartitionID;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetID;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   return ResultPartitionState.DONE;
+   }
+
+   @Override
+   public SchedulingVertex getProducer() {
+   return producer;
+   }
+
+   @Override
+   public Collection getConsumers() {
+   return consumers;
+   }
+
+   public void setConsumers(Collection consumers) {
+   this.consumers = consumers;
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002110
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001965
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001882
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
 
 Review comment:
   Yes, `schedulingVertex.getConsumedResultPartitions().isEmpty()` would work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001867
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
 
 Review comment:
   OK, I thought the `verticesNeedingRestart` of `restartTasks` are well chosen 
for directly scheduling vertices before.
   There would be another little difference between `startScheduling` and 
`restartTasks` in implementation, the former schedules the vertices without 
input result partition of all vertices in the topology, and the 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001575
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-04-29 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r279343134
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
+   Optional schedulingVertex = 
schedulingTopology.getVertex(executionVertexId);
+   if (!schedulingVertex.isPresent()) {
+   throw new IllegalStateException("can not find 
scheduling vertex for " + executionVertexId);
+   }
+
+   if (ExecutionState.RUNNING.equals(executionState)) {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   for (SchedulingResultPartition srp : 
schedulingVertex.get().getProducedResultPartitions()) {
+   if 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-04-29 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r279343944
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.runtime.execution.ExecutionState.CREATED;
+import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
+   Optional schedulingVertex = 
schedulingTopology.getVertex(executionVertexId);
+   if (!schedulingVertex.isPresent()) {
+   throw new IllegalStateException("can not find 
scheduling vertex for " + executionVertexId);
+   }
+
+   if (ExecutionState.RUNNING.equals(executionState)) {
+   List 
executionVertexDeploymentOptions = new ArrayList<>();
+   for (SchedulingResultPartition srp : 
schedulingVertex.get().getProducedResultPartitions()) {
+   if 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-04-29 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r279340476
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+   }
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+
+   @Override
+   public void onExecutionStateChange(ExecutionVertexID executionVertexId, 
ExecutionState executionState) {
 
 Review comment:
   Reacting to onPartitionConsumable is much more complicated to check whether 
ANY/ALL the input `dataset` are consumable which depends on all the 
`partitions` are finished of `BLOCKING` type and data are producing of 
`PIPELINED` type. It seems that reacting to onExecutionStateChange is much 
easier to handle both `PIPELINED` and `BLOCKING` result type. In the discussion 
of slack channel, relying on onExecutionStateChange should be an easier and 
final way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-04-29 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r279331019
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingIntermediateDataSet.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+
+/**
+ * Representation of {@link IntermediateResult}.
+ */
+public interface SchedulingIntermediateDataSet {
 
 Review comment:
   In origin scheduler, `LazyFromSourceScheduler` schedules vertices when 
ANY/ALL input `dataset` are consumable, and when all the `partitions` are 
finished the `dataset` is consumable. Without `SchedulingIntermediateDataSet`,  
the input `SchedulingResultPartitions` may be checked O(n^2) times, while, 
using a counter in `SchedulingIntermediateDataSet` only check O(n) times.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services