[FLINK-3596] DataSet RelNode refactoring - remove the intermediate Flink RelNode layer and the DataSet rules - move code generation from rules to DataSet nodes - remove unused DataSet nodes - move code generation from join rule to DataSetJoin node - merge DataSetMap and DataSetReduce into DataSetAggregate
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22621e02 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22621e02 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22621e02 Branch: refs/heads/tableOnCalcite Commit: 22621e0289ee6e2783b3220685d9aaba84476d77 Parents: e34e439 Author: vasia <va...@apache.org> Authored: Tue Mar 8 17:29:32 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Mar 10 23:30:26 2016 +0100 ---------------------------------------------------------------------- .../api/java/table/JavaBatchTranslator.scala | 28 +--- .../plan/nodes/dataset/DataSetAggregate.scala | 114 ++++++++++++++ .../table/plan/nodes/dataset/DataSetCalc.scala | 152 +++++++++++++++++++ .../plan/nodes/dataset/DataSetExchange.scala | 66 -------- .../plan/nodes/dataset/DataSetFlatMap.scala | 74 --------- .../plan/nodes/dataset/DataSetGroupReduce.scala | 105 ------------- .../table/plan/nodes/dataset/DataSetJoin.scala | 83 ++++++++-- .../table/plan/nodes/dataset/DataSetMap.scala | 90 ----------- .../plan/nodes/dataset/DataSetReduce.scala | 66 -------- .../table/plan/nodes/dataset/DataSetSort.scala | 65 -------- .../plan/nodes/logical/FlinkAggregate.scala | 60 -------- .../table/plan/nodes/logical/FlinkCalc.scala | 37 ----- .../plan/nodes/logical/FlinkConvention.scala | 42 ----- .../table/plan/nodes/logical/FlinkJoin.scala | 46 ------ .../api/table/plan/nodes/logical/FlinkRel.scala | 25 --- .../table/plan/nodes/logical/FlinkScan.scala | 31 ---- .../table/plan/nodes/logical/FlinkUnion.scala | 38 ----- .../api/table/plan/rules/FlinkRuleSets.scala | 11 -- .../rules/dataset/DataSetAggregateRule.scala | 71 --------- .../plan/rules/dataset/DataSetCalcRule.scala | 137 ----------------- .../plan/rules/dataset/DataSetJoinRule.scala | 135 ---------------- .../plan/rules/dataset/DataSetScanRule.scala | 51 ------- .../plan/rules/dataset/DataSetUnionRule.scala | 53 ------- 
.../plan/rules/logical/FlinkAggregateRule.scala | 22 +-- .../plan/rules/logical/FlinkCalcRule.scala | 15 +- .../plan/rules/logical/FlinkJoinRule.scala | 37 +++-- .../logical/FlinkJoinUnionTransposeRule.scala | 2 +- .../plan/rules/logical/FlinkScanRule.scala | 11 +- .../plan/rules/logical/FlinkUnionRule.scala | 22 +-- .../table/runtime/aggregate/AggregateUtil.scala | 49 +++--- 30 files changed, 421 insertions(+), 1317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index f238df3..14ee78e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -76,8 +76,10 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { // optimize the logical Flink plan val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) val flinkOutputProps = RelTraitSet.createEmpty() + .plus(DataSetConvention.INSTANCE) + .plus(RelCollations.of()).simplify() - val optPlan = try { + val dataSetPlan = try { optProgram.run(planner, decorPlan, flinkOutputProps) } catch { @@ -89,30 +91,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { } println("---------------") - println("Optimized Plan:") - println("---------------") - println(RelOptUtil.toString(optPlan)) - - // optimize the logical Flink plan - val dataSetProgram = Programs.ofRules(FlinkRuleSets.DATASET_TRANS_RULES) - val dataSetOutputProps = 
RelTraitSet.createEmpty() - .plus(DataSetConvention.INSTANCE) - .plus(RelCollations.of()).simplify() - - val dataSetPlan = try { - dataSetProgram.run(planner, optPlan, dataSetOutputProps) - } - catch { - case e: CannotPlanException => - throw new PlanGenException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(lPlan)}\n" + - "Please consider filing a bug report.", e) - } - - println("-------------") println("DataSet Plan:") - println("-------------") + println("---------------") println(RelOptUtil.toString(dataSetPlan)) dataSetPlan match { http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala new file mode 100644 index 0000000..3856c5f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.plan.TypeConverter._ +import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.table.{Row, TableConfig} + +import scala.collection.JavaConverters._ + +/** + * Flink RelNode which matches along with a LogicalAggregate. + */ +class DataSetAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + rowType: RelDataType, + inputType: RelDataType, + opName: String, + grouping: Array[Int]) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetAggregate( + cluster, + traitSet, + inputs.get(0), + namedAggregates, + rowType, + inputType, + opName, + grouping) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + expectedType match { + case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => + throw new PlanGenException("Aggregate operations currently only support returning Rows.") + 
case _ => // ok + } + + val groupingKeys = (0 until grouping.length).toArray + // add grouping fields, position keys in the input, and input type + val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, + inputType, rowType, grouping, config) + + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( + config, + // tell the input operator that this operator currently only supports Rows as input + Some(TypeConverter.DEFAULT_ROW_TYPE)) + + // get the output types + val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala + .map(f => f.getType.getSqlTypeName) + .map(n => TypeConverter.sqlTypeToTypeInfo(n)) + .toArray + + val rowTypeInfo = new RowTypeInfo(fieldTypes) + val mappedInput = inputDS.map(aggregateResult.mapFunc) + val groupReduceFunction = aggregateResult.reduceGroupFunc + + if (groupingKeys.length > 0) { + mappedInput.asInstanceOf[DataSet[Row]] + .groupBy(groupingKeys: _*) + .reduceGroup(groupReduceFunction) + .returns(rowTypeInfo) + .asInstanceOf[DataSet[Any]] + } + else { + // global aggregation + mappedInput.asInstanceOf[DataSet[Row]] + .reduceGroup(groupReduceFunction) + .returns(rowTypeInfo) + .asInstanceOf[DataSet[Any]] + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala new file mode 100644 index 0000000..d7c71cc --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.plan.TypeConverter._ +import org.apache.flink.api.table.runtime.FlatMapRunner +import org.apache.flink.api.table.TableConfig +import org.apache.calcite.rex.RexProgram +import scala.collection.JavaConversions._ + +/** + * Flink RelNode which matches along with LogicalCalc. 
+ * + */ +class DataSetCalc( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowType: RelDataType, + calcProgram: RexProgram, + opName: String, + ruleDescription: String) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetCalc( + cluster, + traitSet, + inputs.get(0), + rowType, + calcProgram, + opName, + ruleDescription) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("name", opName) + } + + override def toString = opName + + override def translateToPlan(config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) + + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + val generator = new CodeGenerator(config, inputDS.getType) + + val condition = calcProgram.getCondition + val expandedExpressions = calcProgram.getProjectList.map( + expr => calcProgram.expandLocalRef(expr)) + val projection = generator.generateResultExpression( + returnType, + rowType.getFieldNames, + expandedExpressions) + + val body = { + // only projection + if (condition == null) { + s""" + |${projection.code} + |${generator.collectorTerm}.collect(${projection.resultTerm}); + |""".stripMargin + } + else { + val filterCondition = generator.generateExpression( + calcProgram.expandLocalRef(calcProgram.getCondition)) + // only filter + if (projection == null) { + // conversion + if (inputDS.getType != returnType) { + val conversion = generator.generateConverterResultExpression( + returnType, + rowType.getFieldNames) + + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${conversion.code} + | ${generator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin + 
} + // no conversion + else { + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${generator.collectorTerm}.collect(${generator.input1Term}); + |} + |""".stripMargin + } + } + // both filter and projection + else { + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${projection.code} + | ${generator.collectorTerm}.collect(${projection.resultTerm}); + |} + |""".stripMargin + } + } + } + + val genFunction = generator.generateFunction( + ruleDescription, + classOf[FlatMapFunction[Any, Any]], + body, + returnType) + + val mapFunc = new FlatMapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + inputDS.flatMap(mapFunc) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala deleted file mode 100644 index 00cf899..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.TableConfig - -/** - * Flink RelNode which matches along with PartitionOperator. - */ -class DataSetExchange( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - partitionKey: Array[Int], - partitionMethod: PartitionMethod) - extends SingleRel(cluster, traitSet, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetExchange( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - partitionKey, - partitionMethod - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - ??? 
- } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala deleted file mode 100644 index 9744792..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.plan.TypeConverter._ - -/** - * Flink RelNode which matches along with FlatMapOperator. - * - */ -class DataSetFlatMap( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => FlatMapFunction[Any, Any]) - extends SingleRel(cluster, traitSet, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetFlatMap( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - func - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def toString = opName - - override def translateToPlan(config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config) - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - val flatMapFunc = func.apply(config, inputDataSet.getType, returnType) - inputDataSet.flatMap(flatMapFunc) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala deleted file mode 100644 index b87a092..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter} -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{Row, TableConfig} - -import scala.collection.JavaConverters._ - -/** - * Flink RelNode which matches along with ReduceGroupOperator. 
- */ -class DataSetGroupReduce( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - groupingKeys: Array[Int], - func: (TableConfig, TypeInformation[Row], TypeInformation[Row]) => - GroupReduceFunction[Row, Row]) - extends SingleRel(cluster, traitSet, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetGroupReduce( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - groupingKeys, - func - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - throw new PlanGenException("GroupReduce operations currently only support returning Rows.") - case _ => // ok - } - - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( - config, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) - - // get the output types - val fieldsNames = rowType.getFieldNames - val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(f => f.getType.getSqlTypeName) - .map(n => TypeConverter.sqlTypeToTypeInfo(n)) - .toArray - - val rowTypeInfo = new RowTypeInfo(fieldTypes) - val groupReduceFunction = - func.apply(config, inputDS.getType.asInstanceOf[RowTypeInfo], rowTypeInfo) - - if (groupingKeys.length > 0) { - inputDS.asInstanceOf[DataSet[Row]] - .groupBy(groupingKeys: _*) - .reduceGroup(groupReduceFunction) - .returns(rowTypeInfo) - .asInstanceOf[DataSet[Any]] - } - else { - // global aggregation - inputDS.asInstanceOf[DataSet[Row]].reduceGroup(groupReduceFunction) - .returns(rowTypeInfo).asInstanceOf[DataSet[Any]] - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index c32853d..1d293d2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -20,21 +20,21 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinInfo import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} -import org.apache.flink.api.common.functions.JoinFunction +import org.apache.calcite.util.mapping.IntPair import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.table.{TableConfig, Row} +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.runtime.FlatJoinRunner +import org.apache.flink.api.table.{TableException, TableConfig} import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.table.plan.TypeConverter._ -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.flink.api.table.typeinfo.RowTypeInfo import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ -import 
org.apache.flink.api.table.plan.TypeConverter +import org.apache.calcite.rex.RexNode /** * Flink RelNode which matches along with JoinOperator and its related operations. @@ -46,12 +46,13 @@ class DataSetJoin( right: RelNode, rowType: RelDataType, opName: String, - joinKeysLeft: Array[Int], - joinKeysRight: Array[Int], + joinCondition: RexNode, + joinRowType: RelDataType, + joinInfo: JoinInfo, + keyPairs: List[IntPair], joinType: JoinType, joinHint: JoinHint, - func: (TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) => - FlatJoinFunction[Any, Any, Any]) + ruleDescription: String) extends BiRel(cluster, traitSet, left, right) with DataSetRel { @@ -65,12 +66,13 @@ class DataSetJoin( inputs.get(1), rowType, opName, - joinKeysLeft, - joinKeysRight, + joinCondition, + joinRowType, + joinInfo, + keyPairs, joinType, joinHint, - func - ) + ruleDescription) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -90,8 +92,57 @@ class DataSetJoin( config.getNullCheck, config.getEfficientTypeUsage) - val joinFun = func.apply(config, leftDataSet.getType, rightDataSet.getType, returnType) - leftDataSet.join(rightDataSet).where(joinKeysLeft: _*).equalTo(joinKeysRight: _*) + // get the equality keys + val leftKeys = ArrayBuffer.empty[Int] + val rightKeys = ArrayBuffer.empty[Int] + if (keyPairs.isEmpty) { + // if no equality keys => not supported + throw new TableException("Joins should have at least one equality condition") + } + else { + // at least one equality expression => generate a join function + keyPairs.foreach(pair => { + leftKeys.add(pair.source) + rightKeys.add(pair.target) + }) + } + + val generator = new CodeGenerator(config, leftDataSet.getType, Some(rightDataSet.getType)) + val conversion = generator.generateConverterResultExpression( + returnType, + joinRowType.getFieldNames) + + var body = "" + + if (joinInfo.isEqui) { + // only equality condition + body = s""" + |${conversion.code} + 
|${generator.collectorTerm}.collect(${conversion.resultTerm}); + |""".stripMargin + } + else { + val condition = generator.generateExpression(joinCondition) + body = s""" + |${condition.code} + |if (${condition.resultTerm}) { + | ${conversion.code} + | ${generator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin + } + val genFunction = generator.generateFunction( + ruleDescription, + classOf[FlatJoinFunction[Any, Any, Any]], + body, + returnType) + + val joinFun = new FlatJoinRunner[Any, Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + leftDataSet.join(rightDataSet).where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*) .`with`(joinFun).asInstanceOf[DataSet[Any]] } http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala deleted file mode 100644 index d87e047..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.functions.{MapFunction, MapPartitionFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter} -import org.apache.flink.api.table.plan.TypeConverter._ -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{Row, TableConfig} - -import scala.collection.JavaConverters._ - -/** - * Flink RelNode which matches along with MapOperator. 
- * - */ -class DataSetMap( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => MapFunction[Any, Any]) - extends SingleRel(cluster, traitSet, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetMap( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - func - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def toString = opName - - override def translateToPlan(config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - - expectedType match { - case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - throw new PlanGenException("GroupReduce operations " + - "currently only support returning Rows.") - case _ => // ok - } - - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( - config, - // tell the input operator that this operator currently only supports Rows as input - Some(TypeConverter.DEFAULT_ROW_TYPE)) - - val returnType = determineReturnType( - getRowType, - expectedType, - config.getNullCheck, - config.getEfficientTypeUsage) - val mapFunc = func.apply(config, inputDS.getType, returnType) - inputDS.map(mapFunc) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala deleted file mode 100644 index 361f869..0000000 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.{TableConfig, Row} - -/** - * Flink RelNode which matches along with ReduceOperator. 
- */ -class DataSetReduce( - cluster: RelOptCluster, - traits: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - groupingKeys: Array[Int], - func: ReduceFunction[Any]) - extends SingleRel(cluster, traits, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetReduce( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - groupingKeys, - func - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - ??? - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala deleted file mode 100644 index 033711b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.dataset - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.TableConfig - -/** - * Flink RelNode which matches along with SortPartitionOperator. - */ -class DataSetSort( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - rowType: RelDataType, - opName: String, - sortKey: Array[Int], - sortOrder: Array[Boolean]) - extends SingleRel(cluster, traitSet, input) - with DataSetRel { - - override def deriveRowType() = rowType - - override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { - new DataSetSort( - cluster, - traitSet, - inputs.get(0), - rowType, - opName, - sortKey, - sortOrder - ) - } - - override def explainTerms(pw: RelWriter): RelWriter = { - super.explainTerms(pw).item("name", opName) - } - - override def translateToPlan( - config: TableConfig, - expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - ??? 
- } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala deleted file mode 100644 index 1fca03a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.nodes.logical - -import java.util - -import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.{AggregateCall, Aggregate} -import org.apache.calcite.sql.fun.SqlAvgAggFunction -import org.apache.calcite.util.ImmutableBitSet - -import scala.collection.JavaConversions._ - -class FlinkAggregate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - indicator: Boolean, - groupSet: ImmutableBitSet, - groupSets: java.util.List[ImmutableBitSet], - aggCalls: java.util.List[AggregateCall]) - extends Aggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls) - with FlinkRel { - - override def copy( - traitSet: RelTraitSet, - input: RelNode, - indicator: Boolean, - groupSet: ImmutableBitSet, - groupSets: util.List[ImmutableBitSet], - aggCalls: util.List[AggregateCall]): Aggregate = { - - new FlinkAggregate( - cluster, - traitSet, - input, - indicator, - groupSet, - groupSets, - aggCalls - ) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala deleted file mode 100644 index bcfe8d7..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkCalc.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.Calc -import org.apache.calcite.rex.RexProgram - -class FlinkCalc( - cluster: RelOptCluster, - traitSet: RelTraitSet, - input: RelNode, - program: RexProgram) - extends Calc(cluster, traitSet, input, program) - with FlinkRel { - - override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { - new FlinkCalc(cluster, traitSet, child, program) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala deleted file mode 100644 index 80137f2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkConvention.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import org.apache.calcite.plan._ - -class FlinkConvention extends Convention { - - override def toString: String = getName - - def getInterface: Class[_] = classOf[FlinkRel] - - def getName: String = "FLINK" - - def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE - - def satisfies(`trait`: RelTrait): Boolean = this eq `trait` - - def register(planner: RelOptPlanner): Unit = { } - -} - -object FlinkConvention { - - val INSTANCE = new FlinkConvention -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala deleted file mode 100644 index 8b04b50..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkJoin.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.{JoinRelType, Join} -import org.apache.calcite.rex.RexNode - -class FlinkJoin( - cluster: RelOptCluster, - traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - condition: RexNode, - joinType: JoinRelType, - variablesStopped: java.util.Set[String]) - extends Join(cluster, traitSet, left, right, condition, joinType, variablesStopped) - with FlinkRel { - - override def copy( - traitSet: RelTraitSet, - condition: RexNode, - left: RelNode, - right: RelNode, - joinType: JoinRelType, - semiJoinDone: Boolean): Join = { - new FlinkJoin(cluster, traitSet, left, right, condition, joinType, getVariablesStopped) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala deleted 
file mode 100644 index 9ebd7e4..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkRel.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import org.apache.calcite.rel.RelNode - -trait FlinkRel extends RelNode { - -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala deleted file mode 100644 index 6d53a75..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkScan.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import org.apache.calcite.plan.{RelOptTable, RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.core.TableScan - -class FlinkScan( - cluster: RelOptCluster, - traitSet: RelTraitSet, - table: RelOptTable) - extends TableScan(cluster, traitSet, table) - with FlinkRel { - -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala deleted file mode 100644 index fd791d3..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkUnion.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.nodes.logical - -import java.util - -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.core.{SetOp, Union} - -class FlinkUnion( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputs: java.util.List[RelNode], - all: Boolean) - extends Union(cluster, traitSet, inputs, all) - with FlinkRel { - - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: Boolean): SetOp = { - new FlinkUnion(cluster, traitSet, inputs, all) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index b5c3800..bd128b2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -21,7 +21,6 @@ package org.apache.flink.api.table.plan.rules import org.apache.calcite.rel.rules._ import 
org.apache.calcite.tools.{RuleSets, RuleSet} import org.apache.flink.api.table.plan.rules.logical._ -import org.apache.flink.api.table.plan.rules.dataset._ object FlinkRuleSets { @@ -102,14 +101,4 @@ object FlinkRuleSets { FlinkUnionRule.INSTANCE ) - val DATASET_TRANS_RULES: RuleSet = RuleSets.ofList( - - // translate to DataSet nodes - DataSetAggregateRule.INSTANCE, - DataSetCalcRule.INSTANCE, - DataSetJoinRule.INSTANCE, - DataSetScanRule.INSTANCE, - DataSetUnionRule.INSTANCE - ) - } http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala deleted file mode 100644 index ba77fea..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.rules.dataset - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetGroupReduce, DataSetMap} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention} -import org.apache.flink.api.table.runtime.aggregate.AggregateUtil - -import scala.collection.JavaConversions._ - -class DataSetAggregateRule - extends ConverterRule( - classOf[FlinkAggregate], - FlinkConvention.INSTANCE, - DataSetConvention.INSTANCE, - "DataSetAggregateRule") -{ - - def convert(rel: RelNode): RelNode = { - val agg: FlinkAggregate = rel.asInstanceOf[FlinkAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) - - val grouping = agg.getGroupSet.toArray - - val inputType = agg.getInput.getRowType() - - // add grouping fields, position keys in the input, and input type - val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(agg.getNamedAggCalls, - inputType, rel.getRowType, grouping) - - val mapNode = new DataSetMap(rel.getCluster, - traitSet, - convInput, - aggregateResult.intermediateDataType, - agg.toString, - aggregateResult.mapFunc) - - new DataSetGroupReduce( - rel.getCluster, - traitSet, - mapNode, - rel.getRowType, - agg.toString, - (0 until grouping.length).toArray, - aggregateResult.reduceGroupFunc) - } -} - -object DataSetAggregateRule { - val INSTANCE: RelOptRule = new DataSetAggregateRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala deleted file mode 100644 index 256a085..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetCalcRule.scala +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.rules.dataset - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, FlinkConvention} -import org.apache.flink.api.table.runtime.FlatMapRunner -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.common.functions.FlatMapFunction -import scala.collection.JavaConversions._ -import org.apache.calcite.rex.RexLocalRef -import org.apache.flink.api.table.codegen.GeneratedExpression - -class DataSetCalcRule - extends ConverterRule( - classOf[FlinkCalc], - FlinkConvention.INSTANCE, - DataSetConvention.INSTANCE, - "DataSetCalcRule") -{ - - def convert(rel: RelNode): RelNode = { - val calc: FlinkCalc = rel.asInstanceOf[FlinkCalc] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) - - val calcFunc = ( - config: TableConfig, - inputType: TypeInformation[Any], - returnType: TypeInformation[Any]) => { - val generator = new CodeGenerator(config, inputType) - - val calcProgram = calc.getProgram - val condition = calcProgram.getCondition - val expandedExpressions = calcProgram.getProjectList.map( - expr => calcProgram.expandLocalRef(expr.asInstanceOf[RexLocalRef])) - val projection = generator.generateResultExpression( - returnType, - calc.getRowType.getFieldNames, - expandedExpressions) - - val body = { - // only projection - if (condition == null) { - s""" - |${projection.code} - |${generator.collectorTerm}.collect(${projection.resultTerm}); - |""".stripMargin - } - else { - val filterCondition = generator.generateExpression( - 
calcProgram.expandLocalRef(calcProgram.getCondition)) - // only filter - if (projection == null) { - // conversion - if (inputType != returnType) { - val conversion = generator.generateConverterResultExpression( - returnType, - calc.getRowType.getFieldNames) - - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${conversion.code} - | ${generator.collectorTerm}.collect(${conversion.resultTerm}); - |} - |""".stripMargin - } - // no conversion - else { - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${generator.collectorTerm}.collect(${generator.input1Term}); - |} - |""".stripMargin - } - } - // both filter and projection - else { - s""" - |${filterCondition.code} - |if (${filterCondition.resultTerm}) { - | ${projection.code} - | ${generator.collectorTerm}.collect(${projection.resultTerm}); - |} - |""".stripMargin - } - } - } - - val genFunction = generator.generateFunction( - description, - classOf[FlatMapFunction[Any, Any]], - body, - returnType) - - new FlatMapRunner[Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - } - - new DataSetFlatMap( - rel.getCluster, - traitSet, - convInput, - rel.getRowType, - calc.toString, - calcFunc) - } -} - -object DataSetCalcRule { - val INSTANCE: RelOptRule = new DataSetCalcRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala deleted file mode 100644 index c045471..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataset - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} -import scala.collection.JavaConversions._ -import scala.collection.mutable.ArrayBuffer -import org.apache.flink.api.table.plan.TypeConverter._ -import org.apache.flink.api.table.runtime.FlatJoinRunner -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.codegen.CodeGenerator -import org.apache.flink.api.common.functions.FlatJoinFunction -import org.apache.calcite.rel.core.JoinInfo -import org.apache.flink.api.table.TableException - -class DataSetJoinRule - extends ConverterRule( - classOf[FlinkJoin], - FlinkConvention.INSTANCE, - DataSetConvention.INSTANCE, - "DataSetJoinRule") -{ - - def convert(rel: RelNode): RelNode = { - val join: FlinkJoin = 
rel.asInstanceOf[FlinkJoin] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) - - // get the equality keys - val joinInfo = join.analyzeCondition - val keyPairs = joinInfo.pairs - - if (keyPairs.isEmpty) { // if no equality keys => not supported - throw new TableException("Joins should have at least one equality condition") - } - else { // at least one equality expression => generate a join function - val conditionType = join.getCondition.getType - val func = getJoinFunction(join, joinInfo) - val leftKeys = ArrayBuffer.empty[Int] - val rightKeys = ArrayBuffer.empty[Int] - - keyPairs.foreach(pair => { - leftKeys.add(pair.source) - rightKeys.add(pair.target)} - ) - - new DataSetJoin( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - join.toString, - leftKeys.toArray, - rightKeys.toArray, - JoinType.INNER, - null, - func) - } - } - - def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo): - ((TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) => - FlatJoinFunction[Any, Any, Any]) = { - - val func = ( - config: TableConfig, - leftInputType: TypeInformation[Any], - rightInputType: TypeInformation[Any], - returnType: TypeInformation[Any]) => { - - val generator = new CodeGenerator(config, leftInputType, Some(rightInputType)) - val conversion = generator.generateConverterResultExpression( - returnType, - join.getRowType.getFieldNames) - var body = "" - - if (joinInfo.isEqui) { - // only equality condition - body = s""" - |${conversion.code} - |${generator.collectorTerm}.collect(${conversion.resultTerm}); - |""".stripMargin - } - else { - val condition = generator.generateExpression(join.getCondition) - body = s""" - |${condition.code} - |if (${condition.resultTerm}) { - | ${conversion.code} - | 
${generator.collectorTerm}.collect(${conversion.resultTerm}); - |} - |""".stripMargin - } - val genFunction = generator.generateFunction( - description, - classOf[FlatJoinFunction[Any, Any, Any]], - body, - returnType) - - new FlatJoinRunner[Any, Any, Any]( - genFunction.name, - genFunction.code, - genFunction.returnType) - } - func - } -} - -object DataSetJoinRule { - val INSTANCE: RelOptRule = new DataSetJoinRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala deleted file mode 100644 index f995201..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetScanRule.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.table.plan.rules.dataset - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention} - -class DataSetScanRule - extends ConverterRule( - classOf[FlinkScan], - FlinkConvention.INSTANCE, - DataSetConvention.INSTANCE, - "DataSetScanRule") -{ - - def convert(rel: RelNode): RelNode = { - val scan: FlinkScan = rel.asInstanceOf[FlinkScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - - - new DataSetSource( - rel.getCluster, - traitSet, - scan.getTable, - rel.getRowType - ) - } -} - -object DataSetScanRule { - val INSTANCE: RelOptRule = new DataSetScanRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala deleted file mode 100644 index a390374..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetUnionRule.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.dataset - -import org.apache.calcite.plan.{RelOptRule, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, FlinkConvention} - -class DataSetUnionRule - extends ConverterRule( - classOf[FlinkUnion], - FlinkConvention.INSTANCE, - DataSetConvention.INSTANCE, - "DataSetUnionRule") -{ - - def convert(rel: RelNode): RelNode = { - val union: FlinkUnion = rel.asInstanceOf[FlinkUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) - - new DataSetUnion( - rel.getCluster, - traitSet, - convLeft, - convRight, - rel.getRowType, - union.toString) - } -} - -object DataSetUnionRule { - val INSTANCE: RelOptRule = new DataSetUnionRule -} http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala index a5bfeb6..01a5130 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkAggregateRule.scala @@ -22,30 +22,32 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention} +import scala.collection.JavaConversions._ class FlinkAggregateRule extends ConverterRule( classOf[LogicalAggregate], Convention.NONE, - FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, "FlinkAggregateRule") { def convert(rel: RelNode): RelNode = { val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(agg.getInput, FlinkConvention.INSTANCE) + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) - new FlinkAggregate( + new DataSetAggregate( rel.getCluster, traitSet, convInput, - agg.indicator, - agg.getGroupSet, - agg.getGroupSets, - agg.getAggCallList) - } + agg.getNamedAggCalls, + rel.getRowType, + agg.getInput.getRowType, + agg.toString, + agg.getGroupSet.toArray) + } } object FlinkAggregateRule { http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala index f40b04d..f5e9c68 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkCalcRule.scala @@ -22,26 +22,29 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalCalc -import org.apache.flink.api.table.plan.nodes.logical.{FlinkCalc, FlinkConvention} +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetCalc, DataSetConvention} class FlinkCalcRule extends ConverterRule( classOf[LogicalCalc], Convention.NONE, - FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, "FlinkCalcRule") { def convert(rel: RelNode): RelNode = { val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) - val convInput: RelNode = RelOptRule.convert(calc.getInput, FlinkConvention.INSTANCE) + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(calc.getInput, DataSetConvention.INSTANCE) - new FlinkCalc( + new DataSetCalc( rel.getCluster, traitSet, convInput, - calc.getProgram) + rel.getRowType, + calc.getProgram, + calc.toString, + description) } } http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala index 82f3eaa..c8ce944 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala @@ -24,15 +24,16 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalJoin import org.apache.calcite.rex.{RexInputRef, RexCall} import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} - +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetJoin, DataSetConvention} import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ class FlinkJoinRule extends ConverterRule( classOf[LogicalJoin], Convention.NONE, - FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, "FlinkJoinRule") { @@ -85,19 +86,27 @@ class FlinkJoinRule } def convert(rel: RelNode): RelNode = { + val join: LogicalJoin = rel.asInstanceOf[LogicalJoin] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(join.getInput(0), FlinkConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(join.getInput(1), FlinkConvention.INSTANCE) + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE) + val joinInfo = join.analyzeCondition - new FlinkJoin( - rel.getCluster, - traitSet, - convLeft, - convRight, - join.getCondition, - join.getJoinType, - 
join.getVariablesStopped) + new DataSetJoin( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType, + join.toString, + join.getCondition, + join.getRowType, + joinInfo, + joinInfo.pairs.toList, + JoinType.INNER, + null, + description) } } http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala index af54f37..ff3ee8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinUnionTransposeRule.scala @@ -33,7 +33,7 @@ import org.apache.calcite.rel.core.Union /** * This rule is a copy of Calcite's JoinUnionTransposeRule. * Calcite's implementation checks whether one of the operands is a LogicalUnion, - * which fails in our case, when it matches with a FlinkUnion. + * which fails in our case, when it matches with a DataSetUnion. * This rule changes this check to match Union, instead of LogicalUnion only. * The rest of the rule's logic has not been changed. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala index d789770..21da504 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkScanRule.scala @@ -25,25 +25,24 @@ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetSource} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkScan, FlinkConvention} import org.apache.flink.api.table.plan.schema.DataSetTable class FlinkScanRule extends ConverterRule( classOf[LogicalTableScan], Convention.NONE, - FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, "FlinkScanRule") { def convert(rel: RelNode): RelNode = { val scan: TableScan = rel.asInstanceOf[TableScan] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) - val dataSet: DataSet[_] = scan.getTable().unwrap(classOf[DataSetTable[_]]).dataSet + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - new FlinkScan( + new DataSetSource( rel.getCluster, traitSet, - scan.getTable + scan.getTable, + rel.getRowType ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/22621e02/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala index d9869f8..11600a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkUnionRule.scala @@ -22,30 +22,30 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalUnion -import org.apache.flink.api.table.plan.nodes.logical.{FlinkUnion, FlinkConvention} - -import scala.collection.JavaConversions._ +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetUnion} class FlinkUnionRule extends ConverterRule( classOf[LogicalUnion], Convention.NONE, - FlinkConvention.INSTANCE, + DataSetConvention.INSTANCE, "FlinkUnionRule") { def convert(rel: RelNode): RelNode = { + val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConvention.INSTANCE) - val convInputs = union.getInputs.toList.map( - RelOptRule.convert(_, FlinkConvention.INSTANCE) - ) + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) + val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) - new FlinkUnion( + new DataSetUnion( rel.getCluster, traitSet, - convInputs, - union.all) + convLeft, + convRight, + rel.getRowType, + union.toString) } }