zhuzhurk commented on a change in pull request #18102:
URL: https://github.com/apache/flink/pull/18102#discussion_r785714256



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.java
##########
@@ -62,11 +62,17 @@ public DefaultOperatorCoordinatorHandler(
     private static Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(
             ExecutionGraph executionGraph) {
         Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap = new HashMap<>();
-        for (ExecutionJobVertex vertex : executionGraph.getAllVertices().values()) {
-            for (OperatorCoordinatorHolder holder : vertex.getOperatorCoordinators()) {
-                coordinatorMap.put(holder.operatorId(), holder);
-            }
-        }
+
+        executionGraph.getAllVertices().values().stream()
+                .filter(ExecutionJobVertex::isInitialized)
+                .forEach(

Review comment:
       This can be simplified to:
   
   ```
   return executionGraph.getAllVertices().values().stream()
                   .flatMap(v -> v.getOperatorCoordinators().stream())
                   .collect(
                           Collectors.toMap(
                                   OperatorCoordinatorHolder::operatorId, Function.identity()));
   ```
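
A self-contained sketch of the same `flatMap` plus `Collectors.toMap` pattern, for trying it outside Flink. `Holder` and `Vertex` are hypothetical stand-ins for `OperatorCoordinatorHolder` and `ExecutionJobVertex`, and keeping the `isInitialized` filter from the diff is an assumption about what the final code should do.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CoordinatorMapSketch {

    /** Hypothetical stand-in for OperatorCoordinatorHolder. */
    static final class Holder {
        private final String operatorId;

        Holder(String operatorId) {
            this.operatorId = operatorId;
        }

        String operatorId() {
            return operatorId;
        }
    }

    /** Hypothetical stand-in for ExecutionJobVertex. */
    static final class Vertex {
        private final boolean initialized;
        private final List<Holder> holders;

        Vertex(boolean initialized, Holder... holders) {
            this.initialized = initialized;
            this.holders = Arrays.asList(holders);
        }

        boolean isInitialized() {
            return initialized;
        }

        List<Holder> getOperatorCoordinators() {
            return holders;
        }
    }

    /** Builds the id-to-holder map in one stream pipeline, skipping uninitialized vertices. */
    static Map<String, Holder> createCoordinatorMap(List<Vertex> vertices) {
        return vertices.stream()
                .filter(Vertex::isInitialized)
                .flatMap(v -> v.getOperatorCoordinators().stream())
                .collect(Collectors.toMap(Holder::operatorId, Function.identity()));
    }

    public static void main(String[] args) {
        List<Vertex> vertices =
                Arrays.asList(
                        new Vertex(true, new Holder("op-1"), new Holder("op-2")),
                        new Vertex(false, new Holder("op-3")));
        // Only the coordinators of the initialized vertex end up in the map.
        System.out.println(createCoordinatorMap(vertices).keySet());
    }
}
```

One behavioral difference from the loop is worth keeping in mind: `Collectors.toMap` throws an `IllegalStateException` on duplicate keys, whereas `Map.put` silently overwrites the earlier entry.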

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
##########
@@ -54,16 +55,32 @@
             IntermediateDataSetID intermediateDataSetId,
             ResultPartitionType partitionType,
             Supplier<ResultPartitionState> resultPartitionStateSupplier,
-            ConsumerVertexGroup consumerVertexGroup,
+            Supplier<ConsumerVertexGroup> consumerVertexGroupSupplier,
             Supplier<List<ConsumedPartitionGroup>> consumerPartitionGroupSupplier) {
         this.resultPartitionId = checkNotNull(partitionId);
         this.intermediateDataSetId = checkNotNull(intermediateDataSetId);
         this.partitionType = checkNotNull(partitionType);
         this.resultPartitionStateSupplier = checkNotNull(resultPartitionStateSupplier);
-        this.consumerVertexGroup = consumerVertexGroup;
+        this.consumerVertexGroupSupplier = consumerVertexGroupSupplier;

Review comment:
       I think `checkNotNull` is needed here.
   
   I also noticed that the testing constructors set the suppliers to null. I think that is not right, because they are never expected to be null in production.
   
   I would remove the testing constructors and change the tests to invoke the production constructor, passing `() -> { throw new UnsupportedOperationException(); }` instead of `null` as the parameter.
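
A minimal sketch of that test setup, assuming a simplified class shape. `Partition` is a hypothetical stand-in for `DefaultResultPartition`, `unsupported()` is a hypothetical helper, and `Objects.requireNonNull` stands in for Flink's `checkNotNull` so the example compiles without Flink on the classpath.

```java
import java.util.Objects;
import java.util.function.Supplier;

public class ThrowingSupplierSketch {

    /** Hypothetical, simplified stand-in for DefaultResultPartition. */
    static final class Partition {
        private final Supplier<String> stateSupplier;
        private final Supplier<String> consumerGroupSupplier;

        Partition(Supplier<String> stateSupplier, Supplier<String> consumerGroupSupplier) {
            // Both suppliers are mandatory, as they would be in production.
            this.stateSupplier = Objects.requireNonNull(stateSupplier);
            this.consumerGroupSupplier = Objects.requireNonNull(consumerGroupSupplier);
        }

        String getState() {
            return stateSupplier.get();
        }

        String getConsumerGroup() {
            return consumerGroupSupplier.get();
        }
    }

    /** Supplier for constructor arguments that a given test never expects to be invoked. */
    static <T> Supplier<T> unsupported() {
        // A lambda needs a block body here; a bare `throw` is not a valid lambda expression.
        return () -> {
            throw new UnsupportedOperationException();
        };
    }

    public static void main(String[] args) {
        Partition partition = new Partition(() -> "CREATED", unsupported());
        System.out.println(partition.getState());
        try {
            partition.getConsumerGroup();
        } catch (UnsupportedOperationException e) {
            System.out.println("unexpected use of the consumer group supplier was detected");
        }
    }
}
```

Compared with passing `null`, an accidental call fails with an exception that names the problem instead of a `NullPointerException` somewhere later.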

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -114,19 +115,21 @@
                     if (!producedResult.getResultType().isReconnectable()) {
                         continue;
                     }
-                    final ConsumerVertexGroup consumerVertexGroup =
+                    final Optional<ConsumerVertexGroup> consumerVertexGroup =
                             producedResult.getConsumerVertexGroup();
-                    for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
-                        SchedulingExecutionVertex consumerVertex =
-                                executionVertexRetriever.apply(consumerVertexId);
-                        // Skip the ConsumerVertexGroup if its vertices are outside current
-                        // regions and cannot be merged
-                        if (!vertexToRegion.containsKey(consumerVertex)) {
-                            break;
-                        }
-                        if (!currentRegion.contains(consumerVertex)) {
-                            currentRegionOutEdges.add(
-                                    regionIndices.get(vertexToRegion.get(consumerVertex)));
+                    if (consumerVertexGroup.isPresent()) {

Review comment:
       Maybe `if (!consumerVertexGroup.isPresent()) { continue; }` to avoid nested code blocks.
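
A small sketch of the guard-clause shape, with plain strings standing in for vertex IDs. `collectConsumers` and its types are hypothetical and only illustrate the early `continue`; this is not the actual region computation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class GuardClauseSketch {

    /** Flattens all present groups; absent groups are skipped with an early continue. */
    static List<String> collectConsumers(List<Optional<List<String>>> groups) {
        List<String> consumers = new ArrayList<>();
        for (Optional<List<String>> group : groups) {
            if (!group.isPresent()) {
                // Nothing to do for this partition; the main logic stays un-nested.
                continue;
            }
            consumers.addAll(group.get());
        }
        return consumers;
    }

    public static void main(String[] args) {
        List<Optional<List<String>>> groups = new ArrayList<>();
        groups.add(Optional.of(Arrays.asList("v1", "v2")));
        groups.add(Optional.empty());
        groups.add(Optional.of(Arrays.asList("v3")));
        System.out.println(collectConsumers(groups));
    }
}
```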

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -66,31 +70,34 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionTopology.class);
 
-    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById;
+    private final Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById =

Review comment:
       NIT: It's better to initialize these maps and lists in the constructor.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -159,50 +162,79 @@ public static DefaultExecutionTopology fromExecutionGraph(
                                 topologicallySortedJobVertices)
                         .getAllPipelinedRegions();
 
-        ExecutionGraphIndex executionGraphIndex =
-                computeExecutionGraphIndex(
-                        executionGraph.getAllExecutionVertices(),
-                        logicalPipelinedRegions,
-                        edgeManager);
-
-        IndexedPipelinedRegions indexedPipelinedRegions =
-                computePipelinedRegions(
-                        executionGraphIndex.sortedExecutionVerticesInPipelinedRegion.keySet(),
-                        executionGraphIndex.sortedExecutionVerticesInPipelinedRegion::get,
-                        executionGraphIndex.executionVerticesById::get,
-                        executionGraphIndex.resultPartitionsById::get);
-
-        ensureCoLocatedVerticesInSameRegion(
-                indexedPipelinedRegions.pipelinedRegions, executionGraph);
-
-        return new DefaultExecutionTopology(
-                executionGraphIndex.executionVerticesById,
-                executionGraphIndex.executionVerticesList,
-                executionGraphIndex.resultPartitionsById,
-                indexedPipelinedRegions.pipelinedRegionsByVertex,
-                indexedPipelinedRegions.pipelinedRegions,
-                edgeManager);
-    }
-
-    private static ExecutionGraphIndex computeExecutionGraphIndex(
-            Iterable<ExecutionVertex> executionVertices,
-            Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions,
-            EdgeManager edgeManager) {
-        Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById = new HashMap<>();
-        List<DefaultExecutionVertex> executionVerticesList = new ArrayList<>();
-        Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById =
-                new HashMap<>();
-        Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>>
-                sortedExecutionVerticesInPipelinedRegion = new IdentityHashMap<>();
-
-        Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionByJobVertexId =
+        Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionsByJobVertexId =
                 new HashMap<>();
         for (DefaultLogicalPipelinedRegion logicalPipelinedRegion : logicalPipelinedRegions) {
             for (LogicalVertex vertex : logicalPipelinedRegion.getVertices()) {
-                logicalPipelinedRegionByJobVertexId.put(vertex.getId(), logicalPipelinedRegion);
+                logicalPipelinedRegionsByJobVertexId.put(vertex.getId(), logicalPipelinedRegion);
             }
         }
 
+        return logicalPipelinedRegionsByJobVertexId;
+    }
+
+    public void notifyExecutionGraphUpdated(
+            final DefaultExecutionGraph executionGraph,
+            final List<ExecutionJobVertex> newJobVertices) {
+
+        checkNotNull(executionGraph, "execution graph can not be null");
+
+        final Set<JobVertexID> newVertexSet =
+                newJobVertices.stream()
+                        .map(ExecutionJobVertex::getJobVertexId)
+                        .collect(Collectors.toSet());
+
+        // any PIPELINED input should be from within this new set so that existing pipelined regions
+        // will not change
+        newJobVertices.stream()
+                .map(ExecutionJobVertex::getJobVertex)
+                .flatMap(v -> v.getInputs().stream())
+                .map(JobEdge::getSource)
+                .filter(r -> r.getResultType().isPipelined())
+                .map(IntermediateDataSet::getProducer)
+                .map(JobVertex::getID)
+                .forEach(id -> checkState(newVertexSet.contains(id)));
+
+        final Iterable<ExecutionVertex> newAddedExecutionVertices =
+                newJobVertices.stream()
+                        .flatMap(jobVertex -> Stream.of(jobVertex.getTaskVertices()))
+                        .collect(Collectors.toList());
+
+        updateExecutionGraphIndex(newAddedExecutionVertices);
+
+        updatePipelinedRegions(newAddedExecutionVertices);
+
+        ensureCoLocatedVerticesInSameRegion(pipelinedRegions, executionGraph);
+    }
+
+    public static DefaultExecutionTopology fromExecutionGraph(
+            DefaultExecutionGraph executionGraph) {
+        checkNotNull(executionGraph, "execution graph can not be null");
+
+        EdgeManager edgeManager = executionGraph.getEdgeManager();
+
+        Iterable<ExecutionVertex> executionVertices = executionGraph.getAllExecutionVertices();
+
+        DefaultExecutionTopology schedulingTopology =
+                new DefaultExecutionTopology(
+                        () ->
+                                IterableUtils.toStream(executionVertices)

Review comment:
       executionVertices -> executionGraph.getAllExecutionVertices()
   
   I feel this makes the `sortedExecutionVertexIds` easier to understand.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -126,6 +130,13 @@ public DefaultResultPartition getResultPartition(
         return resultPartition;
     }
 
+    @Override
+    public void registerSchedulingTopologyListener(SchedulingTopologyListener listener) {
+        if (listener != null) {

Review comment:
       Better to use `checkNotNull` here.
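
A sketch of fail-fast listener registration, as opposed to silently dropping a `null`. `Topology` and `TopologyListener` are hypothetical stand-ins for `DefaultExecutionTopology` and `SchedulingTopologyListener`, and `Objects.requireNonNull` stands in for Flink's `checkNotNull`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ListenerRegistrationSketch {

    /** Hypothetical stand-in for SchedulingTopologyListener. */
    interface TopologyListener {
        void notifyUpdated(List<String> newVertices);
    }

    /** Hypothetical stand-in for the topology that owns the listeners. */
    static final class Topology {
        private final List<TopologyListener> listeners;

        Topology() {
            this.listeners = new ArrayList<>();
        }

        void registerListener(TopologyListener listener) {
            // Reject null at registration time so the caller's bug surfaces immediately.
            listeners.add(Objects.requireNonNull(listener, "listener must not be null"));
        }

        void notifyUpdated(List<String> newVertices) {
            for (TopologyListener listener : listeners) {
                listener.notifyUpdated(newVertices);
            }
        }
    }

    public static void main(String[] args) {
        Topology topology = new Topology();
        topology.registerListener(vertices -> System.out.println("updated: " + vertices));
        topology.notifyUpdated(Arrays.asList("v1", "v2"));
    }
}
```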

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
##########
@@ -54,16 +55,32 @@
             IntermediateDataSetID intermediateDataSetId,
             ResultPartitionType partitionType,
             Supplier<ResultPartitionState> resultPartitionStateSupplier,
-            ConsumerVertexGroup consumerVertexGroup,
+            Supplier<ConsumerVertexGroup> consumerVertexGroupSupplier,
             Supplier<List<ConsumedPartitionGroup>> consumerPartitionGroupSupplier) {
         this.resultPartitionId = checkNotNull(partitionId);
         this.intermediateDataSetId = checkNotNull(intermediateDataSetId);
         this.partitionType = checkNotNull(partitionType);
         this.resultPartitionStateSupplier = checkNotNull(resultPartitionStateSupplier);
-        this.consumerVertexGroup = consumerVertexGroup;
+        this.consumerVertexGroupSupplier = consumerVertexGroupSupplier;

Review comment:
       I noticed that `DefaultExecutionVertex` has the same problem. Would you add a hotfix commit to fix it along with `DefaultResultPartition`?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -58,6 +74,19 @@ public void testGetPartitionState() {
         }
     }
 
+    @Test
+    public void testGetConsumerVertexGroup() {
+
+        assertFalse(resultPartition.getConsumerVertexGroup().isPresent());
+
+        // test update consumers
+        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);

Review comment:
       NIT: executionVertexID -> executionVertexId

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -159,50 +162,79 @@ public static DefaultExecutionTopology fromExecutionGraph(
                                 topologicallySortedJobVertices)
                         .getAllPipelinedRegions();
 
-        ExecutionGraphIndex executionGraphIndex =
-                computeExecutionGraphIndex(
-                        executionGraph.getAllExecutionVertices(),
-                        logicalPipelinedRegions,
-                        edgeManager);
-
-        IndexedPipelinedRegions indexedPipelinedRegions =
-                computePipelinedRegions(
-                        executionGraphIndex.sortedExecutionVerticesInPipelinedRegion.keySet(),
-                        executionGraphIndex.sortedExecutionVerticesInPipelinedRegion::get,
-                        executionGraphIndex.executionVerticesById::get,
-                        executionGraphIndex.resultPartitionsById::get);
-
-        ensureCoLocatedVerticesInSameRegion(
-                indexedPipelinedRegions.pipelinedRegions, executionGraph);
-
-        return new DefaultExecutionTopology(
-                executionGraphIndex.executionVerticesById,
-                executionGraphIndex.executionVerticesList,
-                executionGraphIndex.resultPartitionsById,
-                indexedPipelinedRegions.pipelinedRegionsByVertex,
-                indexedPipelinedRegions.pipelinedRegions,
-                edgeManager);
-    }
-
-    private static ExecutionGraphIndex computeExecutionGraphIndex(
-            Iterable<ExecutionVertex> executionVertices,
-            Iterable<DefaultLogicalPipelinedRegion> logicalPipelinedRegions,
-            EdgeManager edgeManager) {
-        Map<ExecutionVertexID, DefaultExecutionVertex> executionVerticesById = new HashMap<>();
-        List<DefaultExecutionVertex> executionVerticesList = new ArrayList<>();
-        Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionsById =
-                new HashMap<>();
-        Map<DefaultLogicalPipelinedRegion, List<DefaultExecutionVertex>>
-                sortedExecutionVerticesInPipelinedRegion = new IdentityHashMap<>();
-
-        Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionByJobVertexId =
+        Map<JobVertexID, DefaultLogicalPipelinedRegion> logicalPipelinedRegionsByJobVertexId =
                 new HashMap<>();
         for (DefaultLogicalPipelinedRegion logicalPipelinedRegion : logicalPipelinedRegions) {
             for (LogicalVertex vertex : logicalPipelinedRegion.getVertices()) {
-                logicalPipelinedRegionByJobVertexId.put(vertex.getId(), logicalPipelinedRegion);
+                logicalPipelinedRegionsByJobVertexId.put(vertex.getId(), logicalPipelinedRegion);
             }
         }
 
+        return logicalPipelinedRegionsByJobVertexId;
+    }
+
+    public void notifyExecutionGraphUpdated(
+            final DefaultExecutionGraph executionGraph,
+            final List<ExecutionJobVertex> newJobVertices) {

Review comment:
       newJobVertices -> newlyInitializedJobVertices

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartition.java
##########
@@ -28,12 +28,13 @@
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Default implementation of {@link SchedulingResultPartition}. */
-class DefaultResultPartition implements SchedulingResultPartition {
+public class DefaultResultPartition implements SchedulingResultPartition {

Review comment:
       Why does it need to be public?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

