godfreyhe commented on a change in pull request #13625:
URL: https://github.com/apache/flink/pull/13625#discussion_r506903320



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala
##########
@@ -25,11 +25,4 @@ import org.apache.flink.table.planner.utils.Logging
 /**
   * Base class for batch ExecNode.
   */
-trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging {
-
-  /**
-    * Returns [[DamBehavior]] of this node.
-    */
-  def getDamBehavior: DamBehavior
-
-}
+trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] with Logging

Review comment:
       nit: remove unused imports

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecUnion.scala
##########
@@ -98,11 +98,11 @@ class BatchExecUnion(
 
   //~ ExecNode methods 
-----------------------------------------------------------
 
-  override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
-
   override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
     getInputs.map(_.asInstanceOf[ExecNode[BatchPlanner, _]])
 
+  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT, 
ExecEdge.DEFAULT)

Review comment:
      a union node may have more than two inputs, so the edge list should be built from the actual input count rather than hard-coding two entries

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/StreamExecNode.scala
##########
@@ -21,7 +21,19 @@ package org.apache.flink.table.planner.plan.nodes.exec
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.utils.Logging
 
+import java.util
+
 /**
   * Base class for stream ExecNode.
   */
-trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging
+trait StreamExecNode[T] extends ExecNode[StreamPlanner, T] with Logging {
+
+  def getInputEdges: util.List[ExecEdge] = {
+    // TODO fill out the required shuffle for each stream exec node
+    val edges = new util.ArrayList[ExecEdge]()
+    for (_ <- 0 until getInputNodes.size()) {
+      edges.add(ExecEdge.DEFAULT)
+    }
+    edges

Review comment:
       can be simplified as `getInputNodes.map(_ => ExecEdge.DEFAULT)`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends 
ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 
0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) 
else (1, 0)
+      val buildNode = node.getInputNodes.get(buildSideIndex)

Review comment:
      there is no build/probe concept here; please rename `buildSideIndex`/`buildNode` to something priority-based

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -320,20 +334,24 @@ class DeadlockBreakupProcessor extends DAGProcessor {
         // should exclude the reused node (at last position in path)
         while (!hasFullDamNode && idx < inputPath.length - 1) {
           val node = inputPath(idx)
-          val nodeDamBehavior = 
node.asInstanceOf[BatchExecNode[_]].getDamBehavior
-          hasFullDamNode = if (nodeDamBehavior == DamBehavior.FULL_DAM) {
+          val atLeastEndInput = node.getInputEdges.forall(
+            e => 
e.getDamBehavior.stricterOrEqual(ExecEdge.DamBehavior.END_INPUT))
+          hasFullDamNode = if (atLeastEndInput) {
             true
           } else {
-            node match {
-              case h: BatchExecHashJoin =>
-                val buildSideIndex = if (h.leftIsBuild) 0 else 1
-                val buildNode = h.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case n: BatchExecNestedLoopJoin =>
-                val buildSideIndex = if (n.leftIsBuild) 0 else 1
-                val buildNode = n.getInputNodes.get(buildSideIndex)
-                checkJoinBuildSide(buildNode, idx, inputPath)
-              case _ => false
+            val inputEdges = node.getInputEdges
+            if (inputEdges.size() == 2) {
+              val leftPriority = inputEdges.get(0).getPriority
+              val rightPriority = inputEdges.get(1).getPriority
+              if (leftPriority != rightPriority) {
+                val buildSideIndex = if (leftPriority == 0) 0 else 1

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -175,29 +186,32 @@ class DeadlockBreakupProcessor extends DAGProcessor {
               probeRel,
               distribution)
             e.setRequiredShuffleMode(ShuffleMode.BATCH)
-            // replace join node's input
-            join.replaceInputNode(probeSideIndex, e)
+            // replace node's input
+            
node.asInstanceOf[BatchExecNode[_]].replaceInputNode(probeSideIndex, e)
         }
       }
     }
 
     override def visit(node: ExecNode[_, _]): Unit = {
       super.visit(node)
-      node match {
-        case hashJoin: BatchExecHashJoin =>
-          val joinInfo = hashJoin.getJoinInfo
-          val columns = if (hashJoin.leftIsBuild) joinInfo.rightKeys else 
joinInfo.leftKeys
-          val distribution = FlinkRelDistribution.hash(columns)
-          rewriteJoin(hashJoin, hashJoin.leftIsBuild, distribution)
-        case nestedLoopJoin: BatchExecNestedLoopJoin =>
-          rewriteJoin(nestedLoopJoin, nestedLoopJoin.leftIsBuild, 
FlinkRelDistribution.ANY)
-        case _ => // do nothing
+      val inputEdges = node.getInputEdges
+      if (inputEdges.size() == 2) {
+        val leftPriority = inputEdges.get(0).getPriority
+        val rightPriority = inputEdges.get(1).getPriority
+        val requiredShuffle = if (leftPriority == 1) {

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/reuse/DeadlockBreakupProcessor.scala
##########
@@ -146,22 +146,33 @@ class DeadlockBreakupProcessor extends DAGProcessor {
 
   class DeadlockBreakupVisitor(finder: ReuseNodeFinder) extends 
ExecNodeVisitorImpl {
 
-    private def rewriteJoin(
-        join: BatchExecJoinBase,
-        leftIsBuild: Boolean,
-        distribution: FlinkRelDistribution): Unit = {
-      val (buildSideIndex, probeSideIndex) = if (leftIsBuild) (0, 1) else (1, 
0)
-      val buildNode = join.getInputNodes.get(buildSideIndex)
-      val probeNode = join.getInputNodes.get(probeSideIndex)
+    private def rewriteTwoInputNode(
+        node: ExecNode[_, _],
+        leftPriority: Int,
+        requiredShuffle: ExecEdge.RequiredShuffle): Unit = {
+      val (buildSideIndex, probeSideIndex) = if (leftPriority == 0) (0, 1) 
else (1, 0)

Review comment:
      is the highest priority always 0? do we have that guarantee?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to