This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f8cb19e70ca7da6423dfb01b97e05c4d520c9fde Author: Timo Walther <twal...@apache.org> AuthorDate: Wed Mar 9 12:14:46 2022 +0100 [FLINK-26518][table-planner] Port FlinkRelBuilder to Java --- .../table/planner/calcite/FlinkRelBuilder.java | 235 +++++++++++++++++++++ .../catalog/QueryOperationCatalogViewTable.java | 8 +- .../table/planner/delegation/PlannerContext.java | 2 +- .../plan/rules/logical/SubQueryDecorrelator.java | 3 +- .../BatchPhysicalPythonWindowAggregateRule.java | 3 +- ...reamPhysicalPythonGroupWindowAggregateRule.java | 2 +- .../table/planner/calcite/FlinkRelBuilder.scala | 233 -------------------- .../planner/expressions/fieldExpression.scala | 7 - .../planner/expressions/windowProperties.scala | 5 - .../nodes/calcite/LogicalWindowAggregate.scala | 6 +- .../calcite/LogicalWindowTableAggregate.scala | 4 +- .../plan/nodes/calcite/WindowAggregate.scala | 11 +- .../plan/nodes/calcite/WindowTableAggregate.scala | 10 +- .../logical/FlinkLogicalWindowAggregate.scala | 2 +- .../logical/FlinkLogicalWindowTableAggregate.scala | 2 +- .../rules/logical/CorrelateSortToRankRule.scala | 2 +- 16 files changed, 267 insertions(+), 268 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java new file mode 100644 index 0000000..35ab473 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory; +import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory; +import org.apache.flink.table.planner.hint.FlinkHints; +import org.apache.flink.table.planner.plan.QueryOperationConverter; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate; +import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; +import org.apache.flink.table.runtime.operators.rank.RankRange; +import org.apache.flink.table.runtime.operators.rank.RankType; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptSchema; +import org.apache.calcite.plan.RelOptTable.ToRelContext; +import org.apache.calcite.plan.ViewExpanders; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.UnaryOperator; + +import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate; + +/** Flink-specific {@link RelBuilder}. */ +@Internal +public final class FlinkRelBuilder extends RelBuilder { + + private final QueryOperationConverter toRelNodeConverter; + + private final ExpandFactory expandFactory; + + private final RankFactory rankFactory; + + private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) { + super(context, cluster, relOptSchema); + + this.toRelNodeConverter = + new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode()); + this.expandFactory = + Util.first( + context.unwrap(ExpandFactory.class), + FlinkRelFactories.DEFAULT_EXPAND_FACTORY()); + this.rankFactory = + Util.first( + context.unwrap(RankFactory.class), + FlinkRelFactories.DEFAULT_RANK_FACTORY()); + } + + public static FlinkRelBuilder of( + Context context, RelOptCluster cluster, RelOptSchema relOptSchema) { + return new FlinkRelBuilder(Preconditions.checkNotNull(context), cluster, relOptSchema); + } + + public static FlinkRelBuilder of(RelOptCluster cluster, RelOptSchema relOptSchema) { + return FlinkRelBuilder.of(cluster.getPlanner().getContext(), cluster, relOptSchema); + } + + public static RelBuilderFactory proto(Context context) { + return (cluster, schema) -> { + final Context clusterContext = cluster.getPlanner().getContext(); + final Context chain = Contexts.chain(context, clusterContext); + return FlinkRelBuilder.of(chain, cluster, schema); + }; + } + + public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) { + final RelNode input = build(); + final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex); + return push(expand); + } + + public RelBuilder rank( + ImmutableBitSet partitionKey, + RelCollation orderKey, + RankType rankType, + RankRange rankRange, + RelDataTypeField rankNumberType, + boolean outputRankNumber) { + final RelNode input = build(); + final RelNode rank = + rankFactory.createRank( + input, + partitionKey, + orderKey, + rankType, + rankRange, + rankNumberType, + outputRankNumber); + return push(rank); + } + + /** Build non-window aggregate for either aggregate or table aggregate. */ + @Override + public RelBuilder aggregate( + RelBuilder.GroupKey groupKey, Iterable<RelBuilder.AggCall> aggCalls) { + // build a relNode, the build() may also return a project + RelNode relNode = super.aggregate(groupKey, aggCalls).build(); + + if (relNode instanceof LogicalAggregate) { + final LogicalAggregate logicalAggregate = (LogicalAggregate) relNode; + if (isTableAggregate(logicalAggregate.getAggCallList())) { + relNode = LogicalTableAggregate.create(logicalAggregate); + } else if (isCountStarAgg(logicalAggregate)) { + final RelNode newAggInput = + push(logicalAggregate.getInput(0)).project(literal(0)).build(); + relNode = + logicalAggregate.copy( + logicalAggregate.getTraitSet(), ImmutableList.of(newAggInput)); + } + } + + return push(relNode); + } + + /** Build window aggregate for either aggregate or table aggregate. */ + public RelBuilder windowAggregate( + LogicalWindow window, + GroupKey groupKey, + List<NamedWindowProperty> namedProperties, + Iterable<AggCall> aggCalls) { + // build logical aggregate + + // Because of: + // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input, + // if the input is a Project. + // + // the field can not be pruned if it is referenced by other expressions + // of the window aggregation(i.e. the TUMBLE_START/END). + // To solve this, we config the RelBuilder to forbidden this feature. + final LogicalAggregate aggregate = + (LogicalAggregate) + super.transform(t -> t.withPruneInputOfAggregate(false)) + .push(build()) + .aggregate(groupKey, aggCalls) + .build(); + + // build logical window aggregate from it + final RelNode windowAggregate; + if (isTableAggregate(aggregate.getAggCallList())) { + windowAggregate = + LogicalWindowTableAggregate.create(window, namedProperties, aggregate); + } else { + windowAggregate = LogicalWindowAggregate.create(window, namedProperties, aggregate); + } + return push(windowAggregate); + } + + /** Build watermark assigner relational node. */ + public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) { + final RelNode input = build(); + final RelNode relNode = + LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr); + return push(relNode); + } + + public RelBuilder queryOperation(QueryOperation queryOperation) { + final RelNode relNode = queryOperation.accept(toRelNodeConverter); + return push(relNode); + } + + public RelBuilder scan(ObjectIdentifier identifier, Map<String, String> dynamicOptions) { + final List<RelHint> hints = new ArrayList<>(); + hints.add( + RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build()); + final ToRelContext toRelContext = ViewExpanders.simpleContext(cluster, hints); + final RelNode relNode = + relOptSchema.getTableForMember(identifier.toList()).toRel(toRelContext); + return push(relNode); + } + + @Override + public FlinkTypeFactory getTypeFactory() { + return (FlinkTypeFactory) super.getTypeFactory(); + } + + @Override + public RelBuilder transform(UnaryOperator<Config> transform) { + // Override in order to return a FlinkRelBuilder. + final Context mergedContext = + Contexts.of(transform.apply(Config.DEFAULT), cluster.getPlanner().getContext()); + return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema); + } + + private static boolean isCountStarAgg(LogicalAggregate agg) { + if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) { + return false; + } + final AggregateCall call = agg.getAggCallList().get(0); + return call.getAggregation().getKind() == SqlKind.COUNT + && call.filterArg == -1 + && call.getArgList().isEmpty(); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java index 9ba8e6e..32353c1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java @@ -25,6 +25,9 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; +import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptSchema; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelNode; @@ -70,8 +73,9 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable { @Override public RelNode convertToRel(RelOptTable.ToRelContext context) { - FlinkRelBuilder relBuilder = - FlinkRelBuilder.of(context, context.getCluster(), this.getRelOptSchema()); + final RelOptCluster cluster = context.getCluster(); + final Context chain = Contexts.of(context, cluster.getPlanner().getContext()); + final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema()); return relBuilder.queryOperation(catalogView.getQueryOperation()).build(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java index 19a4b6e..e98e582 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java @@ -184,7 +184,7 @@ public class PlannerContext { context, // Sets up the ViewExpander explicitly for FlinkRelBuilder. createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext()); - return new FlinkRelBuilder(chain, cluster, relOptSchema); + return FlinkRelBuilder.of(chain, cluster, relOptSchema); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java index b80b884..828286b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java @@ -134,8 +134,7 @@ public class SubQueryDecorrelator extends RelShuttleImpl { } RelOptCluster cluster = rootRel.getCluster(); - RelBuilder relBuilder = - new FlinkRelBuilder(cluster.getPlanner().getContext(), cluster, null); + RelBuilder relBuilder = FlinkRelBuilder.of(cluster, null); RexBuilder rexBuilder = cluster.getRexBuilder(); final SubQueryDecorrelator decorrelator = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java index 9eadb76..847b803 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java @@ -34,6 +34,7 @@ import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.utils.AggregateUtil; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; import org.apache.flink.table.planner.plan.utils.PythonUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.types.DataType; import org.apache.calcite.plan.RelOptRule; @@ -160,7 +161,7 @@ public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule { window, inputTimeFieldIndex, inputTimeIsDate, - agg.getNamedProperties()); + JavaScalaConversionUtil.toScala(agg.getNamedProperties())); call.transformTo(windowAgg); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java index 002f1ae..5f080d7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java @@ -143,7 +143,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule agg.getGroupSet().toArray(), JavaScalaConversionUtil.toScala(aggCalls), agg.getWindow(), - agg.getNamedProperties(), + JavaScalaConversionUtil.toScala(agg.getNamedProperties()), emitStrategy); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala deleted file mode 100644 index b96e510..0000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.calcite - -import org.apache.flink.table.operations.QueryOperation -import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory} -import org.apache.flink.table.planner.expressions.WindowProperty -import org.apache.flink.table.planner.plan.QueryOperationConverter -import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate} -import org.apache.flink.table.planner.plan.utils.AggregateUtil -import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty -import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType} - -import com.google.common.collect.ImmutableList -import org.apache.calcite.plan._ -import org.apache.calcite.rel.RelCollation -import org.apache.calcite.rel.`type`.RelDataTypeField -import org.apache.calcite.rel.hint.RelHint -import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.SqlKind -import org.apache.calcite.tools.RelBuilder.{AggCall, Config, GroupKey} -import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory} -import org.apache.calcite.util.{ImmutableBitSet, Util} -import org.apache.flink.table.catalog.ObjectIdentifier -import org.apache.flink.table.planner.hint.FlinkHints - -import java.lang.Iterable -import java.util -import java.util.List -import java.util.function.UnaryOperator - -import scala.collection.JavaConversions._ - -/** - * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. - */ -class FlinkRelBuilder( - context: Context, - relOptCluster: RelOptCluster, - relOptSchema: RelOptSchema) - extends RelBuilder( - context, - relOptCluster, - relOptSchema) { - - require(context != null) - - private val toRelNodeConverter = { - new QueryOperationConverter(this, context.unwrap(classOf[FlinkContext]).isBatchMode) - } - - private val expandFactory: ExpandFactory = { - Util.first(context.unwrap(classOf[ExpandFactory]), FlinkRelFactories.DEFAULT_EXPAND_FACTORY) - } - - private val rankFactory: RankFactory = { - Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY) - } - - override def getRelOptSchema: RelOptSchema = relOptSchema - - override def getCluster: RelOptCluster = relOptCluster - - override def getTypeFactory: FlinkTypeFactory = - super.getTypeFactory.asInstanceOf[FlinkTypeFactory] - - override def transform(transform: UnaryOperator[RelBuilder.Config]): FlinkRelBuilder = { - // Override in order to return a FlinkRelBuilder. - FlinkRelBuilder.of(transform.apply(Config.DEFAULT), cluster, relOptSchema) - } - - def expand( - projects: util.List[util.List[RexNode]], - expandIdIndex: Int): RelBuilder = { - val input = build() - val expand = expandFactory.createExpand(input, projects, expandIdIndex) - push(expand) - } - - def rank( - partitionKey: ImmutableBitSet, - orderKey: RelCollation, - rankType: RankType, - rankRange: RankRange, - rankNumberType: RelDataTypeField, - outputRankNumber: Boolean): RelBuilder = { - val input = build() - val rank = rankFactory.createRank(input, partitionKey, orderKey, rankType, rankRange, - rankNumberType, outputRankNumber) - push(rank) - } - - /** - * Build non-window aggregate for either aggregate or table aggregate. - */ - override def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = { - // build a relNode, the build() may also return a project - val relNode = super.aggregate(groupKey, aggCalls).build() - - def isCountStartAgg(agg: LogicalAggregate): Boolean = { - if (agg.getGroupCount != 0 || agg.getAggCallList.size() != 1) { - return false - } - val call = agg.getAggCallList.head - call.getAggregation.getKind == SqlKind.COUNT && - call.filterArg == -1 && call.getArgList.isEmpty - } - - relNode match { - case logicalAggregate: LogicalAggregate - if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) => - push(LogicalTableAggregate.create(logicalAggregate)) - case logicalAggregate2: LogicalAggregate - if isCountStartAgg(logicalAggregate2) => - val newAggInput = push(logicalAggregate2.getInput(0)) - .project(literal(0)).build() - push(logicalAggregate2.copy(logicalAggregate2.getTraitSet, ImmutableList.of(newAggInput))) - case _ => push(relNode) - } - } - - /** - * Build window aggregate for either aggregate or table aggregate. - */ - def windowAggregate( - window: LogicalWindow, - groupKey: GroupKey, - namedProperties: List[NamedWindowProperty], - aggCalls: Iterable[AggCall]): RelBuilder = { - // build logical aggregate - - // Because of: - // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input, - // if the input is a Project. - // - // the field can not be pruned if it is referenced by other expressions - // of the window aggregation(i.e. the TUMBLE_START/END). - // To solve this, we config the RelBuilder to forbidden this feature. - val aggregate = super.transform( - new UnaryOperator[RelBuilder.Config] { - override def apply(t: RelBuilder.Config) - : RelBuilder.Config = t.withPruneInputOfAggregate(false) - }) - .push(build()) - .aggregate(groupKey, aggCalls) - .build() - .asInstanceOf[LogicalAggregate] - - // build logical window aggregate from it - aggregate match { - case logicalAggregate: LogicalAggregate - if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) => - push(LogicalWindowTableAggregate.create(window, namedProperties, aggregate)) - case _ => push(LogicalWindowAggregate.create(window, namedProperties, aggregate)) - } - } - - /** - * Build watermark assigner relation node. - */ - def watermark(rowtimeFieldIndex: Int, watermarkExpr: RexNode): RelBuilder = { - val input = build() - val watermarkAssigner = LogicalWatermarkAssigner - .create(cluster, input, rowtimeFieldIndex, watermarkExpr) - push(watermarkAssigner) - this - } - - def queryOperation(queryOperation: QueryOperation): RelBuilder = { - val relNode = queryOperation.accept(toRelNodeConverter) - push(relNode) - this - } - - def scan( - identifier: ObjectIdentifier, - dynamicOptions: util.Map[String, String]): RelBuilder = { - val hints = new util.ArrayList[RelHint] - hints.add(RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build) - val toRelContext = ViewExpanders.simpleContext(cluster, hints) - push(relOptSchema.getTableForMember(identifier.toList).toRel(toRelContext)) - this - } -} - -object FlinkRelBuilder { - - case class NamedWindowProperty(name: String, property: WindowProperty) - - def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() { - def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = { - val clusterContext = cluster.getPlanner.getContext.unwrap(classOf[FlinkContext]) - val mergedContext = Contexts.chain(context, clusterContext) - - new FlinkRelBuilder(mergedContext, cluster, schema) - } - } - - def of(cluster: RelOptCluster, relOptSchema: RelOptSchema): FlinkRelBuilder = { - val clusterContext = cluster.getPlanner.getContext - new FlinkRelBuilder( - clusterContext, - cluster, - relOptSchema) - } - - def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: RelOptSchema) - : FlinkRelBuilder = { - val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext) - new FlinkRelBuilder( - mergedContext, - cluster, - relOptSchema) - } -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala index fd86f67..013e687 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api._ import org.apache.flink.table.operations.QueryOperation -import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.calcite.FlinkTypeFactory._ import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult, ValidationSuccess} @@ -150,9 +149,6 @@ case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) } } - override def toNamedWindowProperty(name: String): NamedWindowProperty = - NamedWindowProperty(name, this) - override def toString: String = s"rowtime($child)" } @@ -174,9 +170,6 @@ case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr override def resultType: TypeInformation[_] = TimeIndicatorTypeInfo.PROCTIME_INDICATOR - override def toNamedWindowProperty(name: String): NamedWindowProperty = - NamedWindowProperty(name, this) - override def toString: String = s"proctime($child)" } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala index 0e68163..ce6940a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala @@ -19,13 +19,10 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationSuccess} trait WindowProperty { - def toNamedWindowProperty(name: String): NamedWindowProperty - def resultType: TypeInformation[_] } @@ -42,8 +39,6 @@ abstract class AbstractWindowProperty(child: PlannerExpression) } else { ValidationFailure("Child must be a window reference.") } - - def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this) } case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala index 0d70fd7..f3e35e1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala @@ -36,7 +36,7 @@ final class LogicalWindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) { override def copy( @@ -55,7 +55,7 @@ final class LogicalWindowAggregate( namedProperties) } - def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = { + def copy(namedProperties: util.List[NamedWindowProperty]): LogicalWindowAggregate = { new LogicalWindowAggregate( cluster, traitSet, @@ -71,7 +71,7 @@ object LogicalWindowAggregate { def create( window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], + namedProperties: util.List[NamedWindowProperty], agg: Aggregate): LogicalWindowAggregate = { require(agg.getGroupType == Group.SIMPLE) val cluster: RelOptCluster = agg.getCluster diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala index 6ae042f..f9a2ed2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala @@ -41,7 +41,7 @@ class LogicalWindowTableAggregate( groupSets: util.List[ImmutableBitSet], aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends WindowTableAggregate( cluster, traitSet, @@ -69,7 +69,7 @@ object LogicalWindowTableAggregate { def create( window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], + namedProperties: util.List[NamedWindowProperty], aggregate: Aggregate): LogicalWindowTableAggregate = { val cluster: RelOptCluster = aggregate.getCluster diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala index 884be0a..c28dd1a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala @@ -31,6 +31,9 @@ import org.apache.calcite.util.ImmutableBitSet import java.util +import scala.collection.JavaConverters._ + + /** * Relational operator that eliminates duplicates and computes totals with time window group. * @@ -43,7 +46,7 @@ abstract class WindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends Aggregate( cluster, traitSet, @@ -54,7 +57,7 @@ abstract class WindowAggregate( def getWindow: LogicalWindow = window - def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + def getNamedProperties: util.List[NamedWindowProperty] = namedProperties override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this) @@ -63,7 +66,7 @@ abstract class WindowAggregate( val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val builder = typeFactory.builder builder.addAll(aggregateRowType.getFieldList) - namedProperties.foreach { namedProp => + namedProperties.asScala.foreach { namedProp => builder.add( namedProp.getName, typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType) @@ -82,6 +85,6 @@ abstract class WindowAggregate( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("window", window) - .item("properties", namedProperties.map(_.getName).mkString(", ")) + .item("properties", namedProperties.asScala.map(_.getName).mkString(", ")) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala index 98bcaea..d388b5b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala @@ -30,6 +30,8 @@ import org.apache.calcite.util.ImmutableBitSet import java.util +import scala.collection.JavaConverters._ + /** * Relational operator that represents a window table aggregate. A TableAggregate is similar to the * [[org.apache.calcite.rel.core.Aggregate]] but may output 0 or more records for a group. @@ -42,19 +44,19 @@ abstract class WindowTableAggregate( groupSets: util.List[ImmutableBitSet], aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends TableAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls) { def getWindow: LogicalWindow = window - def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + def getNamedProperties: util.List[NamedWindowProperty] = namedProperties override def deriveRowType(): RelDataType = { val aggregateRowType = super.deriveRowType() val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val builder = typeFactory.builder builder.addAll(aggregateRowType.getFieldList) - namedProperties.foreach { namedProp => + namedProperties.asScala.foreach { namedProp => builder.add( namedProp.getName, typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType) @@ -66,6 +68,6 @@ abstract class WindowTableAggregate( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .item("window", window) - .item("properties", namedProperties.map(_.getName).mkString(", ")) + .item("properties", namedProperties.asScala.map(_.getName).mkString(", ")) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala index 8cef117..7884f59 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala @@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) with FlinkLogicalRel { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala index add5d41..7dcc81b 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala @@ -44,7 +44,7 @@ class FlinkLogicalWindowTableAggregate( groupSets: util.List[ImmutableBitSet], aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: util.List[NamedWindowProperty]) extends WindowTableAggregate( cluster, traitSet, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala index b2d0797..e8013a8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala @@ -175,7 +175,7 @@ class CorrelateSortToRankRule extends RelOptRule( 1, sort.fetch.asInstanceOf[RexLiteral].getValueAs(classOf[java.lang.Long])), null, - outputRankNumber = false) + false) .project(projects) .build()