lincoln-lil commented on code in PR #23001:
URL: https://github.com/apache/flink/pull/23001#discussion_r1266723489


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -343,4 +369,59 @@ object WindowUtil {
     }
   }
 
+  private def containsNeighbourWindowOperator(
+      agg: FlinkLogicalAggregate,
+      fmq: FlinkRelMetadataQuery): Boolean = {
+
+    def find(rel: RelNode, fmq: FlinkRelMetadataQuery): Unit = {
+      rel match {
+        case rss: RelSubset =>
+          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+          find(innerRel, fmq)
+
+        case scan: FlinkLogicalTableFunctionScan =>
+          if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+            throw new Util.FoundOne(scan)
+          }
+          find(scan.getInput(0), fmq)
+
+        case aggregate: FlinkLogicalAggregate =>
+          val winProperties = fmq.getRelWindowProperties(aggregate.getInput)
+          val groups = aggregate.getGroupSet
+          // window agg
+          if (WindowUtil.groupingContainsWindowStartEnd(groups, 
winProperties)) {
+            throw new Util.FoundOne(aggregate)
+          }
+          find(aggregate.getInput, fmq)
+
+        case rank: FlinkLogicalRank =>
+          val winProperties = fmq.getRelWindowProperties(rank.getInput)
+          val partitionKey = rank.partitionKey
+          // both window rank & deduplicate
+          if (WindowUtil.groupingContainsWindowStartEnd(partitionKey, 
winProperties)) {
+            throw new Util.FoundOne(rank)
+          }
+          find(rank.getInput, fmq)
+
+        case join: FlinkLogicalJoin =>
+          // window join
+          if (satisfyWindowJoin(join)) {
+            throw new Util.FoundOne(join)
+          }
+        // others joins can not propagate both window_start and window_end 
time attribute, so
+        // further traversal of child nodes is unnecessary

Review Comment:
   The comments here can't be automatically formatted with the proper 
indentation; they need to be moved to the front.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -343,4 +369,59 @@ object WindowUtil {
     }
   }
 
+  private def containsNeighbourWindowOperator(
+      agg: FlinkLogicalAggregate,
+      fmq: FlinkRelMetadataQuery): Boolean = {
+
+    def find(rel: RelNode, fmq: FlinkRelMetadataQuery): Unit = {
+      rel match {
+        case rss: RelSubset =>
+          val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+          find(innerRel, fmq)
+
+        case scan: FlinkLogicalTableFunctionScan =>
+          if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
+            throw new Util.FoundOne(scan)
+          }
+          find(scan.getInput(0), fmq)
+
+        case aggregate: FlinkLogicalAggregate =>
+          val winProperties = fmq.getRelWindowProperties(aggregate.getInput)
+          val groups = aggregate.getGroupSet
+          // window agg
+          if (WindowUtil.groupingContainsWindowStartEnd(groups, 
winProperties)) {
+            throw new Util.FoundOne(aggregate)

Review Comment:
   Good question! On second thought, this could be simplified further.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to