This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 63edfd6bf71 [FLINK-36950][table] Migrate WindowPropertiesRules to java 63edfd6bf71 is described below commit 63edfd6bf7140c8be63cd76727784a875e1fbbe3 Author: Jacky Lau <liuyon...@gmail.com> AuthorDate: Wed Jan 8 08:20:22 2025 +0800 [FLINK-36950][table] Migrate WindowPropertiesRules to java --- .../plan/rules/logical/WindowPropertiesRules.java | 413 +++++++++++++++++++++ .../plan/rules/logical/WindowPropertiesRule.scala | 306 --------------- 2 files changed, 413 insertions(+), 306 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java new file mode 100644 index 00000000000..74b2990382b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRules.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate; +import org.apache.flink.table.planner.plan.utils.AggregateUtil; +import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; +import org.apache.flink.table.runtime.groupwindow.ProctimeAttribute; +import org.apache.flink.table.runtime.groupwindow.RowtimeAttribute; +import org.apache.flink.table.runtime.groupwindow.WindowEnd; +import org.apache.flink.table.runtime.groupwindow.WindowStart; + +import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilder; +import org.immutables.value.Value; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; + +public class WindowPropertiesRules { + + public static final WindowPropertiesHavingRule WINDOW_PROPERTIES_HAVING_RULE = + WindowPropertiesHavingRule.WindowPropertiesHavingRuleConfig.DEFAULT.toRule(); + + public static final WindowPropertiesRule WINDOW_PROPERTIES_RULE = + WindowPropertiesRule.WindowPropertiesRuleConfig.DEFAULT.toRule(); + + public static class WindowPropertiesRule + extends RelRule<WindowPropertiesRule.WindowPropertiesRuleConfig> { + + protected WindowPropertiesRule(WindowPropertiesRule.WindowPropertiesRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + // project includes at least one group auxiliary function + return project.getProjects().stream() + .anyMatch(WindowPropertiesRules::hasGroupAuxiliaries); + } + + public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalProject innerProject = call.rel(1); + LogicalWindowAggregate agg = call.rel(2); + + RelNode converted = + convertWindowNodes( + call.builder(), project, Optional.empty(), innerProject, agg); + + call.transformTo(converted); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface WindowPropertiesRuleConfig extends RelRule.Config { + WindowPropertiesRuleConfig DEFAULT = + ImmutableWindowPropertiesRuleConfig.builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(LogicalProject.class) + .oneInput( + b1 -> + b1.operand(LogicalProject.class) + .oneInput( + b2 -> + b2.operand( + LogicalWindowAggregate + .class) + .noInputs()))) + .withDescription("WindowPropertiesRule"); + + @Override + default WindowPropertiesRule toRule() { + return new WindowPropertiesRule(this); + } + } + } + + public static class WindowPropertiesHavingRule + extends RelRule<WindowPropertiesHavingRule.WindowPropertiesHavingRuleConfig> { + + protected WindowPropertiesHavingRule(WindowPropertiesHavingRuleConfig config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalFilter filter = call.rel(1); + + return project.getProjects().stream() + .anyMatch(WindowPropertiesRules::hasGroupAuxiliaries) + || WindowPropertiesRules.hasGroupAuxiliaries(filter.getCondition()); + } + + public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalFilter filter = call.rel(1); + LogicalProject innerProject = call.rel(2); + LogicalWindowAggregate agg = call.rel(3); + + RelNode converted = + WindowPropertiesRules.convertWindowNodes( + call.builder(), project, Optional.of(filter), innerProject, agg); + + call.transformTo(converted); + } + + /** Rule configuration. */ + @Value.Immutable(singleton = false) + public interface WindowPropertiesHavingRuleConfig extends RelRule.Config { + WindowPropertiesHavingRuleConfig DEFAULT = + ImmutableWindowPropertiesHavingRuleConfig.builder() + .build() + .withOperandSupplier( + b0 -> + b0.operand(LogicalProject.class) + .oneInput( + b1 -> + b1.operand(LogicalFilter.class) + .oneInput( + b2 -> + b2.operand( + LogicalProject + .class) + .oneInput( + b3 -> + b3.operand( + LogicalWindowAggregate + .class) + .noInputs())))) + .withDescription("WindowPropertiesHavingRule"); + + @Override + default WindowPropertiesHavingRule toRule() { + return new WindowPropertiesHavingRule(this); + } + } + } + + public static RelNode convertWindowNodes( + RelBuilder builder, + LogicalProject project, + Optional<LogicalFilter> filter, + LogicalProject innerProject, + LogicalWindowAggregate agg) { + LogicalWindow w = agg.getWindow(); + WindowType windowType = getWindowType(w); + + NamedWindowProperty startProperty = + new NamedWindowProperty( + propertyName(w, "start"), new WindowStart(w.aliasAttribute())); + NamedWindowProperty endProperty = + new NamedWindowProperty(propertyName(w, "end"), new WindowEnd(w.aliasAttribute())); + List<NamedWindowProperty> startEndProperties = List.of(startProperty, endProperty); + + // allow rowtime/proctime for rowtime windows and proctime for proctime windows + List<NamedWindowProperty> timeProperties = new ArrayList<>(); + switch (windowType) { + case STREAM_ROWTIME: + timeProperties = + List.of( + new NamedWindowProperty( + propertyName(w, "rowtime"), + new RowtimeAttribute(w.aliasAttribute())), + new NamedWindowProperty( + propertyName(w, "proctime"), + new ProctimeAttribute(w.aliasAttribute()))); + break; + case STREAM_PROCTIME: + timeProperties = + List.of( + new NamedWindowProperty( + propertyName(w, "proctime"), + new ProctimeAttribute(w.aliasAttribute()))); + break; + case BATCH_ROWTIME: + timeProperties = + List.of( + new NamedWindowProperty( + propertyName(w, "rowtime"), + new RowtimeAttribute(w.aliasAttribute()))); + break; + default: + throw new TableException( + "Unknown window type encountered. Please report this bug."); + } + + List<NamedWindowProperty> properties = + Lists.newArrayList(Iterables.concat(startEndProperties, timeProperties)); + + // retrieve window start and end properties + builder.push(agg.copy(properties)); + + // forward window start and end properties + List<RexNode> projectNodes = + Stream.concat( + innerProject.getProjects().stream(), + properties.stream().map(np -> builder.field(np.getName()))) + .collect(Collectors.toList()); + builder.project(projectNodes); + + // replace window auxiliary function in filter by access to window properties + filter.ifPresent( + f -> builder.filter(replaceGroupAuxiliaries(f.getCondition(), w, builder))); + + // replace window auxiliary unctions in projection by access to window properties + List<RexNode> finalProjects = + project.getProjects().stream() + .map(expr -> replaceGroupAuxiliaries(expr, w, builder)) + .collect(Collectors.toList()); + builder.project(finalProjects, project.getRowType().getFieldNames()); + + return builder.build(); + } + + private static WindowType getWindowType(LogicalWindow window) { + if (AggregateUtil.isRowtimeAttribute(window.timeAttribute())) { + return WindowType.STREAM_ROWTIME; + } else if (AggregateUtil.isProctimeAttribute(window.timeAttribute())) { + return WindowType.STREAM_PROCTIME; + } else if (window.timeAttribute() + .getOutputDataType() + .getLogicalType() + .is(TIMESTAMP_WITHOUT_TIME_ZONE)) { + return WindowType.BATCH_ROWTIME; + } else { + throw new TableException("Unknown window type encountered. Please report this bug."); + } + } + + /** Generates a property name for a window. */ + private static String propertyName(LogicalWindow window, String name) { + return window.aliasAttribute().getName() + name; + } + + /** Replace group auxiliaries with field references. */ + public static RexNode replaceGroupAuxiliaries( + RexNode node, LogicalWindow window, RelBuilder builder) { + RexBuilder rexBuilder = builder.getRexBuilder(); + WindowType windowType = getWindowType(window); + + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (isWindowStart(c)) { + return rexBuilder.makeCast( + c.getType(), builder.field(propertyName(window, "start")), false); + } else if (isWindowEnd(c)) { + return rexBuilder.makeCast( + c.getType(), builder.field(propertyName(window, "end")), false); + } else if (isWindowRowtime(c)) { + switch (windowType) { + case STREAM_ROWTIME: + case BATCH_ROWTIME: + // replace expression by access to window rowtime + return rexBuilder.makeCast( + c.getType(), builder.field(propertyName(window, "rowtime")), false); + case STREAM_PROCTIME: + throw new ValidationException( + "A proctime window cannot provide a rowtime attribute."); + default: + throw new TableException( + "Unknown window type encountered. Please report this bug."); + } + + } else if (isWindowProctime(c)) { + switch (windowType) { + case STREAM_PROCTIME: + case STREAM_ROWTIME: + // replace expression by access to window proctime + return rexBuilder.makeCast( + c.getType(), + builder.field(propertyName(window, "proctime")), + false); + case BATCH_ROWTIME: + throw new ValidationException( + "PROCTIME window property is not supported in batch queries."); + default: + throw new TableException( + "Unknown window type encountered. Please report this bug."); + } + + } else { + List<RexNode> newOps = + c.getOperands().stream() + .map(op -> replaceGroupAuxiliaries(op, window, builder)) + .collect(Collectors.toList()); + return c.clone(c.getType(), newOps); + } + } else { + // preserve expression + return node; + } + } + + /** Checks if a RexNode is a window start auxiliary function. */ + private static boolean isWindowStart(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroupAuxiliary()) { + return c.getOperator() == FlinkSqlOperatorTable.TUMBLE_START + || c.getOperator() == FlinkSqlOperatorTable.HOP_START + || c.getOperator() == FlinkSqlOperatorTable.SESSION_START; + } + } + return false; + } + + /** Checks if a RexNode is a window end auxiliary function. */ + private static boolean isWindowEnd(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroupAuxiliary()) { + return c.getOperator() == FlinkSqlOperatorTable.TUMBLE_END + || c.getOperator() == FlinkSqlOperatorTable.HOP_END + || c.getOperator() == FlinkSqlOperatorTable.SESSION_END; + } + } + return false; + } + + /** Checks if a RexNode is a window rowtime auxiliary function. */ + private static boolean isWindowRowtime(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroupAuxiliary()) { + return c.getOperator() == FlinkSqlOperatorTable.TUMBLE_ROWTIME + || c.getOperator() == FlinkSqlOperatorTable.HOP_ROWTIME + || c.getOperator() == FlinkSqlOperatorTable.SESSION_ROWTIME; + } + } + return false; + } + + /** Checks if a RexNode is a window proctime auxiliary function. */ + private static boolean isWindowProctime(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroupAuxiliary()) { + return c.getOperator() == FlinkSqlOperatorTable.TUMBLE_PROCTIME + || c.getOperator() == FlinkSqlOperatorTable.HOP_PROCTIME + || c.getOperator() == FlinkSqlOperatorTable.SESSION_PROCTIME; + } + } + return false; + } + + public static boolean hasGroupAuxiliaries(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroupAuxiliary()) { + return true; + } + return c.getOperands().stream().anyMatch(WindowPropertiesRules::hasGroupAuxiliaries); + } + return false; + } + + public static boolean hasGroupFunction(RexNode node) { + if (node instanceof RexCall) { + RexCall c = (RexCall) node; + if (c.getOperator().isGroup()) { + return true; + } + return c.getOperands().stream().anyMatch(WindowPropertiesRules::hasGroupFunction); + } + return false; + } + + enum WindowType { + STREAM_ROWTIME, + STREAM_PROCTIME, + BATCH_ROWTIME + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala deleted file mode 100644 index 60d00ab49d3..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.planner.plan.rules.logical - -import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable -import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate -import org.apache.flink.table.planner.plan.utils.AggregateUtil -import org.apache.flink.table.runtime.groupwindow._ -import org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.plan.RelOptRule._ -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} -import org.apache.calcite.rex.{RexCall, RexNode} -import org.apache.calcite.tools.RelBuilder - -import scala.collection.JavaConversions._ - -class WindowPropertiesRule - extends RelOptRule( - operand( - classOf[LogicalProject], - operand(classOf[LogicalProject], operand(classOf[LogicalWindowAggregate], none()))), - "WindowPropertiesRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val project: LogicalProject = call.rel(0) - // project includes at least one group auxiliary function - project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries) - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val project: LogicalProject = call.rel(0) - val innerProject: LogicalProject = call.rel(1) - val agg: LogicalWindowAggregate = call.rel(2) - - val converted = - WindowPropertiesRules.convertWindowNodes(call.builder(), project, None, innerProject, agg) - - call.transformTo(converted) - } -} - -class WindowPropertiesHavingRule - extends RelOptRule( - RelOptRule.operand( - classOf[LogicalProject], - RelOptRule.operand( - classOf[LogicalFilter], - RelOptRule.operand( - classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))) - ), - "WindowPropertiesHavingRule") { - - override def matches(call: RelOptRuleCall): Boolean = { - val project: LogicalProject = call.rel(0) - val filter: LogicalFilter = call.rel(1) - - project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries) || - WindowPropertiesRules.hasGroupAuxiliaries(filter.getCondition) - } - - override def onMatch(call: RelOptRuleCall): Unit = { - val project: LogicalProject = call.rel(0) - val filter: LogicalFilter = call.rel(1) - val innerProject: LogicalProject = call.rel(2) - val agg: LogicalWindowAggregate = call.rel(3) - - val converted = WindowPropertiesRules.convertWindowNodes( - call.builder(), - project, - Some(filter), - innerProject, - agg) - - call.transformTo(converted) - } -} - -object WindowPropertiesRules { - - val WINDOW_PROPERTIES_HAVING_RULE = new WindowPropertiesHavingRule - - val WINDOW_PROPERTIES_RULE = new WindowPropertiesRule - - def convertWindowNodes( - builder: RelBuilder, - project: LogicalProject, - filter: Option[LogicalFilter], - innerProject: LogicalProject, - agg: LogicalWindowAggregate): RelNode = { - val w = agg.getWindow - val windowType = getWindowType(w) - - val startEndProperties = Seq( - new NamedWindowProperty(propertyName(w, "start"), new WindowStart(w.aliasAttribute)), - new NamedWindowProperty(propertyName(w, "end"), new WindowEnd(w.aliasAttribute)) - ) - - // allow rowtime/proctime for rowtime windows and proctime for proctime windows - val timeProperties = windowType match { - case 'streamRowtime => - Seq( - new NamedWindowProperty( - propertyName(w, "rowtime"), - new RowtimeAttribute(w.aliasAttribute)), - new NamedWindowProperty( - propertyName(w, "proctime"), - new ProctimeAttribute(w.aliasAttribute)) - ) - case 'streamProctime => - Seq( - new NamedWindowProperty( - propertyName(w, "proctime"), - new ProctimeAttribute(w.aliasAttribute))) - case 'batchRowtime => - Seq( - new NamedWindowProperty( - propertyName(w, "rowtime"), - new RowtimeAttribute(w.aliasAttribute))) - case _ => - throw new TableException("Unknown window type encountered. Please report this bug.") - } - - val properties = startEndProperties ++ timeProperties - - // retrieve window start and end properties - builder.push(agg.copy(properties)) - - // forward window start and end properties - builder.project(innerProject.getProjects ++ properties.map(np => builder.field(np.getName))) - - // replace window auxiliary function in filter by access to window properties - filter.foreach(f => builder.filter(replaceGroupAuxiliaries(f.getCondition, w, builder))) - - // replace window auxiliary unctions in projection by access to window properties - builder.project( - project.getProjects.map(expr => replaceGroupAuxiliaries(expr, w, builder)), - project.getRowType.getFieldNames - ) - - builder.build() - } - - private def getWindowType(window: LogicalWindow): Symbol = { - if (AggregateUtil.isRowtimeAttribute(window.timeAttribute)) { - 'streamRowtime - } else if (AggregateUtil.isProctimeAttribute(window.timeAttribute)) { - 'streamProctime - } else if ( - window.timeAttribute.getOutputDataType.getLogicalType.is(TIMESTAMP_WITHOUT_TIME_ZONE) - ) { - 'batchRowtime - } else { - throw new TableException("Unknown window type encountered. Please report this bug.") - } - } - - /** Generates a property name for a window. */ - private def propertyName(window: LogicalWindow, name: String): String = - window.aliasAttribute.asInstanceOf[WindowReference].getName + name - - /** Replace group auxiliaries with field references. */ - def replaceGroupAuxiliaries( - node: RexNode, - window: LogicalWindow, - builder: RelBuilder): RexNode = { - val rexBuilder = builder.getRexBuilder - val windowType = getWindowType(window) - - node match { - case c: RexCall if isWindowStart(c) => - // replace expression by access to window start - rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "start")), false) - - case c: RexCall if isWindowEnd(c) => - // replace expression by access to window end - rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "end")), false) - - case c: RexCall if isWindowRowtime(c) => - windowType match { - case 'streamRowtime | 'batchRowtime => - // replace expression by access to window rowtime - rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "rowtime")), false) - case 'streamProctime => - throw new ValidationException("A proctime window cannot provide a rowtime attribute.") - case _ => - throw new TableException("Unknown window type encountered. Please report this bug.") - } - - case c: RexCall if isWindowProctime(c) => - windowType match { - case 'streamProctime | 'streamRowtime => - // replace expression by access to window proctime - rexBuilder.makeCast(c.getType, builder.field(propertyName(window, "proctime")), false) - case 'batchRowtime => - throw new ValidationException( - "PROCTIME window property is not supported in batch queries.") - case _ => - throw new TableException("Unknown window type encountered. Please report this bug.") - } - - case c: RexCall => - // replace expressions in children - val newOps = c.getOperands.map(replaceGroupAuxiliaries(_, window, builder)) - c.clone(c.getType, newOps) - - case x => - // preserve expression - x - } - } - - /** Checks if a RexNode is a window start auxiliary function. */ - private def isWindowStart(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case FlinkSqlOperatorTable.TUMBLE_START | FlinkSqlOperatorTable.HOP_START | - FlinkSqlOperatorTable.SESSION_START => - true - case _ => false - } - case _ => false - } - } - - /** Checks if a RexNode is a window end auxiliary function. */ - private def isWindowEnd(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case FlinkSqlOperatorTable.TUMBLE_END | FlinkSqlOperatorTable.HOP_END | - FlinkSqlOperatorTable.SESSION_END => - true - case _ => false - } - case _ => false - } - } - - /** Checks if a RexNode is a window rowtime auxiliary function. */ - private def isWindowRowtime(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case FlinkSqlOperatorTable.TUMBLE_ROWTIME | FlinkSqlOperatorTable.HOP_ROWTIME | - FlinkSqlOperatorTable.SESSION_ROWTIME => - true - case _ => false - } - case _ => false - } - } - - /** Checks if a RexNode is a window proctime auxiliary function. */ - private def isWindowProctime(node: RexNode): Boolean = { - node match { - case n: RexCall if n.getOperator.isGroupAuxiliary => - n.getOperator match { - case FlinkSqlOperatorTable.TUMBLE_PROCTIME | FlinkSqlOperatorTable.HOP_PROCTIME | - FlinkSqlOperatorTable.SESSION_PROCTIME => - true - case _ => false - } - case _ => false - } - } - - def hasGroupAuxiliaries(node: RexNode): Boolean = { - node match { - case c: RexCall if c.getOperator.isGroupAuxiliary => true - case c: RexCall => - c.operands.exists(hasGroupAuxiliaries) - case _ => false - } - } - - def hasGroupFunction(node: RexNode): Boolean = { - node match { - case c: RexCall if c.getOperator.isGroup => true - case c: RexCall => c.operands.exists(hasGroupFunction) - case _ => false - } - } - -}