This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 63edfd6bf71 [FLINK-36950][table] Migrate WindowPropertiesRules to java
63edfd6bf71 is described below

commit 63edfd6bf7140c8be63cd76727784a875e1fbbe3
Author: Jacky Lau <liuyon...@gmail.com>
AuthorDate: Wed Jan 8 08:20:22 2025 +0800

    [FLINK-36950][table] Migrate WindowPropertiesRules to java
---
 .../plan/rules/logical/WindowPropertiesRules.java  | 413 +++++++++++++++++++++
 .../plan/rules/logical/WindowPropertiesRule.scala  | 306 ---------------
 2 files changed, 413 insertions(+), 306 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java
new file mode 100644
index 00000000000..74b2990382b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+import org.apache.flink.table.planner.plan.utils.AggregateUtil;
+import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
+import org.apache.flink.table.runtime.groupwindow.ProctimeAttribute;
+import org.apache.flink.table.runtime.groupwindow.RowtimeAttribute;
+import org.apache.flink.table.runtime.groupwindow.WindowEnd;
+import org.apache.flink.table.runtime.groupwindow.WindowStart;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+
+public class WindowPropertiesRules {
+
+    /**
+     * Shared {@link WindowPropertiesHavingRule} instance, created from that rule's {@code DEFAULT}
+     * configuration.
+     */
+    public static final WindowPropertiesHavingRule 
WINDOW_PROPERTIES_HAVING_RULE =
+            
WindowPropertiesHavingRule.WindowPropertiesHavingRuleConfig.DEFAULT.toRule();
+
+    /**
+     * Shared {@link WindowPropertiesRule} instance, created from that rule's {@code DEFAULT}
+     * configuration.
+     */
+    public static final WindowPropertiesRule WINDOW_PROPERTIES_RULE =
+            WindowPropertiesRule.WindowPropertiesRuleConfig.DEFAULT.toRule();
+
+    /**
+     * Rule that matches a {@link LogicalProject} on top of a {@link LogicalProject} on top of a
+     * {@link LogicalWindowAggregate} and replaces group auxiliary function calls in the outer
+     * projection by accesses to window properties (see {@link
+     * WindowPropertiesRules#convertWindowNodes}).
+     */
+    public static class WindowPropertiesRule
+            extends RelRule<WindowPropertiesRule.WindowPropertiesRuleConfig> {
+
+        protected WindowPropertiesRule(WindowPropertiesRule.WindowPropertiesRuleConfig config) {
+            super(config);
+        }
+
+        /** Matches only if the outer projection contains at least one group auxiliary function. */
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+            LogicalProject project = call.rel(0);
+            // project includes at least one group auxiliary function
+            return project.getProjects().stream()
+                    .anyMatch(WindowPropertiesRules::hasGroupAuxiliaries);
+        }
+
+        /**
+         * Converts the matched project / project / window aggregate (no filter involved) and
+         * registers the converted node as the transformation result.
+         */
+        @Override
+        public void onMatch(RelOptRuleCall call) {
+            LogicalProject project = call.rel(0);
+            LogicalProject innerProject = call.rel(1);
+            LogicalWindowAggregate agg = call.rel(2);
+
+            RelNode converted =
+                    convertWindowNodes(
+                            call.builder(), project, Optional.empty(), innerProject, agg);
+
+            call.transformTo(converted);
+        }
+
+        /** Rule configuration. */
+        @Value.Immutable(singleton = false)
+        public interface WindowPropertiesRuleConfig extends RelRule.Config {
+            // operands: LogicalProject -> LogicalProject -> LogicalWindowAggregate
+            WindowPropertiesRuleConfig DEFAULT =
+                    ImmutableWindowPropertiesRuleConfig.builder()
+                            .build()
+                            .withOperandSupplier(
+                                    b0 ->
+                                            b0.operand(LogicalProject.class)
+                                                    .oneInput(
+                                                            b1 ->
+                                                                    b1.operand(LogicalProject.class)
+                                                                            .oneInput(
+                                                                                    b2 ->
+                                                                                            b2.operand(
+                                                                                                            LogicalWindowAggregate
+                                                                                                                    .class)
+                                                                                                    .noInputs())))
+                            .withDescription("WindowPropertiesRule");
+
+            @Override
+            default WindowPropertiesRule toRule() {
+                return new WindowPropertiesRule(this);
+            }
+        }
+    }
+
+    /**
+     * Rule that matches a {@link LogicalProject} on top of a {@link LogicalFilter} on top of a
+     * {@link LogicalProject} on top of a {@link LogicalWindowAggregate} and replaces group
+     * auxiliary function calls in the outer projection and in the filter condition by accesses to
+     * window properties (see {@link WindowPropertiesRules#convertWindowNodes}).
+     */
+    public static class WindowPropertiesHavingRule
+            extends RelRule<WindowPropertiesHavingRule.WindowPropertiesHavingRuleConfig> {
+
+        protected WindowPropertiesHavingRule(WindowPropertiesHavingRuleConfig config) {
+            super(config);
+        }
+
+        /**
+         * Matches if the outer projection or the filter condition contains at least one group
+         * auxiliary function.
+         */
+        @Override
+        public boolean matches(RelOptRuleCall call) {
+            LogicalProject project = call.rel(0);
+            LogicalFilter filter = call.rel(1);
+
+            return project.getProjects().stream()
+                            .anyMatch(WindowPropertiesRules::hasGroupAuxiliaries)
+                    || WindowPropertiesRules.hasGroupAuxiliaries(filter.getCondition());
+        }
+
+        /**
+         * Converts the matched project / filter / project / window aggregate and registers the
+         * converted node as the transformation result.
+         */
+        @Override
+        public void onMatch(RelOptRuleCall call) {
+            LogicalProject project = call.rel(0);
+            LogicalFilter filter = call.rel(1);
+            LogicalProject innerProject = call.rel(2);
+            LogicalWindowAggregate agg = call.rel(3);
+
+            RelNode converted =
+                    WindowPropertiesRules.convertWindowNodes(
+                            call.builder(), project, Optional.of(filter), innerProject, agg);
+
+            call.transformTo(converted);
+        }
+
+        /** Rule configuration. */
+        @Value.Immutable(singleton = false)
+        public interface WindowPropertiesHavingRuleConfig extends RelRule.Config {
+            // operands: LogicalProject -> LogicalFilter -> LogicalProject -> LogicalWindowAggregate
+            WindowPropertiesHavingRuleConfig DEFAULT =
+                    ImmutableWindowPropertiesHavingRuleConfig.builder()
+                            .build()
+                            .withOperandSupplier(
+                                    b0 ->
+                                            b0.operand(LogicalProject.class)
+                                                    .oneInput(
+                                                            b1 ->
+                                                                    b1.operand(LogicalFilter.class)
+                                                                            .oneInput(
+                                                                                    b2 ->
+                                                                                            b2.operand(
+                                                                                                            LogicalProject
+                                                                                                                    .class)
+                                                                                                    .oneInput(
+                                                                                                            b3 ->
+                                                                                                                    b3.operand(
+                                                                                                                                    LogicalWindowAggregate
+                                                                                                                                            .class)
+                                                                                                                            .noInputs()))))
+                            .withDescription("WindowPropertiesHavingRule");
+
+            @Override
+            default WindowPropertiesHavingRule toRule() {
+                return new WindowPropertiesHavingRule(this);
+            }
+        }
+    }
+
+    /**
+     * Rebuilds the matched nodes so that group auxiliary function calls are replaced by accesses
+     * to window properties produced by the window aggregate.
+     *
+     * <p>The steps are order dependent because they all operate on the same {@code builder}: a
+     * copy of {@code agg} carrying the window properties is pushed, the expressions of {@code
+     * innerProject} plus one field per window property are projected, the optional filter is
+     * re-added with its condition rewritten, and finally the expressions of {@code project} are
+     * rewritten and projected under the field names of {@code project}.
+     *
+     * @param builder builder used to assemble the new nodes
+     * @param project outer projection whose expressions are rewritten
+     * @param filter optional filter whose condition is rewritten
+     * @param innerProject projection directly on top of the window aggregate
+     * @param agg window aggregate that is copied with the window properties
+     * @return the node returned by {@code builder.build()}
+     * @throws TableException if the window type cannot be determined
+     */
+    public static RelNode convertWindowNodes(
+            RelBuilder builder,
+            LogicalProject project,
+            Optional<LogicalFilter> filter,
+            LogicalProject innerProject,
+            LogicalWindowAggregate agg) {
+        LogicalWindow w = agg.getWindow();
+        WindowType windowType = getWindowType(w);
+
+        NamedWindowProperty startProperty =
+                new NamedWindowProperty(
+                        propertyName(w, "start"), new 
WindowStart(w.aliasAttribute()));
+        NamedWindowProperty endProperty =
+                new NamedWindowProperty(propertyName(w, "end"), new 
WindowEnd(w.aliasAttribute()));
+        List<NamedWindowProperty> startEndProperties = List.of(startProperty, 
endProperty);
+
+        // allow rowtime/proctime for rowtime windows and proctime for proctime windows
+        // NOTE(review): the initial ArrayList is never used; every switch branch below either
+        // reassigns timeProperties or throws.
+        List<NamedWindowProperty> timeProperties = new ArrayList<>();
+        switch (windowType) {
+            case STREAM_ROWTIME:
+                timeProperties =
+                        List.of(
+                                new NamedWindowProperty(
+                                        propertyName(w, "rowtime"),
+                                        new 
RowtimeAttribute(w.aliasAttribute())),
+                                new NamedWindowProperty(
+                                        propertyName(w, "proctime"),
+                                        new 
ProctimeAttribute(w.aliasAttribute())));
+                break;
+            case STREAM_PROCTIME:
+                timeProperties =
+                        List.of(
+                                new NamedWindowProperty(
+                                        propertyName(w, "proctime"),
+                                        new 
ProctimeAttribute(w.aliasAttribute())));
+                break;
+            case BATCH_ROWTIME:
+                timeProperties =
+                        List.of(
+                                new NamedWindowProperty(
+                                        propertyName(w, "rowtime"),
+                                        new 
RowtimeAttribute(w.aliasAttribute())));
+                break;
+            default:
+                throw new TableException(
+                        "Unknown window type encountered. Please report this 
bug.");
+        }
+
+        // start/end properties first, followed by the time properties
+        List<NamedWindowProperty> properties =
+                Lists.newArrayList(Iterables.concat(startEndProperties, 
timeProperties));
+
+        // retrieve the window properties (start, end and time attributes)
+        builder.push(agg.copy(properties));
+
+        // forward the window properties next to the inner projection's expressions
+        List<RexNode> projectNodes =
+                Stream.concat(
+                                innerProject.getProjects().stream(),
+                                properties.stream().map(np -> 
builder.field(np.getName())))
+                        .collect(Collectors.toList());
+        builder.project(projectNodes);
+
+        // replace window auxiliary functions in filter by access to window properties
+        filter.ifPresent(
+                f -> builder.filter(replaceGroupAuxiliaries(f.getCondition(), 
w, builder)));
+
+        // replace window auxiliary functions in projection by access to window properties
+        List<RexNode> finalProjects =
+                project.getProjects().stream()
+                        .map(expr -> replaceGroupAuxiliaries(expr, w, builder))
+                        .collect(Collectors.toList());
+        builder.project(finalProjects, project.getRowType().getFieldNames());
+
+        return builder.build();
+    }
+
+    /**
+     * Classifies a window by its time attribute.
+     *
+     * <p>The checks run in this order: rowtime attribute, proctime attribute, and finally a time
+     * attribute whose logical type is {@code TIMESTAMP_WITHOUT_TIME_ZONE}.
+     *
+     * @throws TableException if none of the checks applies
+     */
+    private static WindowType getWindowType(LogicalWindow window) {
+        if (AggregateUtil.isRowtimeAttribute(window.timeAttribute())) {
+            return WindowType.STREAM_ROWTIME;
+        }
+        if (AggregateUtil.isProctimeAttribute(window.timeAttribute())) {
+            return WindowType.STREAM_PROCTIME;
+        }
+
+        boolean plainTimestamp =
+                window.timeAttribute()
+                        .getOutputDataType()
+                        .getLogicalType()
+                        .is(TIMESTAMP_WITHOUT_TIME_ZONE);
+        if (!plainTimestamp) {
+            throw new TableException("Unknown window type encountered. Please report this bug.");
+        }
+        return WindowType.BATCH_ROWTIME;
+    }
+
+    /** Builds a window property name by appending {@code name} to the window's alias name. */
+    private static String propertyName(LogicalWindow window, String name) {
+        String aliasName = window.aliasAttribute().getName();
+        return aliasName + name;
+    }
+
+    /** Replace group auxiliaries with field references. */
+    public static RexNode replaceGroupAuxiliaries(
+            RexNode node, LogicalWindow window, RelBuilder builder) {
+        RexBuilder rexBuilder = builder.getRexBuilder();
+        WindowType windowType = getWindowType(window);
+
+        if (node instanceof RexCall) {
+            RexCall c = (RexCall) node;
+            if (isWindowStart(c)) {
+                return rexBuilder.makeCast(
+                        c.getType(), builder.field(propertyName(window, 
"start")), false);
+            } else if (isWindowEnd(c)) {
+                return rexBuilder.makeCast(
+                        c.getType(), builder.field(propertyName(window, 
"end")), false);
+            } else if (isWindowRowtime(c)) {
+                switch (windowType) {
+                    case STREAM_ROWTIME:
+                    case BATCH_ROWTIME:
+                        // replace expression by access to window rowtime
+                        return rexBuilder.makeCast(
+                                c.getType(), 
builder.field(propertyName(window, "rowtime")), false);
+                    case STREAM_PROCTIME:
+                        throw new ValidationException(
+                                "A proctime window cannot provide a rowtime 
attribute.");
+                    default:
+                        throw new TableException(
+                                "Unknown window type encountered. Please 
report this bug.");
+                }
+
+            } else if (isWindowProctime(c)) {
+                switch (windowType) {
+                    case STREAM_PROCTIME:
+                    case STREAM_ROWTIME:
+                        // replace expression by access to window proctime
+                        return rexBuilder.makeCast(
+                                c.getType(),
+                                builder.field(propertyName(window, 
"proctime")),
+                                false);
+                    case BATCH_ROWTIME:
+                        throw new ValidationException(
+                                "PROCTIME window property is not supported in 
batch queries.");
+                    default:
+                        throw new TableException(
+                                "Unknown window type encountered. Please 
report this bug.");
+                }
+
+            } else {
+                List<RexNode> newOps =
+                        c.getOperands().stream()
+                                .map(op -> replaceGroupAuxiliaries(op, window, 
builder))
+                                .collect(Collectors.toList());
+                return c.clone(c.getType(), newOps);
+            }
+        } else {
+            // preserve expression
+            return node;
+        }
+    }
+
+    /** Returns whether the node is a call to the TUMBLE, HOP or SESSION start auxiliary. */
+    private static boolean isWindowStart(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        if (!call.getOperator().isGroupAuxiliary()) {
+            return false;
+        }
+        return call.getOperator() == FlinkSqlOperatorTable.TUMBLE_START
+                || call.getOperator() == FlinkSqlOperatorTable.HOP_START
+                || call.getOperator() == FlinkSqlOperatorTable.SESSION_START;
+    }
+
+    /** Returns whether the node is a call to the TUMBLE, HOP or SESSION end auxiliary. */
+    private static boolean isWindowEnd(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        if (!call.getOperator().isGroupAuxiliary()) {
+            return false;
+        }
+        return call.getOperator() == FlinkSqlOperatorTable.TUMBLE_END
+                || call.getOperator() == FlinkSqlOperatorTable.HOP_END
+                || call.getOperator() == FlinkSqlOperatorTable.SESSION_END;
+    }
+
+    /** Returns whether the node is a call to the TUMBLE, HOP or SESSION rowtime auxiliary. */
+    private static boolean isWindowRowtime(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        if (!call.getOperator().isGroupAuxiliary()) {
+            return false;
+        }
+        return call.getOperator() == FlinkSqlOperatorTable.TUMBLE_ROWTIME
+                || call.getOperator() == FlinkSqlOperatorTable.HOP_ROWTIME
+                || call.getOperator() == FlinkSqlOperatorTable.SESSION_ROWTIME;
+    }
+
+    /** Returns whether the node is a call to the TUMBLE, HOP or SESSION proctime auxiliary. */
+    private static boolean isWindowProctime(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        if (!call.getOperator().isGroupAuxiliary()) {
+            return false;
+        }
+        return call.getOperator() == FlinkSqlOperatorTable.TUMBLE_PROCTIME
+                || call.getOperator() == FlinkSqlOperatorTable.HOP_PROCTIME
+                || call.getOperator() == FlinkSqlOperatorTable.SESSION_PROCTIME;
+    }
+
+    /**
+     * Returns whether the expression is, or contains in any of its operands, a call whose operator
+     * is a group auxiliary function.
+     */
+    public static boolean hasGroupAuxiliaries(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        return call.getOperator().isGroupAuxiliary()
+                || call.getOperands().stream()
+                        .anyMatch(WindowPropertiesRules::hasGroupAuxiliaries);
+    }
+
+    /**
+     * Returns whether the expression is, or contains in any of its operands, a call whose operator
+     * is a group function.
+     */
+    public static boolean hasGroupFunction(RexNode node) {
+        if (!(node instanceof RexCall)) {
+            return false;
+        }
+        RexCall call = (RexCall) node;
+        return call.getOperator().isGroup()
+                || call.getOperands().stream()
+                        .anyMatch(WindowPropertiesRules::hasGroupFunction);
+    }
+
+    /** Kind of window, derived from the window's time attribute by {@code getWindowType}. */
+    enum WindowType {
+        // time attribute is a rowtime attribute (AggregateUtil.isRowtimeAttribute)
+        STREAM_ROWTIME,
+        // time attribute is a proctime attribute (AggregateUtil.isProctimeAttribute)
+        STREAM_PROCTIME,
+        // neither of the above; logical type is TIMESTAMP_WITHOUT_TIME_ZONE
+        BATCH_ROWTIME
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
deleted file mode 100644
index 60d00ab49d3..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate
-import org.apache.flink.table.planner.plan.utils.AggregateUtil
-import org.apache.flink.table.runtime.groupwindow._
-import 
org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.tools.RelBuilder
-
-import scala.collection.JavaConversions._
-
-class WindowPropertiesRule
-  extends RelOptRule(
-    operand(
-      classOf[LogicalProject],
-      operand(classOf[LogicalProject], 
operand(classOf[LogicalWindowAggregate], none()))),
-    "WindowPropertiesRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val project: LogicalProject = call.rel(0)
-    // project includes at least one group auxiliary function
-    project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val project: LogicalProject = call.rel(0)
-    val innerProject: LogicalProject = call.rel(1)
-    val agg: LogicalWindowAggregate = call.rel(2)
-
-    val converted =
-      WindowPropertiesRules.convertWindowNodes(call.builder(), project, None, 
innerProject, agg)
-
-    call.transformTo(converted)
-  }
-}
-
-class WindowPropertiesHavingRule
-  extends RelOptRule(
-    RelOptRule.operand(
-      classOf[LogicalProject],
-      RelOptRule.operand(
-        classOf[LogicalFilter],
-        RelOptRule.operand(
-          classOf[LogicalProject],
-          RelOptRule.operand(classOf[LogicalWindowAggregate], 
RelOptRule.none())))
-    ),
-    "WindowPropertiesHavingRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val project: LogicalProject = call.rel(0)
-    val filter: LogicalFilter = call.rel(1)
-
-    project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries) ||
-    WindowPropertiesRules.hasGroupAuxiliaries(filter.getCondition)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val project: LogicalProject = call.rel(0)
-    val filter: LogicalFilter = call.rel(1)
-    val innerProject: LogicalProject = call.rel(2)
-    val agg: LogicalWindowAggregate = call.rel(3)
-
-    val converted = WindowPropertiesRules.convertWindowNodes(
-      call.builder(),
-      project,
-      Some(filter),
-      innerProject,
-      agg)
-
-    call.transformTo(converted)
-  }
-}
-
-object WindowPropertiesRules {
-
-  val WINDOW_PROPERTIES_HAVING_RULE = new WindowPropertiesHavingRule
-
-  val WINDOW_PROPERTIES_RULE = new WindowPropertiesRule
-
-  def convertWindowNodes(
-      builder: RelBuilder,
-      project: LogicalProject,
-      filter: Option[LogicalFilter],
-      innerProject: LogicalProject,
-      agg: LogicalWindowAggregate): RelNode = {
-    val w = agg.getWindow
-    val windowType = getWindowType(w)
-
-    val startEndProperties = Seq(
-      new NamedWindowProperty(propertyName(w, "start"), new 
WindowStart(w.aliasAttribute)),
-      new NamedWindowProperty(propertyName(w, "end"), new 
WindowEnd(w.aliasAttribute))
-    )
-
-    // allow rowtime/proctime for rowtime windows and proctime for proctime 
windows
-    val timeProperties = windowType match {
-      case 'streamRowtime =>
-        Seq(
-          new NamedWindowProperty(
-            propertyName(w, "rowtime"),
-            new RowtimeAttribute(w.aliasAttribute)),
-          new NamedWindowProperty(
-            propertyName(w, "proctime"),
-            new ProctimeAttribute(w.aliasAttribute))
-        )
-      case 'streamProctime =>
-        Seq(
-          new NamedWindowProperty(
-            propertyName(w, "proctime"),
-            new ProctimeAttribute(w.aliasAttribute)))
-      case 'batchRowtime =>
-        Seq(
-          new NamedWindowProperty(
-            propertyName(w, "rowtime"),
-            new RowtimeAttribute(w.aliasAttribute)))
-      case _ =>
-        throw new TableException("Unknown window type encountered. Please 
report this bug.")
-    }
-
-    val properties = startEndProperties ++ timeProperties
-
-    // retrieve window start and end properties
-    builder.push(agg.copy(properties))
-
-    // forward window start and end properties
-    builder.project(innerProject.getProjects ++ properties.map(np => 
builder.field(np.getName)))
-
-    // replace window auxiliary function in filter by access to window 
properties
-    filter.foreach(f => builder.filter(replaceGroupAuxiliaries(f.getCondition, 
w, builder)))
-
-    // replace window auxiliary unctions in projection by access to window 
properties
-    builder.project(
-      project.getProjects.map(expr => replaceGroupAuxiliaries(expr, w, 
builder)),
-      project.getRowType.getFieldNames
-    )
-
-    builder.build()
-  }
-
-  private def getWindowType(window: LogicalWindow): Symbol = {
-    if (AggregateUtil.isRowtimeAttribute(window.timeAttribute)) {
-      'streamRowtime
-    } else if (AggregateUtil.isProctimeAttribute(window.timeAttribute)) {
-      'streamProctime
-    } else if (
-      
window.timeAttribute.getOutputDataType.getLogicalType.is(TIMESTAMP_WITHOUT_TIME_ZONE)
-    ) {
-      'batchRowtime
-    } else {
-      throw new TableException("Unknown window type encountered. Please report 
this bug.")
-    }
-  }
-
-  /** Generates a property name for a window. */
-  private def propertyName(window: LogicalWindow, name: String): String =
-    window.aliasAttribute.asInstanceOf[WindowReference].getName + name
-
-  /** Replace group auxiliaries with field references. */
-  def replaceGroupAuxiliaries(
-      node: RexNode,
-      window: LogicalWindow,
-      builder: RelBuilder): RexNode = {
-    val rexBuilder = builder.getRexBuilder
-    val windowType = getWindowType(window)
-
-    node match {
-      case c: RexCall if isWindowStart(c) =>
-        // replace expression by access to window start
-        rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"start")), false)
-
-      case c: RexCall if isWindowEnd(c) =>
-        // replace expression by access to window end
-        rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"end")), false)
-
-      case c: RexCall if isWindowRowtime(c) =>
-        windowType match {
-          case 'streamRowtime | 'batchRowtime =>
-            // replace expression by access to window rowtime
-            rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"rowtime")), false)
-          case 'streamProctime =>
-            throw new ValidationException("A proctime window cannot provide a 
rowtime attribute.")
-          case _ =>
-            throw new TableException("Unknown window type encountered. Please 
report this bug.")
-        }
-
-      case c: RexCall if isWindowProctime(c) =>
-        windowType match {
-          case 'streamProctime | 'streamRowtime =>
-            // replace expression by access to window proctime
-            rexBuilder.makeCast(c.getType, builder.field(propertyName(window, 
"proctime")), false)
-          case 'batchRowtime =>
-            throw new ValidationException(
-              "PROCTIME window property is not supported in batch queries.")
-          case _ =>
-            throw new TableException("Unknown window type encountered. Please 
report this bug.")
-        }
-
-      case c: RexCall =>
-        // replace expressions in children
-        val newOps = c.getOperands.map(replaceGroupAuxiliaries(_, window, 
builder))
-        c.clone(c.getType, newOps)
-
-      case x =>
-        // preserve expression
-        x
-    }
-  }
-
-  /** Checks if a RexNode is a window start auxiliary function. */
-  private def isWindowStart(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case FlinkSqlOperatorTable.TUMBLE_START | 
FlinkSqlOperatorTable.HOP_START |
-              FlinkSqlOperatorTable.SESSION_START =>
-            true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-
-  /** Checks if a RexNode is a window end auxiliary function. */
-  private def isWindowEnd(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case FlinkSqlOperatorTable.TUMBLE_END | 
FlinkSqlOperatorTable.HOP_END |
-              FlinkSqlOperatorTable.SESSION_END =>
-            true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-
-  /** Checks if a RexNode is a window rowtime auxiliary function. */
-  private def isWindowRowtime(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case FlinkSqlOperatorTable.TUMBLE_ROWTIME | 
FlinkSqlOperatorTable.HOP_ROWTIME |
-              FlinkSqlOperatorTable.SESSION_ROWTIME =>
-            true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-
-  /** Checks if a RexNode is a window proctime auxiliary function. */
-  private def isWindowProctime(node: RexNode): Boolean = {
-    node match {
-      case n: RexCall if n.getOperator.isGroupAuxiliary =>
-        n.getOperator match {
-          case FlinkSqlOperatorTable.TUMBLE_PROCTIME | 
FlinkSqlOperatorTable.HOP_PROCTIME |
-              FlinkSqlOperatorTable.SESSION_PROCTIME =>
-            true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
-
-  def hasGroupAuxiliaries(node: RexNode): Boolean = {
-    node match {
-      case c: RexCall if c.getOperator.isGroupAuxiliary => true
-      case c: RexCall =>
-        c.operands.exists(hasGroupAuxiliaries)
-      case _ => false
-    }
-  }
-
-  def hasGroupFunction(node: RexNode): Boolean = {
-    node match {
-      case c: RexCall if c.getOperator.isGroup => true
-      case c: RexCall => c.operands.exists(hasGroupFunction)
-      case _ => false
-    }
-  }
-
-}

Reply via email to