zhuzhurk commented on a change in pull request #16688:
URL: https://github.com/apache/flink/pull/16688#discussion_r685733608



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, resultPartitionRetriever));

Review comment:
       `(vertex)` -> `vertex`
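
   For example, the same lambda without the parentheses (a pure style change, no behavior difference):

```java
// before
(vertex) -> getReconnectableResults(vertex, resultPartitionRetriever)
// after
vertex -> getReconnectableResults(vertex, resultPartitionRetriever)
```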

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions based on <a
+     * href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm">
+     * Tarjan's strongly connected components algorithm</a>. For more details please see <a
+     * href="https://issues.apache.org/jira/browse/FLINK-17330">FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions inside one
+                            // LogicalPipelinedRegion, if any vertex of the ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, we can just skip the
+                            // remaining vertices in this group. This can decrease the computation
+                            // complexity.
+                            if (!vertexToRegion.containsKey(consumerVertex)) {
+                                break;
+                            }
+                            if (!currentRegion.contains(consumerVertex)) {
+                                currentRegionOutEdges.add(
+                                        regionIndices.get(vertexToRegion.get(consumerVertex)));
+                            }
+                        }
+                    }
+                }
+            }
+            outEdges.add(currentRegionOutEdges);
+        }
+
+        return outEdges;
+    }
+
+    private static Iterable<SchedulingResultPartition> getReconnectableResults(

Review comment:
       getReconnectableResults -> getConsumedReconnectableResults
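
   For clarity, a sketch of how the renamed declaration would read (the parameter list is inferred from the call site in `computePipelinedRegions` above, not from the quoted hunk):

```java
private static Iterable<SchedulingResultPartition> getConsumedReconnectableResults(
        final SchedulingExecutionVertex vertex,
        final Function<IntermediateResultPartitionID, ? extends SchedulingResultPartition>
                resultPartitionRetriever) {
    // body unchanged; only the name is different
}
```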

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
##########
@@ -22,35 +22,20 @@
 import org.apache.flink.runtime.topology.Result;
 import org.apache.flink.runtime.topology.Vertex;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.function.Function;
 
-/** Utility for computing pipelined regions. */
+/** Common utils for computing pipelined regions. */
 public final class PipelinedRegionComputeUtil {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);
-
-    public static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
-            Set<Set<V>> computePipelinedRegions(
-                    final Iterable<? extends V> topologicallySortedVertexes) {
-        final Map<V, Set<V>> vertexToRegion = buildRawRegions(topologicallySortedVertexes);
-        return mergeRegionsOnCycles(vertexToRegion);
-    }
-
-    private static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
+    static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
             Map<V, Set<V>> buildRawRegions(
-                    final Iterable<? extends V> topologicallySortedVertexes) {
+                    final Iterable<? extends V> topologicallySortedVertexes,

Review comment:
       topologicallySortedVertexes -> topologicallySortedVertices

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java
##########
@@ -27,7 +28,7 @@
 * intermediate result partition to a job vertex. An edge is parametrized with its {@link
  * DistributionPattern}.
  */
-public class JobEdge implements java.io.Serializable {
+public class JobEdge implements LogicalEdge, java.io.Serializable {

Review comment:
       Let's not change the inheritance of JobEdge, to avoid potential compatibility issues.
   We can introduce a `DefaultLogicalEdge` which is created from a `JobEdge`.
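
   A rough sketch of that idea (hypothetical code: the exact methods `LogicalEdge` declares are an assumption here; `JobEdge#getDistributionPattern()` is an existing accessor):

```java
/** Adapter that exposes a JobEdge as a LogicalEdge without touching JobEdge's type hierarchy. */
public class DefaultLogicalEdge implements LogicalEdge {

    private final DistributionPattern distributionPattern;

    DefaultLogicalEdge(JobEdge jobEdge) {
        // Copy only what the logical topology needs, so the serialized
        // form of JobEdge stays exactly as it is today.
        this.distributionPattern = jobEdge.getDistributionPattern();
    }

    @Override
    public DistributionPattern getDistributionPattern() {
        return distributionPattern;
    }
}
```

   This way `JobEdge implements java.io.Serializable` stays untouched, and only the new topology code depends on `LogicalEdge`.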

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/LogicalPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.topology.LogicalPipelinedRegion;
+import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
+import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+
+/** Utils for computing {@link LogicalPipelinedRegion}s. */
+public final class LogicalPipelinedRegionComputeUtil {
+
+    public static Set<Set<LogicalVertex>> computePipelinedRegions(
+            final Iterable<? extends LogicalVertex> topologicallySortedVertices) {
+
+        final Map<LogicalVertex, Set<LogicalVertex>> vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        LogicalPipelinedRegionComputeUtil::getReconnectableResults);
+
+        // Since LogicalTopology is a DAG, it cannot be cyclic. Thus, we don't need to use Tarjan's
+        // strongly connected components algorithm here.

Review comment:
       I prefer to simplify the explanation to "Since LogicalTopology is a DAG, there is no need to do cycle detection and merge regions on cycles."
   
   This is easier to understand, and there is no need to mention Tarjan's strongly connected components algorithm, which is just the cycle detection algorithm chosen in the implementation.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
##########
@@ -60,39 +45,27 @@
             currentRegion.add(vertex);
             vertexToRegion.put(vertex, currentRegion);
 
-            for (R consumedResult : vertex.getConsumedResults()) {
+            for (R consumedResult : getReconnectableResults.apply(vertex)) {
+                // Similar to the BLOCKING ResultPartitionType, each vertex connected through

Review comment:
       I think this explanation should be moved right above the for loop.
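
   Something like this (placement sketch only; the comment text is truncated in the quoted hunk):

```java
// Similar to the BLOCKING ResultPartitionType, each vertex connected through ...
// (rest of the original explanation)
for (R consumedResult : getReconnectableResults.apply(vertex)) {
    // per-result handling as in the diff above
}
```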

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/SchedulingPipelinedRegionComputeUtil.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.buildRawRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.mergeRegions;
+import static org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil.uniqueRegions;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Utils for computing {@link SchedulingPipelinedRegion}s. */
+public final class SchedulingPipelinedRegionComputeUtil {
+
+    public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
+            final Iterable<? extends SchedulingExecutionVertex> topologicallySortedVertices,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever,
+            final Function<IntermediateResultPartitionID, ? extends SchedulingResultPartition>
+                    resultPartitionRetriever) {
+
+        final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion =
+                buildRawRegions(
+                        topologicallySortedVertices,
+                        (vertex) -> getReconnectableResults(vertex, resultPartitionRetriever));
+
+        return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
+    }
+
+    /**
+     * Merge the regions based on <a
+     * href="https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm">
+     * Tarjan's strongly connected components algorithm</a>. For more details please see <a
+     * href="https://issues.apache.org/jira/browse/FLINK-17330">FLINK-17330</a>.
+     */
+    private static Set<Set<SchedulingExecutionVertex>> mergeRegionsOnCycles(
+            final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final List<Set<SchedulingExecutionVertex>> regionList =
+                new ArrayList<>(uniqueRegions(vertexToRegion));
+        final List<List<Integer>> outEdges =
+                buildOutEdgesDesc(vertexToRegion, regionList, executionVertexRetriever);
+        final Set<Set<Integer>> sccs =
+                StronglyConnectedComponentsComputeUtils.computeStronglyConnectedComponents(
+                        outEdges.size(), outEdges);
+
+        final Set<Set<SchedulingExecutionVertex>> mergedRegions =
+                Collections.newSetFromMap(new IdentityHashMap<>());
+        for (Set<Integer> scc : sccs) {
+            checkState(scc.size() > 0);
+
+            Set<SchedulingExecutionVertex> mergedRegion = new HashSet<>();
+            for (int regionIndex : scc) {
+                mergedRegion =
+                        mergeRegions(mergedRegion, regionList.get(regionIndex), vertexToRegion);
+            }
+            mergedRegions.add(mergedRegion);
+        }
+
+        return mergedRegions;
+    }
+
+    private static List<List<Integer>> buildOutEdgesDesc(
+            final Map<SchedulingExecutionVertex, Set<SchedulingExecutionVertex>> vertexToRegion,
+            final List<Set<SchedulingExecutionVertex>> regionList,
+            final Function<ExecutionVertexID, ? extends SchedulingExecutionVertex>
+                    executionVertexRetriever) {
+
+        final Map<Set<SchedulingExecutionVertex>, Integer> regionIndices = new IdentityHashMap<>();
+        for (int i = 0; i < regionList.size(); i++) {
+            regionIndices.put(regionList.get(i), i);
+        }
+
+        final List<List<Integer>> outEdges = new ArrayList<>(regionList.size());
+        for (Set<SchedulingExecutionVertex> currentRegion : regionList) {
+            final List<Integer> currentRegionOutEdges = new ArrayList<>();
+            for (SchedulingExecutionVertex vertex : currentRegion) {
+                for (SchedulingResultPartition producedResult : vertex.getProducedResults()) {
+                    if (producedResult.getResultType().isPipelined()) {
+                        continue;
+                    }
+                    for (ConsumerVertexGroup consumerVertexGroup :
+                            producedResult.getConsumerVertexGroups()) {
+                        for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                            SchedulingExecutionVertex consumerVertex =
+                                    executionVertexRetriever.apply(consumerVertexId);
+                            // Since we are merging SchedulingPipelinedRegions inside one
+                            // LogicalPipelinedRegion, if any vertex of the ConsumerVertexGroup
+                            // doesn't belong to this LogicalPipelinedRegion, we can just skip the
+                            // remaining vertices in this group. This can decrease the computation
+                            // complexity.

Review comment:
       This documentation is hard to understand because it introduces some assumptions from outside this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

