This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fd04309 [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes (#7910) fd04309 is described below commit fd04309c8828637de895cbb8470bc210a3f33b66 Author: godfrey he <godfre...@163.com> AuthorDate: Thu Mar 7 08:54:43 2019 +0800 [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes (#7910) * [FLINK-11834] [table-planner-blink] Introduce flink logical relational nodes This commit includes most flink logical relational nodes, the rest will be introduced by other commit. * add FlinkLogicalTableFunctionScan * add FlinkLogicalTableSourceScan * move RankRange into Rank.scala file --- .../flink/table/calcite/FlinkTypeFactory.scala | 67 ++++- .../flink/table/plan/nodes/FlinkRelNode.scala | 89 +++++++ .../flink/table/plan/nodes/calcite/Expand.scala | 96 ++++++++ .../table/plan/nodes/calcite/LogicalExpand.scala | 72 ++++++ .../table/plan/nodes/calcite/LogicalRank.scala | 84 +++++++ .../table/plan/nodes/calcite/LogicalSink.scala | 57 +++++ .../LogicalWatermarkAssigner.scala} | 21 +- .../flink/table/plan/nodes/calcite/Rank.scala | 190 ++++++++++++++ .../flink/table/plan/nodes/calcite/Sink.scala | 59 +++++ .../plan/nodes/calcite/WatermarkAssigner.scala | 64 +++++ .../plan/nodes/logical/FlinkLogicalAggregate.scala | 160 ++++++++++++ .../plan/nodes/logical/FlinkLogicalCalc.scala | 108 ++++++++ .../plan/nodes/logical/FlinkLogicalCorrelate.scala | 106 ++++++++ .../logical/FlinkLogicalDataStreamTableScan.scala | 4 + .../plan/nodes/logical/FlinkLogicalExpand.scala | 103 ++++++++ .../plan/nodes/logical/FlinkLogicalIntersect.scala | 89 +++++++ .../plan/nodes/logical/FlinkLogicalJoin.scala | 102 ++++++++ .../plan/nodes/logical/FlinkLogicalMinus.scala | 87 +++++++ .../nodes/logical/FlinkLogicalOverWindow.scala | 98 ++++++++ .../plan/nodes/logical/FlinkLogicalRank.scala | 116 +++++++++ .../plan/nodes/logical/FlinkLogicalSink.scala | 80 ++++++ 
.../plan/nodes/logical/FlinkLogicalSort.scala | 122 +++++++++ .../logical/FlinkLogicalTableFunctionScan.scala | 155 ++++++++++++ .../logical/FlinkLogicalTableSourceScan.scala | 116 +++++++++ .../plan/nodes/logical/FlinkLogicalUnion.scala | 93 +++++++ .../plan/nodes/logical/FlinkLogicalValues.scala | 84 +++++++ .../logical/FlinkLogicalWatermarkAssigner.scala | 81 ++++++ .../flink/table/plan/schema/DataStreamTable.scala | 3 + .../flink/table/plan/schema/FlinkRelOptTable.scala | 272 +++++++++++++++++++++ .../TableSourceTable.scala} | 25 +- .../plan/schema/TimeIndicatorRelDataType.scala | 57 +++++ .../flink/table/plan/util/AggregateUtil.scala} | 42 ++-- .../apache/flink/table/plan/util/CalcUtil.scala | 64 +++++ .../FlinkRelMdUtil.scala} | 15 +- .../flink/table/plan/util/FlinkRelOptUtil.scala} | 41 ++-- .../FlinkRelNode.scala => util/SortUtil.scala} | 17 +- .../BatchTableSource.scala} | 18 +- .../StreamTableSource.scala} | 18 +- .../flink/table/sources/TableSourceUtil.scala | 65 +++++ .../org/apache/flink/table/type/InternalTypes.java | 4 + .../org/apache/flink/table/type/TimestampType.java | 30 ++- 41 files changed, 3099 insertions(+), 75 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 0ba2252..219c1f4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -19,13 +19,13 @@ package org.apache.flink.table.calcite import org.apache.flink.table.`type`.{ArrayType, DecimalType, InternalType, InternalTypes, MapType, RowType} -import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema} +import 
org.apache.flink.table.api.{TableException, TableSchema} +import org.apache.flink.table.plan.schema.{ArrayRelDataType, MapRelDataType, RowRelDataType, RowSchema, TimeIndicatorRelDataType} import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`._ -import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName} import java.util @@ -92,6 +92,19 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } /** + * Creates a indicator type for event-time, but with similar properties as SQL timestamp. + */ + def createRowtimeIndicatorType(): RelDataType = { + val originalType = createTypeFromInternalType(InternalTypes.TIMESTAMP, isNullable = false) + canonize( + new TimeIndicatorRelDataType( + getTypeSystem, + originalType.asInstanceOf[BasicSqlType], + isEventTime = true) + ) + } + + /** * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory * * @param fieldNames field names @@ -129,6 +142,49 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp logicalRowTypeBuilder.build } + /** + * Created a struct type with the input table schema using FlinkTypeFactory + * @param tableSchema the table schema + * @return a struct type with the input fieldNames, input fieldTypes, and system fields + */ + def buildLogicalRowType(tableSchema: TableSchema, isStreaming: Option[Boolean]): RelDataType = { + buildRelDataType( + tableSchema.getFieldNames.toSeq, + tableSchema.getFieldTypes map { + case InternalTypes.PROCTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get => + InternalTypes.TIMESTAMP + case InternalTypes.ROWTIME_INDICATOR if isStreaming.isDefined && !isStreaming.get => + InternalTypes.TIMESTAMP + case tpe: InternalType => tpe + }) + } + + def buildRelDataType( + fieldNames: Seq[String], + fieldTypes: Seq[InternalType]): RelDataType = { + 
buildRelDataType( + fieldNames, + fieldTypes, + fieldTypes.map(!FlinkTypeFactory.isTimeIndicatorType(_))) + } + + def buildRelDataType( + fieldNames: Seq[String], + fieldTypes: Seq[InternalType], + fieldNullables: Seq[Boolean]): RelDataType = { + val b = builder + val fields = fieldNames.zip(fieldTypes).zip(fieldNullables) + fields foreach { + case ((fieldName, fieldType), fieldNullable) => + if (FlinkTypeFactory.isTimeIndicatorType(fieldType) && fieldNullable) { + throw new TableException( + s"$fieldName can not be nullable because it is TimeIndicatorType!") + } + b.add(fieldName, createTypeFromInternalType(fieldType, fieldNullable)) + } + b.build + } + // ---------------------------------------------------------------------------------------------- override def getJavaClass(`type`: RelDataType): java.lang.reflect.Type = { @@ -221,6 +277,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp object FlinkTypeFactory { + def isTimeIndicatorType(t: InternalType): Boolean = t match { + case InternalTypes.ROWTIME_INDICATOR | InternalTypes.PROCTIME_INDICATOR => true + case _ => false + } + def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match { case BOOLEAN => InternalTypes.BOOLEAN case TINYINT => InternalTypes.BYTE diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala index 48b0bc6..fee8b3e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala @@ -18,11 +18,100 @@ package org.apache.flink.table.plan.nodes +import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat + import org.apache.calcite.rel.RelNode +import org.apache.calcite.rex._ 
+import org.apache.calcite.sql.SqlAsOperator +import org.apache.calcite.sql.SqlKind._ + +import scala.collection.JavaConversions._ /** * Base class for flink relational expression. */ trait FlinkRelNode extends RelNode { + private[flink] def getExpressionString( + expr: RexNode, + inFields: List[String], + localExprsTable: Option[List[RexNode]]): String = { + getExpressionString(expr, inFields, localExprsTable, ExpressionFormat.Prefix) + } + + private[flink] def getExpressionString( + expr: RexNode, + inFields: List[String], + localExprsTable: Option[List[RexNode]], + expressionFormat: ExpressionFormat): String = { + + expr match { + case pr: RexPatternFieldRef => + val alpha = pr.getAlpha + val field = inFields.get(pr.getIndex) + s"$alpha.$field" + + case i: RexInputRef => + inFields.get(i.getIndex) + + case l: RexLiteral => + l.toString + + case l: RexLocalRef if localExprsTable.isEmpty => + throw new IllegalArgumentException("Encountered RexLocalRef without " + + "local expression table") + + case l: RexLocalRef => + val lExpr = localExprsTable.get(l.getIndex) + getExpressionString(lExpr, inFields, localExprsTable, expressionFormat) + + case c: RexCall => + val op = c.getOperator.toString + val ops = c.getOperands.map( + getExpressionString(_, inFields, localExprsTable, expressionFormat)) + c.getOperator match { + case _ : SqlAsOperator => ops.head + case _ => + expressionFormat match { + case ExpressionFormat.Infix if ops.size() == 1 => + val operand = ops.head + c.getKind match { + case IS_FALSE | IS_NOT_FALSE | IS_TRUE | IS_NOT_TRUE | IS_UNKNOWN | IS_NULL | + IS_NOT_NULL => s"$operand $op" + case _ => s"$op($operand)" + } + case ExpressionFormat.Infix => s"(${ops.mkString(s" $op ")})" + case ExpressionFormat.PostFix => s"(${ops.mkString(", ")})$op" + case ExpressionFormat.Prefix => s"$op(${ops.mkString(", ")})" + } + } + + case fa: RexFieldAccess => + val referenceExpr = getExpressionString( + fa.getReferenceExpr, + inFields, + localExprsTable, + 
expressionFormat) + val field = fa.getField.getName + s"$referenceExpr.$field" + case cv: RexCorrelVariable => + cv.toString + case _ => + throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr") + } + } + +} + +/** + * Infix, Postfix and Prefix notations are three different but equivalent ways of writing + * expressions. It is easiest to demonstrate the differences by looking at examples of operators + * that take two operands. + * Infix notation: (X + Y) + * Postfix notation: (X Y) + + * Prefix notation: + (X Y) + */ +object ExpressionFormat extends Enumeration { + type ExpressionFormat = Value + val Infix, PostFix, Prefix = Value } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala new file mode 100644 index 0000000..6a3fc2a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Expand.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.calcite + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.util.Litmus + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Relational expression that apply a number of projects to every input row, + * hence we will get multiple output rows for an input row. + * + * <p/> Values of expand_id should be unique. + * + * @param cluster cluster that this relational expression belongs to + * @param traits the traits of this rel + * @param input input relational expression + * @param outputRowType output row type + * @param projects all projects, each project contains list of expressions for + * the output columns + * @param expandIdIndex expand_id('$e') field index + */ +abstract class Expand( + cluster: RelOptCluster, + traits: RelTraitSet, + input: RelNode, + outputRowType: RelDataType, + val projects: util.List[util.List[RexNode]], + val expandIdIndex: Int) + extends SingleRel(cluster, traits, input) { + + isValid(Litmus.THROW, null) + + override def isValid(litmus: Litmus, context: RelNode.Context): Boolean = { + if (projects.size() <= 1) { + return litmus.fail("Expand should output more than one rows, otherwise use Project.") + } + if (projects.exists(_.size != outputRowType.getFieldCount)) { + return litmus.fail("project filed count is not equal to output field count.") + } + if (expandIdIndex < 0 || expandIdIndex >= outputRowType.getFieldCount) { + return litmus.fail( + "expand_id field index should be greater than 0 and less than output field count.") + } + val expandIdValues = new util.HashSet[Any]() + for (project <- projects) { + project.get(expandIdIndex) match { + case literal: RexLiteral => 
expandIdValues.add(literal.getValue) + case _ => return litmus.fail("expand_id value should not be null.") + } + } + if (expandIdValues.size() != projects.size()) { + return litmus.fail("values of expand_id should be unique.") + } + litmus.succeed() + } + + override def deriveRowType(): RelDataType = outputRowType + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { + val rowCnt = mq.getRowCount(this.getInput) * projects.size() + planner.getCostFactory.makeCost(rowCnt, rowCnt, 0) + } + + override def estimateRowCount(mq: RelMetadataQuery): Double = { + val childRowCnt = mq.getRowCount(this.getInput) + if (childRowCnt != null) { + childRowCnt * projects.size() + } else { + null.asInstanceOf[Double] + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala new file mode 100644 index 0000000..38ae0fb --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalExpand.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.calcite + +import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter} +import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode} + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Sub-class of [[Expand]] that is a relational expression + * which returns multiple rows expanded from one input row. + * This class corresponds to Calcite logical rel. + */ +final class LogicalExpand( + cluster: RelOptCluster, + traits: RelTraitSet, + input: RelNode, + outputRowType: RelDataType, + projects: util.List[util.List[RexNode]], + expandIdIndex: Int) + extends Expand(cluster, traits, input, outputRowType, projects, expandIdIndex) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new LogicalExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val names = outputRowType.getFieldNames + val terms = projects.map { + project => + project.zipWithIndex.map { + case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]" + case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]" + case (o, _) => s"$o" + }.mkString("{", ", ", "}") + }.mkString(", ") + super.explainTerms(pw).item("projects", terms) + } +} + +object LogicalExpand { + def create( + input: RelNode, + outputRowType: RelDataType, + projects: util.List[util.List[RexNode]], + expandIdIndex: Int): LogicalExpand = { + val traits = input.getCluster.traitSetOf(Convention.NONE) + new LogicalExpand(input.getCluster, traits, input, outputRowType, projects, expandIdIndex) + } +} + diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala new file mode 100644 index 0000000..c5eb9ab --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalRank.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.calcite + +import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelCollation, RelNode} +import org.apache.calcite.sql.SqlRankFunction +import org.apache.calcite.util.ImmutableBitSet + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Sub-class of [[Rank]] that is a relational expression which returns + * the rows in which the rank function value of each row is in the given range. + * This class corresponds to Calcite logical rel. 
+ */ +final class LogicalRank( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rankFunction: SqlRankFunction, + partitionKey: ImmutableBitSet, + sortCollation: RelCollation, + rankRange: RankRange) + extends Rank( + cluster, + traitSet, + input, + rankFunction, + partitionKey, + sortCollation, + rankRange) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new LogicalRank( + cluster, + traitSet, + inputs.head, + rankFunction, + partitionKey, + sortCollation, + rankRange + ) + } +} + +object LogicalRank { + + def create( + input: RelNode, + rankFunction: SqlRankFunction, + partitionKey: ImmutableBitSet, + sortCollation: RelCollation, + rankRange: RankRange): LogicalRank = { + val traits = input.getCluster.traitSetOf(Convention.NONE) + new LogicalRank( + input.getCluster, + traits, + input, + rankFunction, + partitionKey, + sortCollation, + rankRange + ) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala new file mode 100644 index 0000000..dcd6a29 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalSink.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.calcite + +import java.util + +import org.apache.flink.table.sinks.TableSink + +import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode + +import scala.collection.JavaConversions._ + +/** + * Sub-class of [[Sink]] that is a relational expression + * which writes out data of input node into a [[TableSink]]. + * This class corresponds to Calcite logical rel. + */ +final class LogicalSink( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + sink: TableSink[_], + sinkName: String) + extends Sink(cluster, traitSet, input, sink, sinkName) { + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new LogicalSink(cluster, traitSet, inputs.head, sink, sinkName) + } + +} + +object LogicalSink { + + def create(input: RelNode, + sink: TableSink[_], + sinkName: String): LogicalSink = { + val traits = input.getCluster.traitSetOf(Convention.NONE) + new LogicalSink(input.getCluster, traits, input, sink, sinkName) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala similarity index 54% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala index 48b0bc6..ab92a16 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala @@ -16,13 +16,28 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.plan.nodes.calcite +import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import java.util + /** - * Base class for flink relational expression. + * Sub-class of [[WatermarkAssigner]] that is a relational operator + * which generates [[org.apache.flink.streaming.api.watermark.Watermark]]. + * This class corresponds to Calcite logical rel. */ -trait FlinkRelNode extends RelNode { +final class LogicalWatermarkAssigner( + cluster: RelOptCluster, + traits: RelTraitSet, + input: RelNode, + rowtimeField: String, + watermarkOffset: Long) + extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) { + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new LogicalWatermarkAssigner(cluster, traits, inputs.get(0), rowtimeField, watermarkOffset) + } } + diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala new file mode 100644 index 0000000..b7008f8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.calcite + +import org.apache.flink.table.api.TableException +import org.apache.flink.table.plan.util._ + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel} +import org.apache.calcite.sql.SqlRankFunction +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.util.{ImmutableBitSet, NumberUtil} + +import java.util +import scala.collection.JavaConversions._ + +/** + * Relational expression that returns the rows in which the rank function value of each row + * is in the given range. + * + * <p>NOTES: Different from [[org.apache.calcite.sql.fun.SqlStdOperatorTable.RANK]], + * [[Rank]] is a Relational expression, not a window function. + * + * <p>[[Rank]] will output rank function value as its last column. + * + * <p>This RelNode only handles single rank function, is an optimization for some cases. e.g. 
+ * <ol> + * <li> + * single rank function (on `OVER`) with filter in a SQL query statement + * </li> + * <li> + * `ORDER BY` with `LIMIT` in a SQL query statement + * (equivalent to `ROW_NUMBER` with filter and project) + * </li> + * </ol> + * + * @param cluster cluster that this relational expression belongs to + * @param traitSet the traits of this rel + * @param input input relational expression + * @param rankFunction rank function, including: CUME_DIST, DENSE_RANK, PERCENT_RANK, RANK, + * ROW_NUMBER + * @param partitionKey partition keys (may be empty) + * @param sortCollation order keys for rank function + * @param rankRange the expected range of rank function value + */ +abstract class Rank( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + val rankFunction: SqlRankFunction, + val partitionKey: ImmutableBitSet, + val sortCollation: RelCollation, + val rankRange: RankRange) + extends SingleRel(cluster, traitSet, input) { + + rankRange match { + case r: ConstantRankRange => + if (r.rankEnd <= 0) { + throw new TableException(s"Rank end can't smaller than zero. 
The rank end is ${r.rankEnd}") + } + if (r.rankStart > r.rankEnd) { + throw new TableException( + s"Rank start '${r.rankStart}' can't greater than rank end '${r.rankEnd}'.") + } + case v: VariableRankRange => + if (v.rankEndIndex < 0) { + throw new TableException(s"Rank end index can't smaller than zero.") + } + if (v.rankEndIndex >= input.getRowType.getFieldCount) { + throw new TableException(s"Rank end index can't greater than input field count.") + } + } + + override def deriveRowType(): RelDataType = { + val typeFactory = cluster.getRexBuilder.getTypeFactory + val typeBuilder = typeFactory.builder() + input.getRowType.getFieldList.foreach(typeBuilder.add) + // rank function column is always the last column, and its type is BIGINT NOT NULL + val allFieldNames = new util.HashSet[String]() + allFieldNames.addAll(input.getRowType.getFieldNames) + val rankFieldName = FlinkRelOptUtil.buildUniqueFieldName(allFieldNames, "rk") + val bigIntType = typeFactory.createSqlType(SqlTypeName.BIGINT) + typeBuilder.add(rankFieldName, typeFactory.createTypeWithNullability(bigIntType, false)) + typeBuilder.build() + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val select = getRowType.getFieldNames.zipWithIndex.map { + case (name, idx) => s"$name=$$$idx" + }.mkString(", ") + super.explainTerms(pw) + .item("rankFunction", rankFunction) + .item("partitionBy", partitionKey.map(i => s"$$$i").mkString(",")) + .item("orderBy", Rank.sortFieldsToString(sortCollation)) + .item("rankRange", rankRange.toString()) + .item("select", select) + } + + override def estimateRowCount(mq: RelMetadataQuery): Double = { + val countPerGroup = FlinkRelMdUtil.getRankRangeNdv(rankRange) + if (partitionKey.isEmpty) { + // only one group + countPerGroup + } else { + val inputRowCount = mq.getRowCount(input) + val numOfGroup = mq.getDistinctRowCount(input, partitionKey, null) + if (numOfGroup != null) { + NumberUtil.min(numOfGroup * countPerGroup, inputRowCount) + } else { + 
NumberUtil.min(mq.getRowCount(input) * 0.1, inputRowCount)
+      }
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCount = mq.getRowCount(input)
+    val cpuCost = rowCount
+    planner.getCostFactory.makeCost(rowCount, cpuCost, 0)
+  }
+
+}
+
+sealed trait RankRange extends Serializable {
+  def toString(inputFieldNames: Seq[String]): String
+}
+
+/** [[ConstantRankRangeWithoutEnd]] is a RankRange which does not specify RankEnd. */
+case class ConstantRankRangeWithoutEnd(rankStart: Long) extends RankRange {
+  override def toString(inputFieldNames: Seq[String]): String = this.toString
+
+  override def toString: String = s"rankStart=$rankStart"
+}
+
+/** rankStart and rankEnd are inclusive, rankStart always starts from one. */
+case class ConstantRankRange(rankStart: Long, rankEnd: Long) extends RankRange {
+
+  override def toString(inputFieldNames: Seq[String]): String = this.toString
+
+  override def toString: String = s"rankStart=$rankStart, rankEnd=$rankEnd"
+}
+
+/** Rank range whose end (the rank limit) depends on the input field at rankEndIndex. */
+case class VariableRankRange(rankEndIndex: Int) extends RankRange {
+  override def toString(inputFieldNames: Seq[String]): String = {
+    s"rankEnd=${inputFieldNames(rankEndIndex)}"
+  }
+
+  override def toString: String = {
+    s"rankEnd=$$$rankEndIndex"
+  }
+}
+
+object Rank {
+  def sortFieldsToString(collationSort: RelCollation): String = {
+    val fieldCollations = collationSort.getFieldCollations
+      .map(c => (c.getFieldIndex, SortUtil.directionToOrder(c.getDirection)))
+
+    fieldCollations.map {
+      case (index, order) => s"$$$index ${order.getShortName}"
+    }.mkString(", ")
+  }
+
+  def sortFieldsToString(collationSort: RelCollation, inputType: RelDataType): String = {
+    val fieldCollations = collationSort.getFieldCollations
+      .map(c => (c.getFieldIndex, SortUtil.directionToOrder(c.getDirection)))
+    val inputFieldNames = inputType.getFieldNames
+
+    fieldCollations.map {
+      case (index, order) => s"${inputFieldNames.get(index)} ${order.getShortName}"
+    }.mkString(", ")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala
new file mode 100644
index 0000000..29623c3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Sink.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.flink.table.`type`.TypeConverters
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+/**
+ * Relational expression that writes out data of input node into a [[TableSink]].
+ *
+ * @param cluster cluster that this relational expression belongs to
+ * @param traitSet the traits of this rel
+ * @param input input relational expression
+ * @param sink Table sink to write into
+ * @param sinkName Name of tableSink, which is not a required property, that is, it could be null
+ */
+abstract class Sink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    val sink: TableSink[_],
+    val sinkName: String)
+  extends SingleRel(cluster, traitSet, input) {
+
+  override def deriveRowType(): RelDataType = {
+    val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val outputType = sink.getOutputType
+    val internalType = TypeConverters.createInternalTypeFromTypeInfo(outputType)
+    typeFactory.createTypeFromInternalType(internalType, isNullable = true)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("name", sinkName, sinkName != null)
+      .item("fields", sink.getFieldNames.mkString(", "))
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
new file mode 100644
index 0000000..5976648
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.calcite
+
+import org.apache.flink.table.calcite.FlinkTypeFactory
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Relational operator that generates [[org.apache.flink.streaming.api.watermark.Watermark]].
+ */
+abstract class WatermarkAssigner(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    inputNode: RelNode,
+    val rowtimeField: String,
+    val watermarkOffset: Long)
+  extends SingleRel(cluster, traits, inputNode) {
+
+  override def deriveRowType(): RelDataType = {
+    val inputRowType = inputNode.getRowType
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+    val newFieldList = inputRowType.getFieldList.map { f =>
+      if (f.getName.equals(rowtimeField)) {
+        val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
+        new RelDataTypeFieldImpl(rowtimeField, f.getIndex, rowtimeIndicatorType)
+      } else {
+        f
+      }
+    }
+
+    val builder = typeFactory.builder
+    builder.addAll(newFieldList)
+    builder.build()
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("fields", getRowType.getFieldNames)
+      .item("rowtimeField", rowtimeField)
+      .item("watermarkOffset", watermarkOffset)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
new file mode 100644
index 0000000..32ca930
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalAggregate.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.util.AggregateUtil
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Aggregate]] that is a relational operator which eliminates
+ * duplicates and computes totals in Flink.
+ */
+class FlinkLogicalAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    indicator: Boolean,
+    groupSet: ImmutableBitSet,
+    groupSets: util.List[ImmutableBitSet],
+    aggCalls: util.List[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall]): Aggregate = {
+    new FlinkLogicalAggregate(
+      cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    if (getGroupSets.size > 1 || AggregateUtil.getGroupIdExprIndexes(getAggCallList).nonEmpty) {
+      planner.getCostFactory.makeInfiniteCost()
+    } else {
+      val child = this.getInput
+      val rowCnt = mq.getRowCount(child)
+      val rowSize = mq.getAverageRowSize(child)
+      val aggCnt = this.getAggCallList.size
+      // group by CPU cost (multiply by 1.1 to encourage fewer group keys) + agg call CPU cost
+      val cpuCost: Double = rowCnt * getGroupCount * 1.1 + rowCnt * aggCnt
+      planner.getCostFactory.makeCost(rowCnt, cpuCost, rowCnt * rowSize)
+    }
+  }
+
+}
+
+private class FlinkLogicalAggregateBatchConverter
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalAggregateBatchConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the FlinkAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)
+
+    !hasAccurateDistinctCall && supported
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg = rel.asInstanceOf[LogicalAggregate]
+    val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.indicator,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList)
+  }
+}
+
+private class FlinkLogicalAggregateStreamConverter
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalAggregateStreamConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the FlinkAggregateReduceFunctionsRule
+    agg.getAggCallList.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
+      case _ => true
+    }
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg = rel.asInstanceOf[LogicalAggregate]
+    val newInput = RelOptRule.convert(agg.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalAggregate.create(
+      newInput,
+      agg.indicator,
+      agg.getGroupSet,
+      agg.getGroupSets,
+      agg.getAggCallList)
+  }
+}
+
+object FlinkLogicalAggregate {
+  val BATCH_CONVERTER: ConverterRule = new FlinkLogicalAggregateBatchConverter()
+  val STREAM_CONVERTER: ConverterRule = new FlinkLogicalAggregateStreamConverter()
+
+  def create(
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall]): FlinkLogicalAggregate = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalAggregate(cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
new file mode 100644
index 0000000..61a2244
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.util.CalcUtil
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rel.logical.LogicalCalc
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexProgram}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Calc]] that is a relational expression which computes project expressions
+ * and also filters in Flink.
+ */
+class FlinkLogicalCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    calcProgram: RexProgram)
+  extends Calc(cluster, traitSet, input, calcProgram)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new FlinkLogicalCalc(cluster, traitSet, child, program)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    FlinkLogicalCalc.computeCost(calcProgram, planner, mq, this)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    pw.input("input", getInput)
+      .item("select", CalcUtil.selectionToString(calcProgram, getExpressionString))
+      .itemIf("where",
+        CalcUtil.conditionToString(calcProgram, getExpressionString),
+        calcProgram.getCondition != null)
+  }
+
+}
+
+private class FlinkLogicalCalcConverter
+  extends ConverterRule(
+    classOf[LogicalCalc],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalCalcConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val calc = rel.asInstanceOf[LogicalCalc]
+    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalCalc.create(newInput, calc.getProgram)
+  }
+}
+
+object FlinkLogicalCalc {
+  val CONVERTER: ConverterRule = new FlinkLogicalCalcConverter()
+
+  def create(
+      input: RelNode,
+      calcProgram: RexProgram): FlinkLogicalCalc = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet.replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalCalc(cluster, traitSet, input, calcProgram)
+  }
+
+  def computeCost(
+      calcProgram: RexProgram,
+      planner: RelOptPlanner,
+      mq: RelMetadataQuery,
+      calc: Calc): RelOptCost = {
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    // conditions, etc. We only want to account for computations, not for simple projections.
+    // CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
+    // in the normalization stage, so we ignore CASTs here in the optimization stage.
+    val compCnt = calcProgram.getProjectList.map(calcProgram.expandLocalRef).toList.count {
+      case _: RexInputRef => false
+      case _: RexLiteral => false
+      case c: RexCall if c.getOperator.getName.equals("CAST") => false
+      case _ => true
+    }
+    val newRowCnt = mq.getRowCount(calc)
+    // TODO use inputRowCnt to compute cpu cost
+    planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
new file mode 100644
index 0000000..53800f6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCorrelate.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Correlate, CorrelationId}
+import org.apache.calcite.rel.logical.LogicalCorrelate
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.calcite.util.ImmutableBitSet
+
+/**
+ * Sub-class of [[Correlate]] that is a relational operator
+ * which performs nested-loop joins in Flink.
+ */
+class FlinkLogicalCorrelate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    correlationId: CorrelationId,
+    requiredColumns: ImmutableBitSet,
+    joinType: SemiJoinType)
+  extends Correlate(cluster, traitSet, left, right, correlationId, requiredColumns, joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      left: RelNode,
+      right: RelNode,
+      correlationId: CorrelationId,
+      requiredColumns: ImmutableBitSet,
+      joinType: SemiJoinType): Correlate = {
+
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      left,
+      right,
+      correlationId,
+      requiredColumns,
+      joinType)
+  }
+
+}
+
+class FlinkLogicalCorrelateConverter
+  extends ConverterRule(
+    classOf[LogicalCorrelate],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalCorrelateConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val correlate = rel.asInstanceOf[LogicalCorrelate]
+    val newLeft = RelOptRule.convert(correlate.getLeft, FlinkConventions.LOGICAL)
+    val newRight = RelOptRule.convert(correlate.getRight, FlinkConventions.LOGICAL)
+    FlinkLogicalCorrelate.create(
+      newLeft,
+      newRight,
+      correlate.getCorrelationId,
+      correlate.getRequiredColumns,
+      correlate.getJoinType)
+  }
+}
+
+object FlinkLogicalCorrelate {
+  val CONVERTER: ConverterRule = new FlinkLogicalCorrelateConverter()
+
+  def create(
+      left: RelNode,
+      right: RelNode,
+      correlationId: CorrelationId,
+      requiredColumns: ImmutableBitSet,
+      joinType: SemiJoinType): FlinkLogicalCorrelate = {
+    val cluster = left.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      left,
+      right,
+      correlationId,
+      requiredColumns,
+      joinType)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
index b16f629..f6ca632 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
@@ -30,6 +30,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 
 import java.util
 
+/**
+ * Sub-class of [[TableScan]] that is a relational operator
+ * which returns the contents of a [[DataStreamTable]] in Flink.
+ */
 class FlinkLogicalDataStreamTableScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala
new file mode 100644
index 0000000..b7f37d0
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalExpand.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{Expand, LogicalExpand}
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Expand]] that is a relational expression
+ * which returns multiple rows expanded from one input row.
+ */
+class FlinkLogicalExpand(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    input: RelNode,
+    outputRowType: RelDataType,
+    projects: util.List[util.List[RexNode]],
+    expandIdIndex: Int)
+  extends Expand(cluster, traits, input, outputRowType, projects, expandIdIndex)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new FlinkLogicalExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val names = outputRowType.getFieldNames
+    val terms = projects.map {
+      project =>
+        project.zipWithIndex.map {
+          case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
+          case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
+          case (o, _) => s"$o"
+        }.mkString("{", ", ", "}")
+    }.mkString(", ")
+    super.explainTerms(pw).item("projects", terms)
+  }
+
+}
+
+private class FlinkLogicalExpandConverter
+  extends ConverterRule(
+    classOf[LogicalExpand],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalExpandConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val expand = rel.asInstanceOf[LogicalExpand]
+    val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalExpand.create(
+      newInput,
+      expand.getRowType,
+      expand.projects,
+      expand.expandIdIndex)
+  }
+}
+
+object FlinkLogicalExpand {
+  val CONVERTER: ConverterRule = new FlinkLogicalExpandConverter()
+
+  def create(
+      input: RelNode,
+      outputRowType: RelDataType,
+      projects: util.List[util.List[RexNode]],
+      expandIdIndex: Int): FlinkLogicalExpand = {
+    val cluster = input.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalExpand(
+      cluster,
+      traitSet,
+      input,
+      outputRowType,
+      projects,
+      expandIdIndex)
+  }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala
new file mode 100644
index 0000000..6c6a857
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalIntersect.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Intersect, SetOp}
+import org.apache.calcite.rel.logical.LogicalIntersect
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Intersect]] that is a relational expression
+ * which returns the intersection of the rows of its inputs in Flink.
+ */
+class FlinkLogicalIntersect(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: util.List[RelNode],
+    all: Boolean)
+  extends Intersect(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: Boolean): SetOp = {
+    new FlinkLogicalIntersect(cluster, traitSet, inputs, all)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val zeroCost = planner.getCostFactory.makeCost(0, 0, 0)
+    this.getInputs.foldLeft(zeroCost) {
+      (cost, input) =>
+        val rowCnt = mq.getRowCount(input)
+        val rowSize = mq.getAverageRowSize(input)
+        val inputCost = planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+        cost.plus(inputCost)
+    }
+  }
+
+}
+
+private class FlinkLogicalIntersectConverter
+  extends ConverterRule(
+    classOf[LogicalIntersect],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalIntersectConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val intersect = rel.asInstanceOf[LogicalIntersect]
+    val newInputs = intersect.getInputs.map {
+      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalIntersect.create(newInputs, intersect.all)
+  }
+}
+
+object FlinkLogicalIntersect {
+  val CONVERTER: ConverterRule = new FlinkLogicalIntersectConverter()
+
+  def create(
+      inputs: util.List[RelNode],
+      all: Boolean): FlinkLogicalIntersect = {
+    val cluster = inputs.get(0).getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalIntersect(cluster, traitSet, inputs, all)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
new file mode 100644
index 0000000..d33630b
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalJoin.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Join]] that is a relational expression
+ * which combines two relational expressions according to some condition in Flink.
+ */
+class FlinkLogicalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    left: RelNode,
+    right: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends Join(cluster, traitSet, left, right, condition, Set.empty[CorrelationId], joinType)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new FlinkLogicalJoin(cluster, traitSet, left, right, conditionExpr, joinType)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val leftRowCnt = mq.getRowCount(getLeft)
+    val leftRowSize = mq.getAverageRowSize(getLeft)
+
+    val rightRowCnt = mq.getRowCount(getRight)
+    val rightRowSize = mq.getAverageRowSize(getRight)
+
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+    val cpuCost = leftRowCnt + rightRowCnt
+    val rowCnt = leftRowCnt + rightRowCnt
+
+    planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
+  }
+}
+
+/**
+ * Supports all join types.
+ */
+private class FlinkLogicalJoinConverter
+  extends ConverterRule(
+    classOf[LogicalJoin],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalJoinConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val join = rel.asInstanceOf[LogicalJoin]
+    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
+    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
+    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getJoinType)
+  }
+}
+
+object FlinkLogicalJoin {
+  val CONVERTER: ConverterRule = new FlinkLogicalJoinConverter
+
+  def create(
+      left: RelNode,
+      right: RelNode,
+      condition: RexNode,
+      joinType: JoinRelType): FlinkLogicalJoin = {
+    val cluster = left.getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalJoin(cluster, traitSet, left, right, condition, joinType)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala
new file mode 100644
index 0000000..870262b
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalMinus.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{Minus, SetOp}
+import org.apache.calcite.rel.logical.LogicalMinus
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Sub-class of [[Minus]] that is a relational expression which returns the rows of
+ * its first input minus any matching rows from its other inputs in Flink.
+ */
+class FlinkLogicalMinus(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: JList[RelNode],
+    all: Boolean)
+  extends Minus(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp = {
+    new FlinkLogicalMinus(cluster, traitSet, inputs, all)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val zeroCost = planner.getCostFactory.makeCost(0, 0, 0)
+    this.getInputs.foldLeft(zeroCost) {
+      (cost, input) =>
+        val rowCnt = mq.getRowCount(input)
+        val rowSize = mq.getAverageRowSize(input)
+        val inputCost = planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+        cost.plus(inputCost)
+    }
+  }
+
+}
+
+private class FlinkLogicalMinusConverter
+  extends ConverterRule(
+    classOf[LogicalMinus],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalMinusConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val minus = rel.asInstanceOf[LogicalMinus]
+    val newInputs = minus.getInputs.map {
+      input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalMinus.create(newInputs, minus.all)
+  }
+}
+
+object FlinkLogicalMinus {
+  val CONVERTER: ConverterRule = new FlinkLogicalMinusConverter()
+
+  def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalMinus = {
+    val cluster = inputs.get(0).getCluster
+    val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalMinus(cluster, traitSet, inputs, all)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
new file mode 100644
index 0000000..9550bb6
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rel.logical.LogicalWindow
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.sql.SqlRankFunction
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink's logical counterpart of [[Window]]: evaluates a set of over-window aggregate
+ * groups on top of its single input.
+ */
+class FlinkLogicalOverWindow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    windowConstants: JList[RexLiteral],
+    rowType: RelDataType,
+    windowGroups: JList[Window.Group])
+  extends Window(cluster, traitSet, input, windowConstants, rowType, windowGroups)
+  with FlinkLogicalRel {
+
+  /** Re-creates this window over the first of `inputs`, keeping constants, row type, groups. */
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode =
+    new FlinkLogicalOverWindow(
+      cluster, traitSet, inputs.get(0), windowConstants, getRowType, windowGroups)
+
+}
+
+class FlinkLogicalOverWindowConverter
+  extends ConverterRule(
+    classOf[LogicalWindow],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalOverWindowConverter") {
+
+  /**
+   * Converts a [[LogicalWindow]] into a [[FlinkLogicalOverWindow]].
+   *
+   * @throws ValidationException if a window group without ORDER BY keys contains a call
+   *                             to a [[SqlRankFunction]]
+   */
+  override def convert(rel: RelNode): RelNode = {
+    val logicalWindow = rel.asInstanceOf[LogicalWindow]
+    val cluster = rel.getCluster
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    val logicalInput = RelOptRule.convert(logicalWindow.getInput, FlinkConventions.LOGICAL)
+
+    // a rank function needs an ordering, so reject groups that use one without ORDER BY
+    val rankWithoutOrderBy = logicalWindow.groups.exists { group =>
+      group.orderKeys.getFieldCollations.isEmpty &&
+        group.aggCalls.exists(_.op.isInstanceOf[SqlRankFunction])
+    }
+    if (rankWithoutOrderBy) {
+      throw new ValidationException("Over Agg: The window rank function without order by. " +
+        "please re-check the over window statement.")
+    }
+
+    new FlinkLogicalOverWindow(
+      cluster,
+      logicalTraits,
+      logicalInput,
+      logicalWindow.constants,
+      logicalWindow.getRowType,
+      logicalWindow.groups)
+  }
+}
+
+object FlinkLogicalOverWindow {
+  val CONVERTER = new FlinkLogicalOverWindowConverter
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala
new file mode 100644
index 0000000..c93fd29
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{LogicalRank, Rank, RankRange}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter}
+import org.apache.calcite.sql.SqlRankFunction
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink's logical counterpart of [[Rank]]: keeps only the input rows whose rank-function
+ * value (per partition, in the given sort order) lies within `rankRange`.
+ *
+ * @param outputRankFunColumn when true the row type is derived by [[Rank]] (presumably
+ *                            including the rank-function column); when false the row
+ *                            type is the input's row type unchanged
+ */
+class FlinkLogicalRank(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rankFunction: SqlRankFunction,
+    partitionKey: ImmutableBitSet,
+    sortCollation: RelCollation,
+    rankRange: RankRange,
+    val outputRankFunColumn: Boolean)
+  extends Rank(cluster, traitSet, input, rankFunction, partitionKey, sortCollation, rankRange)
+  with FlinkLogicalRel {
+
+  override def deriveRowType(): RelDataType =
+    if (!outputRankFunColumn) input.getRowType else super.deriveRowType()
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val inputType = input.getRowType
+    val inputFieldNames = inputType.getFieldNames
+    val partitionFields = partitionKey.map(idx => inputFieldNames.get(idx)).mkString(",")
+    pw.item("input", getInput)
+      .item("rankFunction", rankFunction)
+      .item("partitionBy", partitionFields)
+      .item("orderBy", Rank.sortFieldsToString(sortCollation, inputType))
+      .item("rankRange", rankRange.toString(inputFieldNames))
+      .item("select", getRowType.getFieldNames.mkString(", "))
+  }
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode =
+    new FlinkLogicalRank(
+      cluster, traitSet, inputs.get(0), rankFunction, partitionKey, sortCollation, rankRange,
+      outputRankFunColumn)
+
+}
+
+private class FlinkLogicalRankConverter
+  extends ConverterRule(
+    classOf[LogicalRank],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalRankConverter") {
+
+  /** Converts a [[LogicalRank]] with `outputRankFunColumn` enabled. */
+  override def convert(rel: RelNode): RelNode = {
+    val logicalRank = rel.asInstanceOf[LogicalRank]
+    FlinkLogicalRank.create(
+      input = RelOptRule.convert(logicalRank.getInput, FlinkConventions.LOGICAL),
+      rankFunction = logicalRank.rankFunction,
+      partitionKey = logicalRank.partitionKey,
+      sortCollation = logicalRank.sortCollation,
+      rankRange = logicalRank.rankRange,
+      outputRankFunColumn = true)
+  }
+}
+
+object FlinkLogicalRank {
+  val CONVERTER: ConverterRule = new FlinkLogicalRankConverter
+
+  /** Creates a [[FlinkLogicalRank]] in the LOGICAL convention on `input`'s cluster. */
+  def create(
+      input: RelNode,
+      rankFunction: SqlRankFunction,
+      partitionKey: ImmutableBitSet,
+      sortCollation: RelCollation,
+      rankRange: RankRange,
+      outputRankFunColumn: Boolean): FlinkLogicalRank = {
+    val cluster = input.getCluster
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalRank(
+      cluster,
+      logicalTraits,
+      input,
+      rankFunction,
+      partitionKey,
+      sortCollation,
+      rankRange,
+      outputRankFunColumn)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
new file mode 100644
index 0000000..28b98a2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSink.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.calcite.{LogicalSink, Sink}
+import org.apache.flink.table.sinks.TableSink
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink's logical counterpart of [[Sink]]: writes the rows of its input into a [[TableSink]].
+ *
+ * @param sink     the table sink that receives the rows
+ * @param sinkName name of the sink
+ */
+class FlinkLogicalSink(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    sink: TableSink[_],
+    sinkName: String)
+  extends Sink(cluster, traitSet, input, sink, sinkName)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode =
+    new FlinkLogicalSink(cluster, traitSet, inputs.head, sink, sinkName)
+
+}
+
+private class FlinkLogicalSinkConverter
+  extends ConverterRule(
+    classOf[LogicalSink],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalSinkConverter") {
+
+  /** Rewrites a [[LogicalSink]] into a [[FlinkLogicalSink]] over a LOGICAL-converted input. */
+  override def convert(rel: RelNode): RelNode = {
+    val logicalSink = rel.asInstanceOf[LogicalSink]
+    FlinkLogicalSink.create(
+      RelOptRule.convert(logicalSink.getInput, FlinkConventions.LOGICAL),
+      logicalSink.sink,
+      logicalSink.sinkName)
+  }
+}
+
+object FlinkLogicalSink {
+  val CONVERTER: ConverterRule = new FlinkLogicalSinkConverter()
+
+  /** Creates a [[FlinkLogicalSink]] in the LOGICAL convention on `input`'s cluster. */
+  def create(
+      input: RelNode,
+      sink: TableSink[_],
+      sinkName: String): FlinkLogicalSink = {
+    val cluster = input.getCluster
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalSink(cluster, logicalTraits, input, sink, sinkName)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
new file mode 100644
index 0000000..38e7bd5
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalSort.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Sort
+import org.apache.calcite.rel.logical.LogicalSort
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+
+/**
+ * Sub-class of [[Sort]] that is a relational expression which imposes
+ * a particular sort order on its input without otherwise changing its content in Flink.
+ * An optional offset and fetch limit which rows are returned.
+ */
+class FlinkLogicalSort(
+    cluster: RelOptCluster,
+    traits: RelTraitSet,
+    child: RelNode,
+    collation: RelCollation,
+    sortOffset: RexNode,
+    sortFetch: RexNode)
+  extends Sort(cluster, traits, child, collation, sortOffset, sortFetch)
+  with FlinkLogicalRel {
+
+  // number of leading rows skipped by the offset; 0 when there is no offset
+  private lazy val limitStart: Long = if (offset != null) RexLiteral.intValue(offset) else 0L
+
+  override def copy(
+      traitSet: RelTraitSet,
+      newInput: RelNode,
+      newCollation: RelCollation,
+      offset: RexNode,
+      fetch: RexNode): Sort = {
+    new FlinkLogicalSort(cluster, traitSet, newInput, newCollation, offset, fetch)
+  }
+
+  /**
+   * Estimates the output row count as `max(inputRows - offset, 1.0)`, further capped by
+   * `fetch` when a fetch is present.
+   *
+   * `mq.getRowCount` returns a nullable [[java.lang.Double]]. Because this method returns a
+   * primitive `Double`, a null cannot be passed through: Scala would unbox it and throw a
+   * NullPointerException. When the input row count is unknown, the estimate therefore starts
+   * from 1.0, the same floor the known-count path applies, before the `fetch` cap.
+   */
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    val inputRowCnt = mq.getRowCount(this.getInput)
+    val rowCount = if (inputRowCnt == null) {
+      1.0
+    } else {
+      (inputRowCnt.doubleValue() - limitStart).max(1.0)
+    }
+    if (fetch != null) {
+      val limit = RexLiteral.intValue(fetch)
+      rowCount.min(limit)
+    } else {
+      rowCount
+    }
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    // by default, assume cost is proportional to number of rows
+    val rowCount: Double = mq.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCount, rowCount, 0)
+  }
+
+}
+
+class FlinkLogicalSortStreamConverter
+  extends ConverterRule(
+    classOf[LogicalSort],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalSortStreamConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sort = rel.asInstanceOf[LogicalSort]
+    val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
+    FlinkLogicalSort.create(newInput, sort.getCollation, sort.offset, sort.fetch)
+  }
+}
+
+class FlinkLogicalSortBatchConverter extends ConverterRule(
+  classOf[LogicalSort],
+  Convention.NONE,
+  FlinkConventions.LOGICAL,
+  "FlinkLogicalSortBatchConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val sort = rel.asInstanceOf[LogicalSort]
+    val newInput = RelOptRule.convert(sort.getInput, FlinkConventions.LOGICAL)
+    // TODO supports range sort
+    FlinkLogicalSort.create(newInput, sort.getCollation, sort.offset, sort.fetch)
+  }
+}
+
+object FlinkLogicalSort {
+  val BATCH_CONVERTER: RelOptRule = new FlinkLogicalSortBatchConverter
+  val STREAM_CONVERTER: RelOptRule = new FlinkLogicalSortStreamConverter
+
+  /**
+   * Creates a [[FlinkLogicalSort]] whose traits are the input's traits with the LOGICAL
+   * convention and the canonized `collation` substituted in.
+   */
+  def create(
+      input: RelNode,
+      collation: RelCollation,
+      sortOffset: RexNode,
+      sortFetch: RexNode): FlinkLogicalSort = {
+    val cluster = input.getCluster
+    val collationTrait = RelCollationTraitDef.INSTANCE.canonize(collation)
+    val traitSet = input.getTraitSet.replace(FlinkConventions.LOGICAL)
+      .replace(collationTrait).simplify()
+    new FlinkLogicalSort(cluster, traitSet, input, collation, sortOffset, sortFetch)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
new file mode 100644
index 0000000..18d3d35
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelColumnMapping
+import org.apache.calcite.rex.{RexLiteral, RexNode, RexUtil}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.calcite.util.ImmutableBitSet
+
+import java.lang.reflect.Type
+import java.util
+
+/**
+ * Flink's logical counterpart of [[TableFunctionScan]]: a relational expression that
+ * invokes a table-valued function.
+ */
+class FlinkLogicalTableFunctionScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: util.List[RelNode],
+    rexCall: RexNode,
+    elementType: Type,
+    rowType: RelDataType,
+    columnMappings: util.Set[RelColumnMapping])
+  extends TableFunctionScan(
+    cluster,
+    traitSet,
+    inputs,
+    rexCall,
+    elementType,
+    rowType,
+    columnMappings)
+  with FlinkLogicalRel {
+
+  override def copy(
+      traitSet: RelTraitSet,
+      inputs: util.List[RelNode],
+      rexCall: RexNode,
+      elementType: Type,
+      rowType: RelDataType,
+      columnMappings: util.Set[RelColumnMapping]): TableFunctionScan =
+    new FlinkLogicalTableFunctionScan(
+      cluster, traitSet, inputs, rexCall, elementType, rowType, columnMappings)
+
+}
+
+class FlinkLogicalTableFunctionScanConverter
+  extends ConverterRule(
+    classOf[LogicalTableFunctionScan],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalTableFunctionScanConverter") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    // TODO: this rule should not match calls to a TemporalTableFunction
+    super.matches(call)
+  }
+
+  /**
+   * Converts a [[LogicalTableFunctionScan]]. A scan that has no inputs and a constant call
+   * is rewritten via [[convertConstantFunctionTableScan]] into a correlate over a single
+   * empty row; any other scan maps 1:1 to a [[FlinkLogicalTableFunctionScan]].
+   */
+  override def convert(rel: RelNode): RelNode = {
+    val scan = rel.asInstanceOf[LogicalTableFunctionScan]
+    val logicalTraits = rel.getCluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    val isConstantCall = RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+    if (!isConstantCall) {
+      createFlinkLogicalTableScan(scan, logicalTraits)
+    } else {
+      convertConstantFunctionTableScan(scan, logicalTraits)
+    }
+  }
+
+  /** Builds a [[FlinkLogicalTableFunctionScan]] carrying over every property of `scan`. */
+  def createFlinkLogicalTableScan(
+      scan: LogicalTableFunctionScan,
+      traitSet: RelTraitSet): FlinkLogicalTableFunctionScan =
+    new FlinkLogicalTableFunctionScan(
+      scan.getCluster,
+      traitSet,
+      scan.getInputs,
+      scan.getCall,
+      scan.getElementType,
+      scan.getRowType,
+      scan.getColumnMappings)
+
+  /**
+   * Converts [[LogicalTableFunctionScan]] with constant RexCall to
+   * {{{
+   *                    [[FlinkLogicalCorrelate]]
+   *                     /                     \
+   * empty [[FlinkLogicalValues]]  [[FlinkLogicalTableFunctionScan]]
+   * }}}
+   */
+  def convertConstantFunctionTableScan(
+      scan: LogicalTableFunctionScan,
+      traitSet: RelTraitSet): RelNode = {
+    val cluster = scan.getCluster
+
+    // left side of the correlate: a Values node holding exactly one row with no fields
+    val singleEmptyRow = new FlinkLogicalValues(
+      cluster,
+      traitSet,
+      cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of()),
+      ImmutableList.of(ImmutableList.of[RexLiteral]()))
+
+    // right side of the correlate: the table function scan itself
+    val functionScan = createFlinkLogicalTableScan(scan, traitSet)
+
+    new FlinkLogicalCorrelate(
+      cluster,
+      traitSet,
+      singleEmptyRow,
+      functionScan,
+      cluster.createCorrel(), // a dummy CorrelationId
+      ImmutableBitSet.of(),
+      SemiJoinType.INNER)
+  }
+}
+
+object FlinkLogicalTableFunctionScan {
+  val CONVERTER = new FlinkLogicalTableFunctionScanConverter
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
new file mode 100644
index 0000000..6b6cd03
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan
+import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
+import org.apache.flink.table.sources._
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+
+/**
+ * Flink's logical counterpart of [[TableScan]]: a relational operator that returns the
+ * rows produced by a [[TableSource]].
+ */
+class FlinkLogicalTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    relOptTable: FlinkRelOptTable)
+  extends TableScan(cluster, traitSet, relOptTable)
+  with FlinkLogicalRel {
+
+  /** The table source wrapped by `relOptTable`. */
+  val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource
+
+  /** Copies this scan with new traits and a (possibly different) table. */
+  def copy(
+      traitSet: RelTraitSet,
+      relOptTable: FlinkRelOptTable): FlinkLogicalTableSourceScan =
+    new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable)
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
+    new FlinkLogicalTableSourceScan(cluster, traitSet, relOptTable)
+
+  /**
+   * Derives the row type from the table source. Stream sources go through
+   * [[TableSourceUtil.getRelDataType]] and batch sources use their table schema; a source
+   * that is both is treated as a stream source.
+   *
+   * @throws TableException if the source is neither a stream nor a batch table source
+   */
+  override def deriveRowType(): RelDataType = {
+    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    tableSource match {
+      case streamSource: StreamTableSource[_] =>
+        TableSourceUtil.getRelDataType(streamSource, None, streaming = true, typeFactory)
+      case _: BatchTableSource[_] =>
+        typeFactory.buildLogicalRowType(tableSource.getTableSchema, isStreaming = Option(false))
+      case _ =>
+        throw new TableException("Unknown TableSource type.")
+    }
+  }
+
+  /** Charges `(rows, rows, rows * averageRowSize)` for reading the source. */
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rows = mq.getRowCount(this)
+    val bytes = rows * mq.getAverageRowSize(this)
+    planner.getCostFactory.makeCost(rows, rows, bytes)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val columnNames = tableSource.getTableSchema.getColumnNames
+    super.explainTerms(pw).item("fields", columnNames.mkString(", "))
+  }
+
+}
+
+class FlinkLogicalTableSourceScanConverter
+  extends ConverterRule(
+    classOf[LogicalTableScan],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalTableSourceScanConverter") {
+
+  /** Only scans over a [[TableSourceTable]] are converted by this rule. */
+  override def matches(call: RelOptRuleCall): Boolean =
+    isTableSourceScan(call.rel[TableScan](0))
+
+  override def convert(rel: RelNode): RelNode = {
+    val relOptTable = rel.asInstanceOf[TableScan].getTable.asInstanceOf[FlinkRelOptTable]
+    FlinkLogicalTableSourceScan.create(rel.getCluster, relOptTable)
+  }
+}
+
+object FlinkLogicalTableSourceScan {
+  val CONVERTER = new FlinkLogicalTableSourceScanConverter
+
+  /** Whether the table read by `scan` can be unwrapped to a [[TableSourceTable]]. */
+  def isTableSourceScan(scan: TableScan): Boolean =
+    scan.getTable.unwrap(classOf[TableSourceTable[_]]) != null
+
+  /** Creates a [[FlinkLogicalTableSourceScan]] in the LOGICAL convention. */
+  def create(cluster: RelOptCluster, relOptTable: FlinkRelOptTable): FlinkLogicalTableSourceScan = {
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalTableSourceScan(cluster, logicalTraits, relOptTable)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala
new file mode 100644
index 0000000..a723f12
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalUnion.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.{SetOp, Union}
+import org.apache.calcite.rel.logical.LogicalUnion
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink's logical counterpart of [[Union]]: returns the union of the rows of its inputs.
+ */
+class FlinkLogicalUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputs: JList[RelNode],
+    all: Boolean)
+  extends Union(cluster, traitSet, inputs, all)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp =
+    new FlinkLogicalUnion(cluster, traitSet, inputs, all)
+
+  /** Costs the union as the summed row count of its inputs, with zero CPU and IO. */
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val totalRows = getInputs.map(child => mq.getRowCount(child).doubleValue()).sum
+    planner.getCostFactory.makeCost(totalRows, 0, 0)
+  }
+
+}
+
+private class FlinkLogicalUnionConverter
+  extends ConverterRule(
+    classOf[LogicalUnion],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalUnionConverter") {
+
+  /**
+   * Only UNION ALL is translated by this rule.
+   */
+  override def matches(call: RelOptRuleCall): Boolean =
+    call.rel[LogicalUnion](0).all
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicalUnion = rel.asInstanceOf[LogicalUnion]
+    val logicalInputs = logicalUnion.getInputs.map { child =>
+      RelOptRule.convert(child, FlinkConventions.LOGICAL)
+    }
+    FlinkLogicalUnion.create(logicalInputs, logicalUnion.all)
+  }
+}
+
+object FlinkLogicalUnion {
+  val CONVERTER: ConverterRule = new FlinkLogicalUnionConverter()
+
+  /**
+   * Creates a [[FlinkLogicalUnion]] in the LOGICAL convention, on the cluster of the
+   * first input.
+   */
+  def create(inputs: JList[RelNode], all: Boolean): FlinkLogicalUnion = {
+    val cluster = inputs.get(0).getCluster
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalUnion(cluster, logicalTraits, inputs, all)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala
new file mode 100644
index 0000000..e27def1
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalValues.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexLiteral
+
+import java.util
+
+/**
+ * Flink's logical counterpart of [[Values]]: a relational expression whose result is a
+ * fixed sequence of zero or more literal rows.
+ */
+class FlinkLogicalValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowRelDataType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]])
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode =
+    new FlinkLogicalValues(cluster, traitSet, rowRelDataType, tuples)
+
+  /**
+   * Costs the node by its row count. The literal values are already computed, so CPU is
+   * charged a flat 1 and IO is charged 0.
+   */
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val rowCount = mq.getRowCount(this)
+    planner.getCostFactory.makeCost(rowCount, 1, 0)
+  }
+
+}
+
+private class FlinkLogicalValuesConverter
+  extends ConverterRule(
+    classOf[LogicalValues],
+    Convention.NONE,
+    FlinkConventions.LOGICAL,
+    "FlinkLogicalValuesConverter") {
+
+  override def convert(rel: RelNode): RelNode = {
+    val logicalValues = rel.asInstanceOf[LogicalValues]
+    FlinkLogicalValues.create(
+      rel.getCluster, logicalValues.getRowType, logicalValues.getTuples())
+  }
+}
+
+object FlinkLogicalValues {
+  val CONVERTER: ConverterRule = new FlinkLogicalValuesConverter()
+
+  /** Creates a [[FlinkLogicalValues]] in the LOGICAL convention. */
+  def create(
+      cluster: RelOptCluster,
+      rowType: RelDataType,
+      tuples: ImmutableList[ImmutableList[RexLiteral]]): FlinkLogicalValues = {
+    val logicalTraits = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
+    new FlinkLogicalValues(cluster, logicalTraits, rowType, tuples)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
new file mode 100644
index 0000000..5703315
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.logical + +import org.apache.flink.table.plan.nodes.FlinkConventions +import org.apache.flink.table.plan.nodes.calcite.{LogicalWatermarkAssigner, WatermarkAssigner} + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule + +import java.util + +/** + * Sub-class of [[WatermarkAssigner]] that is a relational operator + * which generates [[org.apache.flink.streaming.api.watermark.Watermark]]. + */ +class FlinkLogicalWatermarkAssigner( + cluster: RelOptCluster, + traits: RelTraitSet, + input: RelNode, + rowtimeField: String, + watermarkOffset: Long) + extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) + with FlinkLogicalRel { + + override def copy( + traitSet: RelTraitSet, + inputs: util.List[RelNode]): RelNode = { + new FlinkLogicalWatermarkAssigner( + cluster, traitSet, inputs.get(0), rowtimeField, watermarkOffset) + } + +} + +class FlinkLogicalWatermarkAssignerConverter extends ConverterRule( + classOf[LogicalWatermarkAssigner], + Convention.NONE, + FlinkConventions.LOGICAL, + "FlinkLogicalWatermarkAssignerConverter") { + + override def convert(rel: RelNode): RelNode = { + val watermark = rel.asInstanceOf[LogicalWatermarkAssigner] + val newInput = RelOptRule.convert(watermark.getInput, FlinkConventions.LOGICAL) + FlinkLogicalWatermarkAssigner.create( + newInput, + watermark.rowtimeField, + watermark.watermarkOffset) + } +} + +object FlinkLogicalWatermarkAssigner { + val CONVERTER = new 
FlinkLogicalWatermarkAssignerConverter + + def create( + input: RelNode, + rowtimeField: String, + watermarkOffset: Long): FlinkLogicalWatermarkAssigner = { + val cluster = input.getCluster + val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify() + new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeField, watermarkOffset) + } +} + + diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala index 2fe6f3b..a23bbde 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala @@ -21,6 +21,9 @@ package org.apache.flink.table.plan.schema import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.plan.stats.FlinkStatistic +/** + * The class that wraps [[DataStream]] as a Calcite Table. + */ class DataStreamTable[T]( val dataStream: DataStream[T], val producesUpdates: Boolean, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala new file mode 100644 index 0000000..0644e5a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/FlinkRelOptTable.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.flink.table.plan.stats.FlinkStatistic + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.linq4j.tree.Expression +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.plan.{RelOptCluster, RelOptSchema} +import org.apache.calcite.prepare.Prepare.AbstractPreparingTable +import org.apache.calcite.prepare.{CalcitePrepareImpl, RelOptTableImpl} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.calcite.runtime.Hook +import org.apache.calcite.schema._ +import org.apache.calcite.sql.SqlAccessType +import org.apache.calcite.sql.validate.{SqlModality, SqlMonotonicity} +import org.apache.calcite.sql2rel.InitializerContext +import org.apache.calcite.util.ImmutableBitSet + +import java.util.{List => JList} + +import scala.collection.JavaConversions._ + +/** + * [[FlinkRelOptTable]] wraps a [[FlinkTable]] + * + * @param schema the [[RelOptSchema]] this table belongs to + * @param rowType the type of rows returned by this table + * @param names the identifier for this table. The identifier must be unique with + * respect to the Connection producing this table. 
+ * @param table wrapped flink table + */ +class FlinkRelOptTable protected( + schema: RelOptSchema, + rowType: RelDataType, + names: JList[String], + table: FlinkTable) + extends AbstractPreparingTable { + + // Default value of rowCount if there is no available stats. + // Sets a bigger default value to avoid broadcast join. + val DEFAULT_ROWCOUNT: Double = 1E8 + + def copy(newTable: FlinkTable, newRowType: RelDataType): FlinkRelOptTable = + new FlinkRelOptTable(schema, newRowType, names, newTable) + + /** + * Obtains an identifier for this table. + * + * Note: the qualified names are used for computing the digest of TableScan. + * + * @return qualified name + */ + override def getQualifiedName: JList[String] = names + + /** + * Obtains the access type of the table. + * + * @return all access types including SELECT/UPDATE/INSERT/DELETE + */ + override def getAllowedAccess: SqlAccessType = SqlAccessType.ALL + + override def unwrap[T](clazz: Class[T]): T = { + if (clazz.isInstance(this)) { + clazz.cast(this) + } else if (clazz.isInstance(table)) { + clazz.cast(table) + } else { + null.asInstanceOf[T] + } + } + + /** + * Returns true if the given modality is supported, else false. + */ + override def supportsModality(modality: SqlModality): Boolean = modality match { + case SqlModality.STREAM => + table.isInstanceOf[StreamableTable] + case _ => + !table.isInstanceOf[StreamableTable] + } + + /** + * Returns the type of rows returned by this table. + */ + override def getRowType: RelDataType = rowType + + /** + * Obtains whether a given column is monotonic. 
+ * + * @param columnName column name + * @return true if the given column is monotonic + */ + override def getMonotonicity(columnName: String): SqlMonotonicity = { + val columnIdx = rowType.getFieldNames.indexOf(columnName) + if (columnIdx >= 0) { + for (collation: RelCollation <- table.getStatistic.getCollations) { + val fieldCollation: RelFieldCollation = collation.getFieldCollations.get(0) + if (fieldCollation.getFieldIndex == columnIdx) { + return fieldCollation.direction.monotonicity + } + } + } + SqlMonotonicity.NOT_MONOTONIC + } + + /** + * Returns flink table statistics. + */ + def getFlinkStatistic: FlinkStatistic = { + if (table.getStatistic != null) { + table.getStatistic + } else { + FlinkStatistic.UNKNOWN + } + } + + /** + * Returns an estimate of the number of rows in the table. + */ + override def getRowCount: Double = + if (table.getStatistic != null) { + table.getStatistic match { + case stats: FlinkStatistic => + if (stats.getRowCount != null) { + stats.getRowCount + } else { + DEFAULT_ROWCOUNT + } + case _ => throw new AssertionError + } + } else { + DEFAULT_ROWCOUNT + } + + /** + * Converts this table into a [[RelNode]] relational expression. + * + * @return the RelNode converted from this table + */ + override def toRel(context: ToRelContext): RelNode = { + val cluster: RelOptCluster = context.getCluster + if (table.isInstanceOf[TranslatableTable]) { + table.asInstanceOf[TranslatableTable].toRel(context, this) + } else if (Hook.ENABLE_BINDABLE.get(false)) { + LogicalTableScan.create(cluster, this) + } else if (CalcitePrepareImpl.ENABLE_ENUMERABLE) { + EnumerableTableScan.create(cluster, this) + } else { + throw new AssertionError + } + } + + /** + * Returns the [[RelOptSchema]] this table belongs to. + */ + override def getRelOptSchema: RelOptSchema = schema + + /** + * Returns whether the given columns are a key or a superset of a unique key + * of this table. + * + * Note: Return true means TRUE. 
However return false means FALSE or NOT KNOWN. + * It's better to use [[org.apache.calcite.rel.metadata.RelMetadataQuery]].areRowsUnique to + * distinguish FALSE with NOT KNOWN. + * + * @param columns Ordinals of key columns + * @return if the input columns bits represents a unique column set; false if not (or + * if no metadata is available) + */ + @Deprecated + override def isKey(columns: ImmutableBitSet): Boolean = false + + /** + * Returns the referential constraints existing for this table. These constraints + * are represented over other tables using [[RelReferentialConstraint]] nodes. + */ + override def getReferentialConstraints: JList[RelReferentialConstraint] = { + if (table.getStatistic != null) { + table.getStatistic.getReferentialConstraints + } else { + ImmutableList.of() + } + } + + + /** + * Returns a description of the physical ordering (or orderings) of the rows + * returned from this table. + * + * @see [[org.apache.calcite.rel.metadata.RelMetadataQuery#collations(RelNode)]] + */ + override def getCollationList: JList[RelCollation] = { + if (table.getStatistic != null) { + table.getStatistic.getCollations + } else { + ImmutableList.of() + } + } + + /** + * Returns a description of the physical distribution of the rows + * in this table. + * + * @see [[org.apache.calcite.rel.metadata.RelMetadataQuery#distribution(RelNode)]] + */ + override def getDistribution: RelDistribution = { + if (table.getStatistic != null) { + table.getStatistic.getDistribution + } else { + null + } + } + + /** + * Generates code for this table, which is not supported now. + * + * @param clazz The desired collection class; for example [[org.apache.calcite.linq4j.Queryable]] + */ + override def getExpression(clazz: java.lang.Class[_]): Expression = + throw new UnsupportedOperationException + + /** + * Obtains whether the ordinal column has a default value, which is not supported now. 
+ * + * @param rowType rowType of field + * @param ordinal index of the given column + * @param initializerContext the context for + * [[org.apache.calcite.sql2rel.InitializerExpressionFactory]] methods + * @return true if the column has a default value + */ + override def columnHasDefaultValue( + rowType: RelDataType, + ordinal: Int, + initializerContext: InitializerContext): Boolean = false + + override def getColumnStrategies: JList[ColumnStrategy] = RelOptTableImpl.columnStrategies(this) + + override def extend(extendedTable: Table) = + throw new RuntimeException("Extending column not supported") + +} + +object FlinkRelOptTable { + + def create(schema: RelOptSchema, + rowType: RelDataType, + names: JList[String], + table: FlinkTable): FlinkRelOptTable = + new FlinkRelOptTable(schema, rowType, names, table) + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala similarity index 50% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 48b0bc6..0fc8078 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -16,13 +16,30 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.plan.schema -import org.apache.calcite.rel.RelNode +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.TableSource /** - * Base class for flink relational expression. 
+ * Abstract class which define the interfaces required to + * convert a [[TableSource]] to a Calcite Table. */ -trait FlinkRelNode extends RelNode { +abstract class TableSourceTable[T]( + val tableSource: TableSource[T], + val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + extends FlinkTable { + /** + * Returns statistics of current table. + */ + override def getStatistic: FlinkStatistic = statistic + + /** + * Replaces table source with the given one, and create a new table source table. + * + * @param tableSource tableSource to replace. + * @return new TableSourceTable + */ + def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T] } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala new file mode 100644 index 0000000..c6b4bcc --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
+import org.apache.calcite.sql.`type`.BasicSqlType
+
+import java.lang
+
+/**
+  * A time indicator type for event-time or processing-time. It reuses the SQL type name and
+  * precision of `originalType`, so it has similar properties as a basic SQL type.
+  */
+class TimeIndicatorRelDataType(
+    typeSystem: RelDataTypeSystem,
+    originalType: BasicSqlType,
+    val isEventTime: Boolean) // true: event-time (ROWTIME); false: processing-time (PROCTIME)
+  extends BasicSqlType(
+    typeSystem,
+    originalType.getSqlTypeName,
+    originalType.getPrecision) {
+
+  override def equals(other: Any): Boolean = other match {
+    case that: TimeIndicatorRelDataType =>
+      super.equals(that) &&
+        isEventTime == that.isEventTime
+    case _ => false // never equal to a type that is not a TimeIndicatorRelDataType
+  }
+
+  override def hashCode(): Int = {
+    super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
+  }
+
+  override def toString: String = {
+    s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+  }
+
+  override def generateTypeString(sb: lang.StringBuilder, withDetail: Boolean): Unit = {
+    sb.append(toString) // withDetail is ignored; always emits the toString form
+  }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
similarity index 51%
copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index 12aa689..3f09a7d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -15,30 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ +package org.apache.flink.table.plan.util -package org.apache.flink.table.type; +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.sql.SqlKind -/** - * Sql timestamp type. - */ -public class TimestampType implements AtomicType { - - public static final TimestampType INSTANCE = new TimestampType(); - - private TimestampType() {} - - @Override - public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass(); - } +object AggregateUtil { - @Override - public int hashCode() { - return getClass().hashCode(); - } + def getGroupIdExprIndexes(aggCalls: Seq[AggregateCall]): Seq[Int] = { + aggCalls.zipWithIndex.filter { case (call, _) => + call.getAggregation.getKind match { + case SqlKind.GROUP_ID | SqlKind.GROUPING | SqlKind.GROUPING_ID => true + case _ => false + } + }.map { case (_, idx) => idx } + } - @Override - public String toString() { - return getClass().getSimpleName(); - } + /** + * Returns whether any of the aggregates are accurate DISTINCT. + * + * @return Whether any of the aggregates are accurate DISTINCT + */ + def containsAccurateDistinctCall(aggCalls: Seq[AggregateCall]): Boolean = { + aggCalls.exists(call => call.isDistinct && !call.isApproximate) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala new file mode 100644 index 0000000..f8dfbeb --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/CalcUtil.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.flink.table.plan.nodes.ExpressionFormat +import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat + +import org.apache.calcite.rex.{RexNode, RexProgram} + +import scala.collection.JavaConversions._ + +object CalcUtil { + + private[flink] def conditionToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { + val cond = calcProgram.getCondition + val inFields = calcProgram.getInputRowType.getFieldNames.toList + val localExprs = calcProgram.getExprList.toList + + if (cond != null) { + expression(cond, inFields, Some(localExprs)) + } else { + "" + } + } + + private[flink] def selectionToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String, + expressionFormat: ExpressionFormat = ExpressionFormat.Prefix): String = { + + val proj = calcProgram.getProjectList.toList + val inFields = calcProgram.getInputRowType.getFieldNames.toList + val localExprs = calcProgram.getExprList.toList + val outFields = calcProgram.getOutputRowType.getFieldNames.toList + + proj + .map(expression(_, inFields, Some(localExprs), expressionFormat)) + .zip(outFields).map { case (e, o) => + if (e != o) { + e + " AS " + o + } else { + e + } + }.mkString(", ") + } + +} diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala similarity index 65% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala index 48b0bc6..b75283d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala @@ -16,13 +16,20 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.plan.util -import org.apache.calcite.rel.RelNode +import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, RankRange} + +import java.lang.Double /** - * Base class for flink relational expression. + * FlinkRelMdUtil provides utility methods used by the metadata provider methods. 
*/ -trait FlinkRelNode extends RelNode { +object FlinkRelMdUtil { + + def getRankRangeNdv(rankRange: RankRange): Double = rankRange match { + case r: ConstantRankRange => (r.rankEnd - r.rankStart + 1).toDouble + case _ => 100D // default value now + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala similarity index 58% copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala index 12aa689..f483f08 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala @@ -15,30 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.table.plan.util -package org.apache.flink.table.type; +import java.util -/** - * Sql timestamp type. - */ -public class TimestampType implements AtomicType { - - public static final TimestampType INSTANCE = new TimestampType(); - - private TimestampType() {} - - @Override - public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass(); - } +object FlinkRelOptUtil { - @Override - public int hashCode() { - return getClass().hashCode(); - } + /** + * Get unique field name based on existed `allFieldNames` collection. + * NOTES: the new unique field name will be added to existed `allFieldNames` collection. 
+ */ + def buildUniqueFieldName( + allFieldNames: util.Set[String], + toAddFieldName: String): String = { + var name: String = toAddFieldName + var i: Int = 0 + while (allFieldNames.contains(name)) { + name = toAddFieldName + "_" + i + i += 1 + } + allFieldNames.add(name) + name + } - @Override - public String toString() { - return getClass().getSimpleName(); - } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala similarity index 60% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala index 48b0bc6..d78a711 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SortUtil.scala @@ -16,13 +16,22 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.plan.util -import org.apache.calcite.rel.RelNode +import org.apache.flink.api.common.operators.Order + +import org.apache.calcite.rel.RelFieldCollation.Direction /** - * Base class for flink relational expression. + * Common methods for Flink sort operators. 
*/ -trait FlinkRelNode extends RelNode { +object SortUtil { + def directionToOrder(direction: Direction): Order = { + direction match { + case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING + case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING + case _ => throw new IllegalArgumentException("Unsupported direction.") + } + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala similarity index 55% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala index 48b0bc6..bdd5641 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala @@ -16,13 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.sources -import org.apache.calcite.rel.RelNode +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment /** - * Base class for flink relational expression. + * Defines an external batch exec table and provides access to its data. + * + * @tparam T Type of the [[DataStream]] created by this [[TableSource]]. */ -trait FlinkRelNode extends RelNode { +trait BatchTableSource[T] extends TableSource[T] { + /** + * Returns the data of the table as a [[DataStream]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. 
+ */ + def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[T] } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala similarity index 55% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala index 48b0bc6..07d7fa0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala @@ -16,13 +16,23 @@ * limitations under the License. */ -package org.apache.flink.table.plan.nodes +package org.apache.flink.table.sources -import org.apache.calcite.rel.RelNode +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment /** - * Base class for flink relational expression. + * Defines an external stream table and provides access to its data. + * + * @tparam T Type of the [[DataStream]] created by this [[TableSource]]. */ -trait FlinkRelNode extends RelNode { +trait StreamTableSource[T] extends TableSource[T] { + /** + * Returns the data of the table as a [[DataStream]]. + * + * NOTE: This method is for internal use only for defining a [[TableSource]]. + * Do not use it in Table API programs. 
+ */ + def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala new file mode 100644 index 0000000..985cc37 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.table.`type`.TypeConverters +import org.apache.flink.table.calcite.FlinkTypeFactory + +import org.apache.calcite.rel.`type`.RelDataType + +/** Util class for [[TableSource]]. */ +object TableSourceUtil { + + /** + * Returns the Calcite schema of a [[TableSource]]. + * + * @param tableSource The [[TableSource]] for which the Calcite schema is generated. + * @param selectedFields The indices of all selected fields. None, if all fields are selected. + * @param streaming Flag to determine whether the schema of a stream or batch table is created. + * @param typeFactory The type factory to create the schema. 
+ * @return The Calcite schema for the selected fields of the given [[TableSource]]. + */ + def getRelDataType( + tableSource: TableSource[_], + selectedFields: Option[Array[Int]], + streaming: Boolean, + typeFactory: FlinkTypeFactory): RelDataType = { + + val fieldNames = tableSource.getTableSchema.getFieldNames + val fieldTypes = tableSource.getTableSchema.getFieldTypes + .map(TypeConverters.createInternalTypeFromTypeInfo) + // TODO get fieldNullables from TableSchema + val fieldNullables = fieldTypes.map(_ => true) + + // TODO supports time attributes after Expression is ready + + val (selectedFieldNames, selectedFieldTypes, selectedFieldNullables) = + if (selectedFields.isDefined) { + // filter field names and types by selected fields + ( + selectedFields.get.map(fieldNames(_)), + selectedFields.get.map(fieldTypes(_)), + selectedFields.get.map(fieldNullables(_))) + } else { + (fieldNames, fieldTypes, fieldNullables) + } + typeFactory.buildRelDataType(selectedFieldNames, selectedFieldTypes, selectedFieldNullables) + } + +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java index f3429b0..2b94cd7 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java @@ -53,6 +53,10 @@ public class InternalTypes { public static final DecimalType SYSTEM_DEFAULT_DECIMAL = DecimalType.SYSTEM_DEFAULT; + public static final TimestampType ROWTIME_INDICATOR = TimestampType.ROWTIME_INDICATOR; + + public static final TimestampType PROCTIME_INDICATOR = TimestampType.PROCTIME_INDICATOR; + public static ArrayType createArrayType(InternalType elementType) { return new ArrayType(elementType); } diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java index 12aa689..26ab8cd 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java @@ -23,22 +23,42 @@ package org.apache.flink.table.type; */ public class TimestampType implements AtomicType { - public static final TimestampType INSTANCE = new TimestampType(); + public static final TimestampType INSTANCE = new TimestampType(0, "TimestampType"); + public static final TimestampType INTERVAL_MILLIS = + new TimestampType(1, "IntervalMillis"); + public static final TimestampType ROWTIME_INDICATOR = + new TimestampType(2, "RowTimeIndicator"); + public static final TimestampType PROCTIME_INDICATOR = + new TimestampType(3, "ProctimeTimeIndicator"); - private TimestampType() {} + private int id; + private String name; + + private TimestampType(int id, String name) { + this.id = id; + this.name = name; + } @Override public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass(); + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return id == ((TimestampType) o).id; } @Override public int hashCode() { - return getClass().hashCode(); + int result = getClass().hashCode(); + result = 31 * result + name.hashCode(); + return result; } @Override public String toString() { - return getClass().getSimpleName(); + return name; } }