Repository: flink
Updated Branches:
  refs/heads/master 2ef4900aa -> 6a0ada81e
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 8cfd748..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushFilterIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.FilterableTableSource
-
-class PushFilterIntoBatchTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataSetCalc],
-    operand(classOf[BatchTableSourceScan], none)),
-  "PushFilterIntoBatchTableSourceScanRule")
-  with PushFilterIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-    scan.tableSource match {
-      case source: FilterableTableSource[_] =>
-        calc.getProgram.getCondition != null && !source.isFilterPushedDown
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
-    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
-  }
-}
-
-object PushFilterIntoBatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushFilterIntoBatchTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
deleted file mode 100644
index 8c83047..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.dataSet
-
-import org.apache.calcite.plan.RelOptRule.{none, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
-import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
-import org.apache.flink.table.sources.ProjectableTableSource
-
-/**
- * This rule tries to push projections into a BatchTableSourceScan.
- */
-class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataSetCalc],
-    operand(classOf[BatchTableSourceScan], none)),
-  "PushProjectIntoBatchTableSourceScanRule")
-  with PushProjectIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-    scan.tableSource match {
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall) {
-    val calc: DataSetCalc = call.rel(0).asInstanceOf[DataSetCalc]
-    val scan: BatchTableSourceScan = call.rel(1).asInstanceOf[BatchTableSourceScan]
-    pushProjectIntoScan(call, calc, scan)
-  }
-}
-
-object PushProjectIntoBatchTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushProjectIntoBatchTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index 052f738..f011b66 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -18,24 +18,25 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
 
 import scala.collection.JavaConversions._
 
 class DataStreamAggregateRule
   extends ConverterRule(
-    classOf[LogicalWindowAggregate],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
-    "DataStreamAggregateRule") {
+    classOf[FlinkLogicalWindowAggregate],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamAggregateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+    val agg: FlinkLogicalWindowAggregate = call.rel(0).asInstanceOf[FlinkLogicalWindowAggregate]
 
     // check if we have distinct aggregates
     val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
@@ -53,9 +54,9 @@ class DataStreamAggregateRule
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+    val agg: FlinkLogicalWindowAggregate = rel.asInstanceOf[FlinkLogicalWindowAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConventions.DATASTREAM)
 
     new DataStreamAggregate(
       agg.getWindow,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
index 4e620c9..1777264 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -18,25 +18,25 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
 
 class DataStreamCalcRule
   extends ConverterRule(
-    classOf[LogicalCalc],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalCalc],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamCalcRule") {
 
   def convert(rel: RelNode): RelNode = {
-    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)
+    val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(calc.getInput, FlinkConventions.DATASTREAM)
 
     new DataStreamCalc(
       rel.getCluster,
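[Editor's note: every physical rule touched by this commit follows the same ConverterRule skeleton — match a node in FlinkConventions.LOGICAL and re-emit it in FlinkConventions.DATASTREAM (or DATASET). A minimal sketch of that pattern; MyLogicalRel and MyDataStreamRel are hypothetical node classes, not part of this commit:

    import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.rel.convert.ConverterRule
    import org.apache.flink.table.plan.nodes.FlinkConventions

    class MyDataStreamRule
      extends ConverterRule(
        classOf[MyLogicalRel],        // logical node to match (hypothetical)
        FlinkConventions.LOGICAL,     // source convention
        FlinkConventions.DATASTREAM,  // target convention
        "MyDataStreamRule") {

      def convert(rel: RelNode): RelNode = {
        // swap the convention trait and ask the planner to convert the input, too
        val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
        val convInput: RelNode = RelOptRule.convert(rel.getInput(0), FlinkConventions.DATASTREAM)
        new MyDataStreamRel(rel.getCluster, traitSet, convInput) // hypothetical physical node
      }
    }
]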
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index adce9f4..ae39d40 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -18,45 +18,40 @@
 package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.{LogicalFilter, LogicalCorrelate, LogicalTableFunctionScan}
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
 
-/**
- * Rule to convert a LogicalCorrelate into a DataStreamCorrelate.
- */
 class DataStreamCorrelateRule
   extends ConverterRule(
-    classOf[LogicalCorrelate],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalCorrelate],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val join: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
     val right = join.getRight.asInstanceOf[RelSubset].getOriginal
 
     right match {
       // right node is a table function
-      case scan: LogicalTableFunctionScan => true
+      case scan: FlinkLogicalTableFunctionScan => true
       // a filter is pushed above the table function
-      case filter: LogicalFilter =>
-        filter
-          .getInput.asInstanceOf[RelSubset]
-          .getOriginal
-          .isInstanceOf[LogicalTableFunctionScan]
+      case calc: FlinkLogicalCalc =>
+        calc.getInput.asInstanceOf[RelSubset]
+          .getOriginal.isInstanceOf[FlinkLogicalTableFunctionScan]
       case _ => false
     }
   }
 
   override def convert(rel: RelNode): RelNode = {
-    val join: LogicalCorrelate = rel.asInstanceOf[LogicalCorrelate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(join.getInput(0), DataStreamConvention.INSTANCE)
+    val join: FlinkLogicalCorrelate = rel.asInstanceOf[FlinkLogicalCorrelate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.DATASTREAM)
     val right: RelNode = join.getInput(1)
 
     def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): DataStreamCorrelate = {
@@ -64,12 +59,12 @@ class DataStreamCorrelateRule
         case rel: RelSubset =>
           convertToCorrelate(rel.getRelList.get(0), condition)
 
-        case filter: LogicalFilter =>
+        case calc: FlinkLogicalCalc =>
           convertToCorrelate(
-            filter.getInput.asInstanceOf[RelSubset].getOriginal,
-            Some(filter.getCondition))
+            calc.getInput.asInstanceOf[RelSubset].getOriginal,
+            Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
 
-        case scan: LogicalTableFunctionScan =>
+        case scan: FlinkLogicalTableFunctionScan =>
           new DataStreamCorrelate(
             rel.getCluster,
             traitSet,
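[Editor's note on the FlinkLogicalCalc match above: unlike a Filter, a Calc stores its condition as a RexLocalRef inside its RexProgram, so the condition must be expanded back into a self-contained RexNode before it is handed on. A small sketch of that step; the helper name is hypothetical, not from this commit:

    import org.apache.calcite.rex.{RexNode, RexProgram}

    // Recover the filter condition of a Calc program as a plain RexNode.
    def conditionOf(program: RexProgram): Option[RexNode] =
      // getCondition returns a RexLocalRef (or null); expandLocalRef inlines
      // the referenced local expressions into one standalone expression tree.
      Option(program.getCondition).map(program.expandLocalRef)
]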
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index dc46753..8e96970 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -19,28 +19,25 @@
 package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.volcano.RelSubset
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalWindow
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow
 
-/**
- * Rule to convert a LogicalWindow into a DataStreamOverAggregate.
- */
 class DataStreamOverAggregateRule
   extends ConverterRule(
-    classOf[LogicalWindow],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalOverWindow],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamOverAggregateRule") {
 
   override def convert(rel: RelNode): RelNode = {
-    val logicWindow: LogicalWindow = rel.asInstanceOf[LogicalWindow]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val logicWindow: FlinkLogicalOverWindow = rel.asInstanceOf[FlinkLogicalOverWindow]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
     val convertInput: RelNode =
-      RelOptRule.convert(logicWindow.getInput, DataStreamConvention.INSTANCE)
+      RelOptRule.convert(logicWindow.getInput, FlinkConventions.DATASTREAM)
 
     val inputRowType = convertInput.asInstanceOf[RelSubset].getOriginal.getRowType

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
index 91fd6e2..5bf60a7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -18,25 +18,24 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
 import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
 
 class DataStreamScanRule
   extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalNativeTableScan],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamScanRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
+    val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
     val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
     dataSetTable match {
       case _: DataStreamTable[Any] =>
@@ -47,8 +46,8 @@ class DataStreamScanRule
   }
 
   def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val scan: FlinkLogicalNativeTableScan = rel.asInstanceOf[FlinkLogicalNativeTableScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new DataStreamScan(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 475c050..4241f53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -18,26 +18,26 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalUnion
-import org.apache.flink.table.plan.nodes.datastream.DataStreamConvention
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
 
 class DataStreamUnionRule
   extends ConverterRule(
-    classOf[LogicalUnion],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalUnion],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamUnionRule") {
 
   def convert(rel: RelNode): RelNode = {
-    val union: LogicalUnion = rel.asInstanceOf[LogicalUnion]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataStreamConvention.INSTANCE)
-    val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataStreamConvention.INSTANCE)
+    val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
+    val convLeft: RelNode = RelOptRule.convert(union.getInput(0), FlinkConventions.DATASTREAM)
+    val convRight: RelNode = RelOptRule.convert(union.getInput(1), FlinkConventions.DATASTREAM)
 
     new DataStreamUnion(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
index db33842..fbad21f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -18,24 +18,24 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamValues, DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamValues
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
 
 class DataStreamValuesRule
   extends ConverterRule(
-    classOf[LogicalValues],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalValues],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
     "DataStreamValuesRule") {
 
   def convert(rel: RelNode): RelNode = {
-
-    val values: LogicalValues = rel.asInstanceOf[LogicalValues]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val values: FlinkLogicalValues = rel.asInstanceOf[FlinkLogicalValues]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new DataStreamValues(
       rel.getCluster,

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
deleted file mode 100644
index 53a3bcd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushFilterIntoStreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.rules.common.PushFilterIntoTableSourceScanRuleBase
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.FilterableTableSource
-
-class PushFilterIntoStreamTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataStreamCalc],
-    operand(classOf[StreamTableSourceScan], none)),
-  "PushFilterIntoStreamTableSourceScanRule")
-  with PushFilterIntoTableSourceScanRuleBase {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-    scan.tableSource match {
-      case source: FilterableTableSource[_] =>
-        calc.getProgram.getCondition != null && !source.isFilterPushedDown
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc: DataStreamCalc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
-    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
-  }
-}
-
-object PushFilterIntoStreamTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushFilterIntoStreamTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
deleted file mode 100644
index 903162e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
-import org.apache.flink.table.plan.rules.common.PushProjectIntoTableSourceScanRuleBase
-import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
-
-/**
- * The rule is responsible for push project into a [[StreamTableSourceScan]]
- */
-class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
-  operand(classOf[DataStreamCalc],
-    operand(classOf[StreamTableSourceScan], none())),
-  "PushProjectIntoStreamTableSourceScanRule")
-  with PushProjectIntoTableSourceScanRuleBase {
-
-  /** Rule must only match if [[StreamTableSource]] targets a [[ProjectableTableSource]] */
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val scan: StreamTableSourceScan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-    scan.tableSource match {
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc = call.rel(0).asInstanceOf[DataStreamCalc]
-    val scan = call.rel(1).asInstanceOf[StreamTableSourceScan]
-    pushProjectIntoScan(call, calc, scan)
-  }
-}
-
-object PushProjectIntoStreamTableSourceScanRule {
-  val INSTANCE: RelOptRule = new PushProjectIntoStreamTableSourceScanRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index a6db084..10cb1f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -18,21 +18,21 @@
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.logical.LogicalTableScan
-import org.apache.flink.table.plan.nodes.datastream.{StreamTableSourceScan, DataStreamConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan
 import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
 import org.apache.flink.table.sources.StreamTableSource
 
-/** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
 class StreamTableSourceScanRule
   extends ConverterRule(
-    classOf[LogicalTableScan],
-    Convention.NONE,
-    DataStreamConvention.INSTANCE,
+    classOf[FlinkLogicalTableSourceScan],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
    "StreamTableSourceScanRule") {
 
@@ -54,18 +54,14 @@ class StreamTableSourceScanRule
   }
 
   def convert(rel: RelNode): RelNode = {
-    val scan: LogicalTableScan = rel.asInstanceOf[LogicalTableScan]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-
-    // The original registered table source
-    val table = scan.getTable.unwrap(classOf[TableSourceTable[_]])
-    val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
+    val scan: FlinkLogicalTableSourceScan = rel.asInstanceOf[FlinkLogicalTableSourceScan]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
 
     new StreamTableSourceScan(
       rel.getCluster,
       traitSet,
       scan.getTable,
-      tableSource
+      scan.tableSource.asInstanceOf[StreamTableSource[_]]
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
new file mode 100644
index 0000000..9d02c87
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts an EnumerableTableScan into a LogicalTableScan.
+ * We need this rule because Calcite creates an EnumerableTableScan
+ * when parsing a SQL query. We convert it into a LogicalTableScan
+ * so we can merge the optimization process with any plan that might be created
+ * by the Table API.
+ */
+class EnumerableToLogicalTableScan(
+    operand: RelOptRuleOperand,
+    description: String) extends RelOptRule(operand, description) {
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
+    val table = oldRel.getTable
+    val newRel = LogicalTableScan.create(oldRel.getCluster, table)
+    call.transformTo(newRel)
+  }
+}
+
+object EnumerableToLogicalTableScan {
+  val INSTANCE = new EnumerableToLogicalTableScan(
+    operand(classOf[EnumerableTableScan], any),
+    "EnumerableToLogicalTableScan")
+}
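[Editor's note: a plain RelOptRule like this one is applied by handing its INSTANCE to whatever planner program runs before optimization. A hedged sketch with Calcite's HepPlanner; rootRel stands for an already-parsed plan and is not defined here:

    import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
    import org.apache.flink.table.plan.rules.logical.EnumerableToLogicalTableScan

    val program = new HepProgramBuilder()
      .addRuleInstance(EnumerableToLogicalTableScan.INSTANCE)
      .build()
    val planner = new HepPlanner(program)
    planner.setRoot(rootRel)               // rootRel: hypothetical input RelNode
    val normalized = planner.findBestExp() // EnumerableTableScans are now LogicalTableScans
]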
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
new file mode 100644
index 0000000..ae6129e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.sources.FilterableTableSource
+import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.util.Preconditions
+
+import scala.collection.JavaConverters._
+
+class PushFilterIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalTableSourceScan], none)),
+  "PushFilterIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    scan.tableSource match {
+      case source: FilterableTableSource[_] =>
+        calc.getProgram.getCondition != null && !source.isFilterPushedDown
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    val tableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
+    val filterableSource = scan.tableSource.asInstanceOf[FilterableTableSource[_]]
+    pushFilterIntoScan(call, calc, scan, tableSourceTable, filterableSource, description)
+  }
+
+  private def pushFilterIntoScan(
+      call: RelOptRuleCall,
+      calc: FlinkLogicalCalc,
+      scan: FlinkLogicalTableSourceScan,
+      tableSourceTable: TableSourceTable[_],
+      filterableSource: FilterableTableSource[_],
+      description: String): Unit = {
+
+    Preconditions.checkArgument(!filterableSource.isFilterPushedDown)
+
+    val program = calc.getProgram
+    val functionCatalog = FunctionCatalog.withBuiltIns
+    val (predicates, unconvertedRexNodes) =
+      RexProgramExtractor.extractConjunctiveConditions(
+        program,
+        call.builder().getRexBuilder,
+        functionCatalog)
+    if (predicates.isEmpty) {
+      // no condition could be translated to an expression
+      return
+    }
+
+    val remainingPredicates = new util.LinkedList[Expression]()
+    predicates.foreach(e => remainingPredicates.add(e))
+
+    val newTableSource = filterableSource.applyPredicate(remainingPredicates)
+
+    // check whether the framework still needs to apply a filter
+    val relBuilder = call.builder()
+    val remainingCondition = {
+      if (!remainingPredicates.isEmpty || unconvertedRexNodes.nonEmpty) {
+        relBuilder.push(scan)
+        val remainingConditions =
+          (remainingPredicates.asScala.map(expr => expr.toRexNode(relBuilder))
+            ++ unconvertedRexNodes)
+        remainingConditions.reduce((l, r) => relBuilder.and(l, r))
+      } else {
+        null
+      }
+    }
+
+    // check whether we still need a RexProgram. A RexProgram is needed when
+    // either a projection or a filter exists.
+    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newRexProgram = {
+      if (remainingCondition != null || !program.projectsOnlyIdentity) {
+        val expandedProjectList = program.getProjectList.asScala
+          .map(ref => program.expandLocalRef(ref)).asJava
+        RexProgram.create(
+          program.getInputRowType,
+          expandedProjectList,
+          remainingCondition,
+          program.getOutputRowType,
+          relBuilder.getRexBuilder)
+      } else {
+        null
+      }
+    }
+
+    if (newRexProgram != null) {
+      val newCalc = calc.copy(calc.getTraitSet, newScan, newRexProgram)
+      call.transformTo(newCalc)
+    } else {
+      call.transformTo(newScan)
+    }
+  }
+}
+
+object PushFilterIntoTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushFilterIntoTableSourceScanRule
+}
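[Editor's note on the contract this rule relies on: applyPredicate receives a mutable list of converted predicates, removes the ones the source will evaluate itself, and returns a new source whose isFilterPushedDown is true; whatever remains in the list stays in the plan as a Calc. A minimal sketch of a conforming source — the class is hypothetical and deliberately abstract so the unrelated TableSource members can be elided:

    import java.util.{List => JList}
    import org.apache.flink.table.expressions.Expression
    import org.apache.flink.table.sources.{FilterableTableSource, TableSource}

    abstract class MyFilterableSource[T](pushedDown: Boolean)
      extends FilterableTableSource[T] {

      // The rule only fires while this is false, so push-down happens at most once.
      override def isFilterPushedDown: Boolean = pushedDown

      override def applyPredicate(predicates: JList[Expression]): TableSource[T] = {
        // Accept every offered predicate by draining the list; predicates left
        // here would be re-applied by the framework in the remaining Calc.
        predicates.clear()
        copyWithFilters()
      }

      // Hypothetical helper: a copy of this source with isFilterPushedDown == true.
      protected def copyWithFilters(): TableSource[T]
    }
]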
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
new file mode 100644
index 0000000..99a6927
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
+import org.apache.flink.table.sources.{NestedFieldsProjectableTableSource, ProjectableTableSource}
+
+class PushProjectIntoTableSourceScanRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc],
+    operand(classOf[FlinkLogicalTableSourceScan], none)),
+  "PushProjectIntoTableSourceScanRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    scan.tableSource match {
+      case _: ProjectableTableSource[_] => true
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall) {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
+    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+
+    // push the projection into the scan only if the query does not use all
+    // fields of the source; otherwise keep the original plan.
+    val source = scan.tableSource
+    if (TableEnvironment.getFieldNames(source).length != usedFields.length) {
+
+      val newTableSource = source match {
+        case nested: NestedFieldsProjectableTableSource[_] =>
+          val nestedFields = RexProgramExtractor
+            .extractRefNestedInputFields(calc.getProgram, usedFields)
+          nested.projectNestedFields(usedFields, nestedFields)
+        case projecting: ProjectableTableSource[_] =>
+          projecting.projectFields(usedFields)
+      }
+
+      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
+        calc.getProgram,
+        newScan.getRowType,
+        calc.getCluster.getRexBuilder,
+        usedFields)
+
+      if (newCalcProgram.isTrivial) {
+        // drop the Calc if the transformed program merely returns its input
+        // and contains no filter
+        call.transformTo(newScan)
+      } else {
+        val newCalc = calc.copy(calc.getTraitSet, newScan, newCalcProgram)
+        call.transformTo(newCalc)
+      }
+    }
+  }
+}
+
+object PushProjectIntoTableSourceScanRule {
+  val INSTANCE: RelOptRule = new PushProjectIntoTableSourceScanRule
+}
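[Editor's note: the projection counterpart of the previous contract is simpler — projectFields receives the indexes of the fields the query actually reads and returns a narrowed copy of the source. A hedged sketch under the same assumptions as above (hypothetical abstract class; signature as used by the rule):

    import org.apache.flink.table.sources.{ProjectableTableSource, TableSource}

    abstract class MyProjectableSource[T]
      extends ProjectableTableSource[T] {

      override def projectFields(fields: Array[Int]): TableSource[T] = {
        // Return a copy that emits only the selected field indexes; the rule then
        // rewrites the Calc's RexProgram against the narrowed scan row type.
        narrowTo(fields)
      }

      // Hypothetical helper producing the narrowed copy.
      protected def narrowTo(fields: Array[Int]): TableSource[T]
    }
]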
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index d4db13e..e165983 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -443,7 +443,10 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
-		CalciteConfig cc = new CalciteConfigBuilder().replaceOptRuleSet(RuleSets.ofList()).build();
+		CalciteConfig cc = new CalciteConfigBuilder()
+			.replaceLogicalOptRuleSet(RuleSets.ofList())
+			.replacePhysicalOptRuleSet(RuleSets.ofList())
+			.build();
 		tableEnv.getConfig().setCalciteConfig(cc);
 
 		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index d0de8fa..ed29f0f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -37,8 +37,11 @@ class CalciteConfigBuilderTest {
     assertFalse(cc.replacesNormRuleSet)
     assertFalse(cc.getNormRuleSet.isDefined)
 
-    assertFalse(cc.replacesOptRuleSet)
-    assertFalse(cc.getOptRuleSet.isDefined)
+    assertFalse(cc.replacesLogicalOptRuleSet)
+    assertFalse(cc.getLogicalOptRuleSet.isDefined)
+
+    assertFalse(cc.replacesPhysicalOptRuleSet)
+    assertFalse(cc.getPhysicalOptRuleSet.isDefined)
 
     assertFalse(cc.replacesDecoRuleSet)
     assertFalse(cc.getDecoRuleSet.isDefined)
@@ -48,16 +51,20 @@ class CalciteConfigBuilderTest {
   def testRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
-      .build()
+      .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .build()
 
     assertFalse(cc.replacesNormRuleSet)
     assertTrue(cc.getNormRuleSet.isDefined)
 
-    assertTrue(cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
+    assertTrue(cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+
+    assertTrue(cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
 
     assertTrue(cc.replacesDecoRuleSet)
     assertTrue(cc.getDecoRuleSet.isDefined)
@@ -126,30 +133,30 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
-  def testReplaceOptimizationRules(): Unit = {
+  def testReplaceLogicalOptimizationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
+      .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
 
-    assertEquals(true, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(true, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
   }
 
   @Test
-  def testReplaceOptimizationAddRules(): Unit = {
+  def testReplaceLogicalOptimizationAddRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-      .build()
+      .replaceLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
 
-    assertEquals(true, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(true, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(3, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
     assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
@@ -157,30 +164,64 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
-  def testAddOptimizationRules(): Unit = {
+  def testAddLogicalOptimizationRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
+      .addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addLogicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
 
-    assertEquals(false, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(false, cc.replacesLogicalOptRuleSet)
+    assertTrue(cc.getLogicalOptRuleSet.isDefined)
+    val cSet = cc.getLogicalOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testReplacePhysicalOptimizationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(1, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
   }
 
   @Test
-  def testAddAddOptimizationRules(): Unit = {
+  def testReplacePhysicalOptimizationAddRules(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-      .build()
+      .replacePhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testAddPhysicalOptimizationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addPhysicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addPhysicalOptRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
 
-    assertEquals(false, cc.replacesOptRuleSet)
-    assertTrue(cc.getOptRuleSet.isDefined)
-    val cSet = cc.getOptRuleSet.get.iterator().asScala.toSet
+    assertEquals(false, cc.replacesPhysicalOptRuleSet)
+    assertTrue(cc.getPhysicalOptRuleSet.isDefined)
+    val cSet = cc.getPhysicalOptRuleSet.get.iterator().asScala.toSet
     assertEquals(3, cSet.size)
     assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
     assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
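[Editor's note: taken together with the TableEnvironmentITCase change above, user configuration now addresses the two optimization phases separately. A hedged sketch of the new builder calls — the rule choices are illustrative only, and tableEnv stands for an existing TableEnvironment:

    import org.apache.calcite.rel.rules.{CalcMergeRule, FilterMergeRule}
    import org.apache.calcite.tools.RuleSets
    import org.apache.flink.table.calcite.{CalciteConfig, CalciteConfigBuilder}

    val cc: CalciteConfig = new CalciteConfigBuilder()
      // extend the logical phase, leave the physical phase untouched
      .addLogicalOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE, CalcMergeRule.INSTANCE))
      .build()
    tableEnv.getConfig.setCalciteConfig(cc)
]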
assertTrue(cSet.contains(CalcMergeRule.INSTANCE)) http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala index 696468d..d801644 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExternalCatalogTest.scala @@ -116,7 +116,7 @@ class ExternalCatalogTest extends TableTestBase { sourceStreamTableNode(table1Path, table1ProjectedFields), term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2") ), - term("union", "_c0", "e", "_c2") + term("union all", "_c0", "e", "_c2") ) util.verifyTable(result, expected) @@ -143,7 +143,7 @@ class ExternalCatalogTest extends TableTestBase { sourceStreamTableNode(table1Path, table1ProjectedFields), term("select", "*(a, 2) AS EXPR$0", "b", "c") ), - term("union", "EXPR$0", "e", "g")) + term("union all", "EXPR$0", "e", "g")) util.verifySql(sqlQuery, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 50fafbe..3d93f45 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -333,13 +333,14 @@ class TableEnvironmentTest extends TableTestBase { val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f) - val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 UNION SELECT a, b, c FROM $table") + val sqlTable2 = util.tEnv.sql(s"SELECT d, e, f FROM $table2 " + + s"UNION ALL SELECT a, b, c FROM $table") val expected2 = binaryNode( "DataStreamUnion", streamTableNode(1), streamTableNode(0), - term("union", "d, e, f")) + term("union all", "d, e, f")) util.verifyTable(sqlTable2, expected2) } http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 6f03bec..324b4d6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -516,7 +516,7 @@ class WindowAggregateTest extends TableTestBase { ) streamUtil.verifySql(sql, expected) } - + @Test def testBoundPartitionedProcTimeWindowWithRowRange() = { val sql = "SELECT " + 
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 9a6562a..e3e292e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -18,11 +18,11 @@

 package org.apache.flink.table.expressions.utils

-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import java.util
 import java.util.concurrent.Future

 import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator
@@ -43,7 +43,8 @@ import org.apache.flink.table.calcite.FlinkPlannerImpl
 import org.apache.flink.table.codegen.{CodeGenerator, Compiler, GeneratedFunction}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention}
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.dataset.DataSetCalc
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -66,7 +67,8 @@ abstract class ExpressionTestBase {
     context._2.getFrameworkConfig,
     context._2.getPlanner,
     context._2.getTypeFactory)
-  private val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
+  private val logicalOptProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
+  private val dataSetOptProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)

   private def hepPlanner = {
     val builder = new HepProgramBuilder
@@ -194,9 +196,14 @@ abstract class ExpressionTestBase {
       decorPlan
     }

-    // create DataSetCalc
-    val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-    val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, flinkOutputProps,
+    // convert to logical plan
+    val logicalProps = converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, normalizedPlan, logicalProps,
+      ImmutableList.of(), ImmutableList.of())
+
+    // convert to dataset plan
+    val physicalProps = converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
+    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, logicalCalc, physicalProps,
       ImmutableList.of(), ImmutableList.of())

     // extract RexNode
@@ -219,8 +226,15 @@ abstract class ExpressionTestBase {

     // create DataSetCalc
     val decorPlan = RelDecorrelator.decorrelateQuery(converted)
-    val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
-    val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps,
+
+    // convert to logical plan
+    val flinkLogicalProps = converted.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+    val logicalCalc = logicalOptProgram.run(context._2.getPlanner, decorPlan, flinkLogicalProps,
+      ImmutableList.of(), ImmutableList.of())
+
+    // convert to dataset plan
+    val flinkPhysicalProps = converted.getTraitSet.replace(FlinkConventions.DATASET).simplify()
+    val dataSetCalc = dataSetOptProgram.run(context._2.getPlanner, logicalCalc, flinkPhysicalProps,
       ImmutableList.of(), ImmutableList.of())

     // extract RexNode
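[Note, not part of the commit: the harness change above mirrors the optimizer's new two-phase structure. One rule program moves the plan to the LOGICAL convention, a second moves the result to the DATASET convention. Distilled into a sketch, where the wrapper object and method are illustrative; the five-argument Program.run signature is Calcite's, as used in the diff:

import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.Programs
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.rules.FlinkRuleSets

object TwoPhaseOptimizer {
  private val logicalProgram = Programs.ofRules(FlinkRuleSets.LOGICAL_OPT_RULES)
  private val physicalProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)

  // Run both phases back to back on the same planner instance.
  def optimize(planner: RelOptPlanner, input: RelNode): RelNode = {
    val logicalProps = input.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val logicalPlan = logicalProgram.run(planner, input, logicalProps,
      ImmutableList.of(), ImmutableList.of())

    val physicalProps = logicalPlan.getTraitSet.replace(FlinkConventions.DATASET).simplify()
    physicalProgram.run(planner, logicalPlan, physicalProps,
      ImmutableList.of(), ImmutableList.of())
  }
}
]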
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
index 8b6d6cf..b563a8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/NormalizationRulesTest.scala
@@ -29,14 +29,15 @@ import org.junit.Test
 class NormalizationRulesTest extends TableTestBase {

   @Test
-  def testApplyNormalizationRuleForForBatchSQL(): Unit = {
+  def testApplyNormalizationRuleForBatchSQL(): Unit = {
     val util = batchTestUtil()

     // rewrite distinct aggregate
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
-      .replaceOptRuleSet(RuleSets.ofList())
-      .build()
+        .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
+        .replaceLogicalOptRuleSet(RuleSets.ofList())
+        .replacePhysicalOptRuleSet(RuleSets.ofList())
+        .build()
     util.tEnv.getConfig.setCalciteConfig(cc)

     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -62,14 +63,15 @@ class NormalizationRulesTest extends TableTestBase {
   }

   @Test
-  def testApplyNormalizationRuleForForStreamSQL(): Unit = {
+  def testApplyNormalizationRuleForStreamSQL(): Unit = {
     val util = streamTestUtil()

     // rewrite distinct aggregate
     val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
-      .replaceOptRuleSet(RuleSets.ofList())
-      .build()
+        .replaceNormRuleSet(RuleSets.ofList(AggregateExpandDistinctAggregatesRule.JOIN))
+        .replaceLogicalOptRuleSet(RuleSets.ofList())
+        .replacePhysicalOptRuleSet(RuleSets.ofList())
+        .build()
     util.tEnv.getConfig.setCalciteConfig(cc)

     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 6a86ace..c5e13a1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -35,5 +35,5 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {

   override protected def getBuiltInNormRuleSet: RuleSet = ???

-  override protected def getBuiltInOptRuleSet: RuleSet = ???
+  override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 }
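[Note, not part of the commit: the ??? in the mock above is Scala's Predef.???. It has type Nothing, so it satisfies any declared return type, and it throws scala.NotImplementedError if ever evaluated; that is why MockTableEnvironment can adopt the renamed abstract method without supplying a real rule set. A self-contained illustration with made-up names:

object NotImplementedDemo extends App {
  // Compiles because ??? has type Nothing, the bottom of Scala's type lattice.
  def pendingRuleSet: String = ???

  try pendingRuleSet
  catch { case _: NotImplementedError => println("pending member was invoked") }
}
]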
http://git-wip-us.apache.org/repos/asf/flink/blob/6a0ada81/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
index fc83c0d..2d19bdc 100644
--- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
+++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out
@@ -4,7 +4,7 @@ LogicalUnion(all=[true])
   LogicalTableScan(table=[[_DataStreamTable_1]])

 == Optimized Logical Plan ==
-DataStreamUnion(union=[count, word])
+DataStreamUnion(union all=[count, word])
   DataStreamScan(table=[[_DataStreamTable_0]])
   DataStreamScan(table=[[_DataStreamTable_1]])
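[Note, not part of the commit: all=[true] on Calcite's LogicalUnion is its encoding of UNION ALL, so the relabeled DataStreamUnion(union all=[...]) makes the optimized plan consistent with the abstract syntax tree printed above it in the same .out file.]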