http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 2224752..8eb9d40 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -20,36 +20,34 @@ package org.apache.flink.table.plan.nodes.datastream import java.util.{List => JList} import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.core.Window.Group +import org.apache.calcite.rel.core.{AggregateCall, Window} import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING -import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.functions.NullByteKeySelector import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.{StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.plan.nodes.OverAggregate +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.types.Row -import org.apache.flink.api.java.functions.NullByteKeySelector -import org.apache.flink.table.codegen.CodeGenerator -import org.apache.flink.table.functions.{ProcTimeType, RowTimeType} -import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair - class DataStreamOverAggregate( logicWindow: Window, cluster: RelOptCluster, traitSet: RelTraitSet, inputNode: RelNode, - rowRelDataType: RelDataType, - inputType: RelDataType) + schema: RowSchema, + inputSchema: RowSchema) extends SingleRel(cluster, traitSet, inputNode) with OverAggregate with DataStreamRel { - override def deriveRowType(): RelDataType = rowRelDataType + override def deriveRowType(): RelDataType = schema.logicalType override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = { new DataStreamOverAggregate( @@ -57,8 +55,8 @@ class DataStreamOverAggregate( cluster, traitSet, inputs.get(0), - getRowType, - inputType) + schema, + inputSchema) } override def toString: String = { @@ -72,14 +70,16 @@ class DataStreamOverAggregate( val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates super.explainTerms(pw) - .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty) - .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations)) - .itemIf("rows", windowRange(logicWindow, overWindow, getInput), overWindow.isRows) - .itemIf("range", windowRange(logicWindow, overWindow, getInput), !overWindow.isRows) + .itemIf("partitionBy", + partitionToString(schema.logicalType, 
partitionKeys), partitionKeys.nonEmpty) + .item("orderBy", + orderingToString(schema.logicalType, overWindow.orderKeys.getFieldCollations)) + .itemIf("rows", windowRange(logicWindow, overWindow, inputNode), overWindow.isRows) + .itemIf("range", windowRange(logicWindow, overWindow, inputNode), !overWindow.isRows) .item( "select", aggregationToString( - inputType, - getRowType, + inputSchema.logicalType, + schema.logicalType, namedAggregates)) } @@ -111,13 +111,13 @@ class DataStreamOverAggregate( false, inputDS.getType) - val timeType = inputType + val timeType = schema.logicalType .getFieldList .get(orderKey.getFieldIndex) - .getValue + .getType timeType match { - case _: ProcTimeType => + case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) => // proc-time OVER window if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { // unbounded OVER window @@ -140,7 +140,8 @@ class DataStreamOverAggregate( throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") } - case _: RowTimeType => + + case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) => // row-time OVER window if (overWindow.lowerBound.isPreceding && overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { @@ -158,17 +159,16 @@ class DataStreamOverAggregate( inputDS, isRowTimeType = true, isRowsClause = overWindow.isRows - ) + ) } else { throw new TableException( "OVER RANGE FOLLOWING windows are not supported yet.") } + case _ => throw new TableException( - "Unsupported time type {$timeType}. " + - "OVER windows do only support RowTimeType and ProcTimeType.") + s"OVER windows can only be applied on time attributes.") } - } def createUnboundedAndCurrentRowOverWindow( @@ -178,16 +178,20 @@ class DataStreamOverAggregate( isRowsClause: Boolean): DataStream[Row] = { val overWindow: Group = logicWindow.groups.get(0) - val partitionKeys: Array[Int] = overWindow.keys.toArray - val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates - - // get the output types - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map { + namedAggregate => + new CalcitePair[AggregateCall, String]( + schema.mapAggregateCall(namedAggregate.left), + namedAggregate.right) + } val processFunction = AggregateUtil.createUnboundedOverProcessFunction( generator, namedAggregates, - inputType, + inputSchema.physicalType, + inputSchema.physicalTypeInfo, + inputSchema.physicalFieldTypeInfo, isRowTimeType, partitionKeys.nonEmpty, isRowsClause) @@ -198,7 +202,7 @@ class DataStreamOverAggregate( inputDS .keyBy(partitionKeys: _*) .process(processFunction) - .returns(rowTypeInfo) + .returns(schema.physicalTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] } @@ -207,13 +211,13 @@ class DataStreamOverAggregate( if (isRowTimeType) { inputDS.keyBy(new NullByteKeySelector[Row]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(schema.physicalTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] } else { inputDS .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(schema.physicalTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] } @@ -228,19 +232,26 @@ class DataStreamOverAggregate( isRowsClause: Boolean): DataStream[Row] = { val overWindow: Group = 
logicWindow.groups.get(0) - val partitionKeys: Array[Int] = overWindow.keys.toArray - val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map { + namedAggregate => + new CalcitePair[AggregateCall, String]( + schema.mapAggregateCall(namedAggregate.left), + namedAggregate.right) + } val precedingOffset = - getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0) - - // get the output types - val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + getLowerBoundary( + logicWindow, + overWindow, + input) + (if (isRowsClause) 1 else 0) val processFunction = AggregateUtil.createBoundedOverProcessFunction( generator, namedAggregates, - inputType, + inputSchema.physicalType, + inputSchema.physicalTypeInfo, + inputSchema.physicalFieldTypeInfo, precedingOffset, isRowsClause, isRowTimeType @@ -251,7 +262,7 @@ class DataStreamOverAggregate( inputDS .keyBy(partitionKeys: _*) .process(processFunction) - .returns(rowTypeInfo) + .returns(schema.physicalTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] } @@ -260,7 +271,7 @@ class DataStreamOverAggregate( inputDS .keyBy(new NullByteKeySelector[Row]) .process(processFunction).setParallelism(1).setMaxParallelism(1) - .returns(rowTypeInfo) + .returns(schema.physicalTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] } @@ -282,17 +293,18 @@ class DataStreamOverAggregate( s"over: (${ if (!partitionKeys.isEmpty) { - s"PARTITION BY: ${partitionToString(inputType, partitionKeys)}, " + s"PARTITION BY: ${partitionToString(inputSchema.logicalType, partitionKeys)}, " } else { "" } - }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " + + }ORDER BY: ${orderingToString(inputSchema.logicalType, + overWindow.orderKeys.getFieldCollations)}, " + s"${if (overWindow.isRows) "ROWS" else "RANGE"}" + - s"${windowRange(logicWindow, overWindow, getInput)}, " + + s"${windowRange(logicWindow, overWindow, inputNode.asInstanceOf[DataStreamRel])}, " + s"select: (${ aggregationToString( - inputType, - getRowType, + inputSchema.logicalType, + schema.logicalType, namedAggregates) }))" }
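The DataStreamOverAggregate hunks above replace the raw RelDataType pair with a RowSchema pair and remap partition keys and aggregate calls from logical to physical indices (schema.mapIndex, schema.mapAggregateCall), because time indicator fields exist only in the logical row type. Below is a minimal, self-contained sketch of that shifting logic (IndexMappingSketch, buildMapping, and isTimeIndicator are hypothetical stand-ins; the actual implementation is RowSchema.generateIndexMapping in the new file further below):

object IndexMappingSketch {

  // isTimeIndicator(i) stands in for FlinkTypeFactory.isTimeIndicatorType
  // applied to the i-th field of the logical row type.
  def buildMapping(isTimeIndicator: IndexedSeq[Boolean]): Array[Int] = {
    var indicatorsSeen = 0
    isTimeIndicator.zipWithIndex.map { case (indicator, i) =>
      if (indicator) {
        indicatorsSeen += 1
        -1 // time indicators have no physical counterpart
      } else {
        i - indicatorsSeen // later fields shift left past the indicators
      }
    }.toArray
  }

  def main(args: Array[String]): Unit = {
    // logical row (a, rowtime, b, proctime, c) maps to physical row (a, b, c)
    val mapping = buildMapping(IndexedSeq(false, true, false, true, false))
    println(mapping.mkString(", ")) // prints: 0, -1, 1, -1, 2
  }
}

Mapping time indicators to -1 instead of dropping the entries keeps the array addressable by logical position, which is what mapIndex relies on when it rejects accesses to logical-only fields.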
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala index ae172a5..03938f3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala @@ -34,4 +34,3 @@ trait DataStreamRel extends FlinkRelNode { def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row] } - http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index c187ae8..05f60ba 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment -import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema} import org.apache.flink.types.Row /** @@ -36,27 +36,27 @@ class DataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowRelDataType: RelDataType) + schema: RowSchema) extends TableScan(cluster, traitSet, table) with StreamScan { val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) - override def deriveRowType(): RelDataType = rowRelDataType + override def deriveRowType(): RelDataType = schema.logicalType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamScan( cluster, traitSet, getTable, - getRowType + schema ) } override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream - convertToInternalRow(inputDataStream, dataStreamTable, config) + convertToInternalRow(schema, inputDataStream, dataStreamTable, config) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index f340ac7..47b4946 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -19,14 +19,12 @@ package org.apache.flink.table.plan.nodes.datastream import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import scala.collection.JavaConverters._ - /** * Flink RelNode which matches along with Union. * @@ -36,11 +34,11 @@ class DataStreamUnion( traitSet: RelTraitSet, leftNode: RelNode, rightNode: RelNode, - rowRelDataType: RelDataType) + schema: RowSchema) extends BiRel(cluster, traitSet, leftNode, rightNode) with DataStreamRel { - override def deriveRowType() = rowRelDataType + override def deriveRowType() = schema.logicalType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamUnion( @@ -48,7 +46,7 @@ class DataStreamUnion( traitSet, inputs.get(0), inputs.get(1), - getRowType + schema ) } @@ -57,7 +55,7 @@ class DataStreamUnion( } override def toString = { - s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" + s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))" } override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { @@ -68,6 +66,6 @@ class DataStreamUnion( } private def unionSelectionToString: String = { - getRowType.getFieldNames.asScala.toList.mkString(", ") + schema.logicalFieldNames.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index 0ab4a48..c964e03 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -21,13 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream import com.google.common.collect.ImmutableList import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Values import org.apache.calcite.rex.RexLiteral import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.StreamTableEnvironment -import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.io.ValuesInputFormat import org.apache.flink.types.Row @@ -39,19 +38,19 @@ import scala.collection.JavaConverters._ class DataStreamValues( cluster: RelOptCluster, traitSet: RelTraitSet, - rowRelDataType: RelDataType, + schema: RowSchema, tuples: ImmutableList[ImmutableList[RexLiteral]], ruleDescription: String) - extends Values(cluster, rowRelDataType, tuples, traitSet) + extends Values(cluster, schema.logicalType, tuples, traitSet) with DataStreamRel { - override 
def deriveRowType() = rowRelDataType + override def deriveRowType() = schema.logicalType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamValues( cluster, traitSet, - getRowType, + schema, getTuples, ruleDescription ) @@ -61,15 +60,13 @@ class DataStreamValues( val config = tableEnv.getConfig - val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - val generator = new CodeGenerator(config) // generate code for every record val generatedRecords = getTuples.asScala.map { r => generator.generateResultExpression( - returnType, - getRowType.getFieldNames.asScala, + schema.physicalTypeInfo, + schema.physicalFieldNames, r.asScala) } @@ -77,14 +74,14 @@ class DataStreamValues( val generatedFunction = generator.generateValuesInputFormat( ruleDescription, generatedRecords.map(_.code), - returnType) + schema.physicalTypeInfo) val inputFormat = new ValuesInputFormat[Row]( generatedFunction.name, generatedFunction.code, generatedFunction.returnType) - tableEnv.execEnv.createInput(inputFormat, returnType) + tableEnv.execEnv.createInput(inputFormat, schema.physicalTypeInfo) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index 6d08302..dd82819 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -18,42 +18,46 @@ package org.apache.flink.table.plan.nodes.datastream +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.CommonScan -import org.apache.flink.table.plan.schema.FlinkTable +import org.apache.flink.table.plan.schema.{FlinkTable, RowSchema} +import org.apache.flink.table.runtime.MapRunner import org.apache.flink.types.Row -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ trait StreamScan extends CommonScan with DataStreamRel { protected def convertToInternalRow( + schema: RowSchema, input: DataStream[Any], flinkTable: FlinkTable[_], config: TableConfig) : DataStream[Row] = { - val inputType = input.getType - - val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) - // conversion - if (needsConversion(inputType, internalType)) { + if (needsConversion(input.getType, schema.physicalTypeInfo)) { - val mapFunc = getConversionMapper( + val function = generatedConversionFunction( config, - inputType, - internalType, + classOf[MapFunction[Any, Row]], + input.getType, + schema.physicalTypeInfo, "DataStreamSourceConversion", - getRowType.getFieldNames, + schema.physicalFieldNames, Some(flinkTable.fieldIndexes)) - val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" + val runner = new MapRunner[Any, Row]( + function.name, + function.code, + function.returnType) + + val opName = s"from: (${schema.logicalFieldNames.mkString(", ")})" - input.map(mapFunc).name(opName) + // TODO we 
need a ProcessFunction here + input.map(runner).name(opName) } // no conversion necessary, forward else { http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 0a466a3..5dc3da8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -22,10 +22,11 @@ import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan -import org.apache.flink.table.plan.schema.TableSourceTable -import org.apache.flink.table.sources.{StreamTableSource, TableSource} +import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable} +import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource} import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ @@ -37,7 +38,50 @@ class StreamTableSourceScan( extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource) with StreamScan { - override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + override def deriveRowType() = { + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + + def removeIndex[T](idx: Int, l: List[T]): List[T] = { + if (l.size < idx) { + l + } else { + l.take(idx) ++ l.drop(idx + 1) + } + } + + var fieldNames = TableEnvironment.getFieldNames(tableSource).toList + var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList + + val rowtime = tableSource match { + case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null => + val rowtimeAttribute = timeSource.getRowtimeAttribute + // remove physical field if it is overwritten by time attribute + fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames) + fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes) + Some((rowtimeAttribute.f0, rowtimeAttribute.f1)) + case _ => + None + } + + val proctime = tableSource match { + case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null => + val proctimeAttribute = timeSource.getProctimeAttribute + // remove physical field if it is overwritten by time attribute + fieldNames = removeIndex(proctimeAttribute.f0, fieldNames) + fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes) + Some((proctimeAttribute.f0, proctimeAttribute.f1)) + case _ => + None + } + + flinkTypeFactory.buildLogicalRowType( + fieldNames, + fieldTypes, + rowtime, + proctime) + } + + override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) 
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) } @@ -67,6 +111,10 @@ class StreamTableSourceScan( override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = { val config = tableEnv.getConfig val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]] - convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config) + convertToInternalRow( + new RowSchema(getRowType), + inputDataStream, + new TableSourceTable(tableSource), + config) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala index b1f991e..11b227f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala @@ -45,7 +45,7 @@ class FlinkLogicalOverWindow( traitSet, inputs.get(0), windowConstants, - rowType, + getRowType, windowGroups) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index eacbafa..53e7b31 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -47,9 +47,11 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType( + flinkTypeFactory.buildLogicalRowType( TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) + TableEnvironment.getFieldTypes(tableSource.getReturnType), + None, + None) } override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 4da2da9..7577deb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -53,8 
+53,8 @@ class WindowStartEndPropertiesRule transformed.push(LogicalWindowAggregate.create( agg.getWindow, Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias)) + NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute)) ), agg) ) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala index f011b66..fc65403 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate +import org.apache.flink.table.plan.schema.RowSchema import scala.collection.JavaConversions._ @@ -65,8 +66,8 @@ class DataStreamAggregateRule traitSet, convInput, agg.getNamedAggCalls, - rel.getRowType, - agg.getInput.getRowType, + new RowSchema(rel.getRowType), + new RowSchema(agg.getInput.getRowType), agg.getGroupSet.toArray) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala index 1777264..0a1a31a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala @@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc +import org.apache.flink.table.plan.schema.RowSchema class DataStreamCalcRule extends ConverterRule( @@ -42,7 +43,8 @@ class DataStreamCalcRule rel.getCluster, traitSet, convInput, - rel.getRowType, + new RowSchema(convInput.getRowType), + new RowSchema(rel.getRowType), calc.getProgram, description) } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala index ae39d40..cd0663e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala @@ -25,6 +25,7 @@ import org.apache.calcite.rex.RexNode import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan} +import org.apache.flink.table.plan.schema.RowSchema class DataStreamCorrelateRule extends ConverterRule( @@ -68,11 +69,12 @@ class DataStreamCorrelateRule new DataStreamCorrelate( rel.getCluster, traitSet, + new RowSchema(convInput.getRowType), convInput, scan, condition, - rel.getRowType, - join.getRowType, + new RowSchema(rel.getRowType), + new RowSchema(join.getRowType), join.getJoinType, description) } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 175a202..28efcf5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -18,15 +18,15 @@ package org.apache.flink.table.plan.rules.datastream -import java.math.BigDecimal +import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex.{RexBuilder, RexCall, RexLiteral, RexNode} +import org.apache.calcite.rex._ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.{TableException, Window} import org.apache.flink.table.api.scala.{Session, Slide, Tumble} -import org.apache.flink.table.expressions.Literal -import org.apache.flink.table.functions.TimeModeTypes +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference} import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule import org.apache.flink.table.typeutils.TimeIntervalTypeInfo @@ -49,16 +49,12 @@ class DataStreamLogicalWindowAggregateRule val timeType = windowExpression.operands.get(0).getType timeType match { - case TimeModeTypes.ROWTIME => - rexBuilder.makeAbstractCast( - TimeModeTypes.ROWTIME, - rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true)) - case TimeModeTypes.PROCTIME => - rexBuilder.makeAbstractCast( - TimeModeTypes.PROCTIME, - rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true)) + + case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) => + rexBuilder.makeLiteral(0L, timeType, true) + case _ => - throw TableException(s"""Unexpected time type $timeType encountered""") + throw TableException(s"""Time attribute expected but $timeType encountered.""") } } @@ -68,41 +64,41 @@ 
class DataStreamLogicalWindowAggregateRule def getOperandAsLong(call: RexCall, idx: Int): Long = call.getOperands.get(idx) match { - case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue() - case _ => throw new TableException("Only constant window descriptors are supported") + case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue() + case _ => throw new TableException("Only constant window descriptors are supported.") + } + + def getOperandAsTimeIndicator(call: RexCall, idx: Int): String = + call.getOperands.get(idx) match { + case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) => + rowType.getFieldList.get(v.getIndex).getName + case _ => + throw new TableException("Window can only be defined over a time attribute column.") } windowExpr.getOperator match { case SqlStdOperatorTable.TUMBLE => + val time = getOperandAsTimeIndicator(windowExpr, 0) val interval = getOperandAsLong(windowExpr, 1) val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - val window = windowExpr.getType match { - case TimeModeTypes.PROCTIME => w - case TimeModeTypes.ROWTIME => w.on("rowtime") - } - window.as("w$") + w.on(UnresolvedFieldReference(time)).as("w$") case SqlStdOperatorTable.HOP => + val time = getOperandAsTimeIndicator(windowExpr, 0) val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2)) val w = Slide .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS)) .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - val window = windowExpr.getType match { - case TimeModeTypes.PROCTIME => w - case TimeModeTypes.ROWTIME => w.on("rowtime") - } - window.as("w$") + w.on(UnresolvedFieldReference(time)).as("w$") + case SqlStdOperatorTable.SESSION => + val time = getOperandAsTimeIndicator(windowExpr, 0) val gap = getOperandAsLong(windowExpr, 1) val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS)) - val window = windowExpr.getType match { - case TimeModeTypes.PROCTIME => w - case TimeModeTypes.ROWTIME => w.on("rowtime") - } - window.as("w$") + w.on(UnresolvedFieldReference(time)).as("w$") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala index 8e96970..b3d7603 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala @@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow +import org.apache.flink.table.plan.schema.RowSchema class DataStreamOverAggregateRule extends ConverterRule( @@ -46,8 +47,8 @@ class DataStreamOverAggregateRule rel.getCluster, traitSet, convertInput, - rel.getRowType, - inputRowType) + new RowSchema(rel.getRowType), + new RowSchema(inputRowType)) } } 
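In the DataStreamLogicalWindowAggregateRule hunks above, window translation no longer branches on TimeModeTypes.ROWTIME/PROCTIME; the rule now validates the window operands directly: size, slide, and gap descriptors must be literals, and the first operand must be an input reference with a time indicator type, whose field name is then passed to w.on(UnresolvedFieldReference(time)). A toy sketch of the two operand checks, under invented simplifications (Operand, LiteralOperand, and InputRef stand in for Calcite's RexNode, RexLiteral, and RexInputRef):

sealed trait Operand
case class LiteralOperand(value: Long) extends Operand
case class InputRef(index: Int, isTimeIndicator: Boolean, name: String) extends Operand

object WindowOperandSketch {

  // mirrors getOperandAsLong: only constant window descriptors are allowed
  def operandAsLong(op: Operand): Long = op match {
    case LiteralOperand(v) => v
    case _ => throw new IllegalArgumentException(
      "Only constant window descriptors are supported.")
  }

  // mirrors getOperandAsTimeIndicator: the operand must reference a time attribute
  def operandAsTimeIndicator(op: Operand): String = op match {
    case InputRef(_, true, name) => name
    case _ => throw new IllegalArgumentException(
      "Window can only be defined over a time attribute column.")
  }

  def main(args: Array[String]): Unit = {
    // models TUMBLE(rowtime, INTERVAL '10' SECOND)
    val time = operandAsTimeIndicator(InputRef(0, isTimeIndicator = true, name = "rowtime"))
    val size = operandAsLong(LiteralOperand(10000L))
    println(s"Tumble over ${size}ms on '$time' as w$$")
  }
}

This is also why the rewritten rule can drop the per-window TimeModeTypes match: the time attribute travels as an ordinary named field reference into Tumble/Slide/Session.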
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala index 5bf60a7..d8dda80 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamScan -import org.apache.flink.table.plan.schema.DataStreamTable +import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema} import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan class DataStreamScanRule @@ -53,7 +53,7 @@ class DataStreamScanRule rel.getCluster, traitSet, scan.getTable, - rel.getRowType + new RowSchema(rel.getRowType) ) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala index 4241f53..8402f6d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala @@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion +import org.apache.flink.table.plan.schema.RowSchema class DataStreamUnionRule extends ConverterRule( @@ -44,7 +45,7 @@ class DataStreamUnionRule traitSet, convLeft, convRight, - rel.getRowType) + new RowSchema(rel.getRowType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala index fbad21f..a1453a8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala @@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule import org.apache.flink.table.plan.nodes.FlinkConventions import 
org.apache.flink.table.plan.nodes.datastream.DataStreamValues import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues +import org.apache.flink.table.plan.schema.RowSchema class DataStreamValuesRule extends ConverterRule( @@ -40,7 +41,7 @@ class DataStreamValuesRule new DataStreamValues( rel.getCluster, traitSet, - rel.getRowType, + new RowSchema(rel.getRowType), values.getTuples, description) } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala index 6ce6570..70054b4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala @@ -18,13 +18,27 @@ package org.apache.flink.table.plan.schema +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String], + val rowtime: Option[(Int, String)], + val proctime: Option[(Int, String)], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) { + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + + flinkTypeFactory.buildLogicalRowType( + fieldNames, + fieldTypes, + rowtime, + proctime) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index ea77061..752b00e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -48,10 +48,11 @@ abstract class FlinkTable[T]( val fieldTypes: Array[TypeInformation[_]] = typeInfo match { case cType: CompositeType[_] => - if (fieldNames.length != cType.getArity) { + // it is ok to leave out fields + if (fieldNames.length > cType.getArity) { throw new TableException( s"Arity of type (" + cType.getFieldNames.deep + ") " + - "not equal to number of field names " + fieldNames.deep + ".") + "must not be greater than number of field names " + fieldNames.deep + ".") } fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) case aType: AtomicType[_] => @@ -64,7 +65,7 @@ abstract class FlinkTable[T]( override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes) + 
flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes, None, None) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala new file mode 100644 index 0000000..b42be82 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType} +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ + +/** + * Schema that describes both a logical and physical row. 
+ */
+class RowSchema(private val logicalRowType: RelDataType) {
+
+  private lazy val physicalRowFields: Seq[RelDataTypeField] = logicalRowType.getFieldList filter {
+    field => !FlinkTypeFactory.isTimeIndicatorType(field.getType)
+  }
+
+  private lazy val physicalRowType: RelDataType = new RelRecordType(physicalRowFields)
+
+  private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = physicalRowFields map { f =>
+    FlinkTypeFactory.toTypeInfo(f.getType)
+  }
+
+  private lazy val physicalRowFieldNames: Seq[String] = physicalRowFields.map(_.getName)
+
+  private lazy val physicalRowTypeInfo: TypeInformation[Row] = new RowTypeInfo(
+    physicalRowFieldTypes.toArray, physicalRowFieldNames.toArray)
+
+  private lazy val indexMapping: Array[Int] = generateIndexMapping
+
+  private lazy val inputRefUpdater = new RexInputRefUpdater()
+
+  private def generateIndexMapping: Array[Int] = {
+    val mapping = new Array[Int](logicalRowType.getFieldCount)
+    var countTimeIndicators = 0
+    var i = 0
+    while (i < logicalRowType.getFieldCount) {
+      val t = logicalRowType.getFieldList.get(i).getType
+      if (FlinkTypeFactory.isTimeIndicatorType(t)) {
+        countTimeIndicators += 1
+        // no mapping
+        mapping(i) = -1
+      } else {
+        mapping(i) = i - countTimeIndicators
+      }
+      i += 1
+    }
+    mapping
+  }
+
+  private class RexInputRefUpdater extends RexShuttle {
+
+    override def visitInputRef(inputRef: RexInputRef): RexNode = {
+      new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
+    }
+  }
+
+  /**
+    * Returns the arity of the logical record.
+    */
+  def logicalArity: Int = logicalRowType.getFieldCount
+
+  /**
+    * Returns the arity of the physical record.
+    */
+  def physicalArity: Int = physicalTypeInfo.getArity
+
+  /**
+    * Returns a logical [[RelDataType]] including logical fields (i.e. time indicators).
+    */
+  def logicalType: RelDataType = logicalRowType
+
+  /**
+    * Returns a physical [[RelDataType]] with no logical fields (i.e. time indicators).
+    */
+  def physicalType: RelDataType = physicalRowType
+
+  /**
+    * Returns a physical [[TypeInformation]] of the row with no logical fields (i.e. time indicators).
+    */
+  def physicalTypeInfo: TypeInformation[Row] = physicalRowTypeInfo
+
+  /**
+    * Returns [[TypeInformation]] of the row's fields with no logical fields (i.e. time indicators).
+    */
+  def physicalFieldTypeInfo: Seq[TypeInformation[_]] = physicalRowFieldTypes
+
+  /**
+    * Returns the logical field names including logical fields (i.e. time indicators).
+    */
+  def logicalFieldNames: Seq[String] = logicalRowType.getFieldNames
+
+  /**
+    * Returns the physical field names with no logical fields (i.e. time indicators).
+    */
+  def physicalFieldNames: Seq[String] = physicalRowFieldNames
+
+  /**
+    * Converts logical indices to physical indices based on this schema.
+    */
+  def mapIndex(logicalIndex: Int): Int = {
+    val mappedIndex = indexMapping(logicalIndex)
+    if (mappedIndex < 0) {
+      throw new TableException("Invalid access to a logical field.")
+    } else {
+      mappedIndex
+    }
+  }
+
+  /**
+    * Converts logical indices of an aggregate call to physical ones.
+    */
+  def mapAggregateCall(logicalAggCall: AggregateCall): AggregateCall = {
+    logicalAggCall.copy(
+      logicalAggCall.getArgList.map(mapIndex(_).asInstanceOf[Integer]),
+      if (logicalAggCall.filterArg < 0) {
+        logicalAggCall.filterArg
+      } else {
+        mapIndex(logicalAggCall.filterArg)
+      }
+    )
+  }
+
+  /**
+    * Converts logical field references of a [[RexNode]] to physical ones.
+ */ + def mapRexNode(logicalRexNode: RexNode): RexNode = logicalRexNode.accept(inputRefUpdater) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala new file mode 100644 index 0000000..5e27061 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.RelDataTypeSystem +import org.apache.calcite.sql.`type`.BasicSqlType + +/** + * Creates a time indicator type for event-time or processing-time, but with similar properties + * as a basic SQL type. 
+ */ +class TimeIndicatorRelDataType( + typeSystem: RelDataTypeSystem, + originalType: BasicSqlType, + val isEventTime: Boolean) + extends BasicSqlType( + typeSystem, + originalType.getSqlTypeName, + originalType.getPrecision) { + + override def equals(other: Any): Boolean = other match { + case that: TimeIndicatorRelDataType => + super.equals(that) && + isEventTime == that.isEventTime + case that: BasicSqlType => + super.equals(that) + case _ => false + } + + override def hashCode(): Int = { + super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala index 51e2fc5..32562c7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala @@ -35,7 +35,7 @@ class MapRunner[IN, OUT]( val LOG = LoggerFactory.getLogger(this.getClass) - private var function: MapFunction[IN, OUT] = null + private var function: MapFunction[IN, OUT] = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index e38207d..07992cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -26,17 +26,18 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun._ import org.apache.calcite.sql.{SqlAggFunction, SqlKind} -import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunction, _} -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenerator +import 
org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions._ import org.apache.flink.table.functions.utils.AggSqlFunction @@ -61,26 +62,31 @@ object AggregateUtil { * window to evaluate final aggregate value. * * @param generator code generator instance - * @param namedAggregates List of calls to aggregate functions and their output field names - * @param inputType Input row type + * @param namedAggregates Physical calls to aggregate functions and their output field names + * @param inputType Physical type of the row. + * @param inputTypeInfo Physical type information of the row. + * @param inputFieldTypeInfo Physical type information of the row's fields. * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType * @param isPartitioned It is a tag that indicate whether the input is partitioned * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause */ private[flink] def createUnboundedOverProcessFunction( - generator: CodeGenerator, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - inputType: RelDataType, - isRowTimeType: Boolean, - isPartitioned: Boolean, - isRowsClause: Boolean): ProcessFunction[Row, Row] = { + generator: CodeGenerator, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + inputTypeInfo: TypeInformation[Row], + inputFieldTypeInfo: Seq[TypeInformation[_]], + isRowTimeType: Boolean, + isPartitioned: Boolean, + isRowsClause: Boolean) + : ProcessFunction[Row, Row] = { val needRetract = false val (aggFields, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), inputType, - needRetract) + needRetraction = false) val aggregationStateType: RowTypeInfo = createDataSetAggregateBufferDataType(Array(), aggregates, inputType) @@ -92,7 +98,7 @@ object AggregateUtil { val genFunction = generator.generateAggregations( "UnboundedProcessingOverAggregateHelper", generator, - inputType, + inputFieldTypeInfo, aggregates, aggFields, aggMapping, @@ -112,13 +118,13 @@ object AggregateUtil { new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, - FlinkTypeFactory.toInternalRowTypeInfo(inputType)) + inputTypeInfo) } else { // RANGE unbounded over process function new RowTimeUnboundedRangeOver( genFunction, aggregationStateType, - FlinkTypeFactory.toInternalRowTypeInfo(inputType)) + inputTypeInfo) } } else { if (isPartitioned) { @@ -138,20 +144,25 @@ object AggregateUtil { * bounded OVER window to evaluate final aggregate value. * * @param generator code generator instance - * @param namedAggregates List of calls to aggregate functions and their output field names - * @param inputType Input row type + * @param namedAggregates Physical calls to aggregate functions and their output field names + * @param inputType Physical type of the row. + * @param inputTypeInfo Physical type information of the row. + * @param inputFieldTypeInfo Physical type information of the row's fields. 
   * @param precedingOffset the preceding offset
   * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
   * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
   * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
   */
   private[flink] def createBoundedOverProcessFunction(
-      generator: CodeGenerator,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
-      precedingOffset: Long,
-      isRowsClause: Boolean,
-      isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+    generator: CodeGenerator,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    inputType: RelDataType,
+    inputTypeInfo: TypeInformation[Row],
+    inputFieldTypeInfo: Seq[TypeInformation[_]],
+    precedingOffset: Long,
+    isRowsClause: Boolean,
+    isRowTimeType: Boolean)
+  : ProcessFunction[Row, Row] = {

     val needRetract = true
     val (aggFields, aggregates) =
@@ -161,7 +172,6 @@ object AggregateUtil {
         needRetract)

     val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
-    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]

     val forwardMapping = (0 until inputType.getFieldCount).toArray
     val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -170,7 +180,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "BoundedOverAggregateHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFields,
       aggMapping,
@@ -189,14 +199,14 @@ object AggregateUtil {
         new RowTimeBoundedRowsOver(
           genFunction,
           aggregationStateType,
-          inputRowType,
+          inputTypeInfo,
           precedingOffset
         )
       } else {
         new RowTimeBoundedRangeOver(
           genFunction,
           aggregationStateType,
-          inputRowType,
+          inputTypeInfo,
           precedingOffset
         )
       }
@@ -206,13 +216,13 @@ object AggregateUtil {
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputTypeInfo)
       } else {
         new ProcTimeBoundedRangeOver(
           genFunction,
           precedingOffset,
           aggregationStateType,
-          inputRowType)
+          inputTypeInfo)
       }
     }
   }
@@ -241,12 +251,13 @@ object AggregateUtil {
    * NOTE: this function is only used for time based window on batch tables.
   */
  def createDataSetWindowPrepareMapFunction(
-      generator: CodeGenerator,
-      window: LogicalWindow,
-      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      groupings: Array[Int],
-      inputType: RelDataType,
-      isParserCaseSensitive: Boolean)
+    generator: CodeGenerator,
+    window: LogicalWindow,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    groupings: Array[Int],
+    inputType: RelDataType,
+    inputFieldTypeInfo: Seq[TypeInformation[_]],
+    isParserCaseSensitive: Boolean)
  : MapFunction[Row, Row] = {

     val needRetract = false
@@ -263,28 +274,28 @@ object AggregateUtil {
       Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))

     val (timeFieldPos, tumbleTimeWindowSize) = window match {
-      case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, Some(asLong(size)))
-      case EventTimeTumblingGroupWindow(_, time, _) =>
+      case TumblingGroupWindow(_, time, size) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+        size match {
+          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+            (timeFieldPos, Some(value))
+          case _ => (timeFieldPos, None)
+        }

-      case EventTimeSessionGroupWindow(_, time, _) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+      case SessionGroupWindow(_, time, _) =>
+        (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)

-      case EventTimeSlidingGroupWindow(_, time, size, slide)
-          if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
-        // pre-tumble incremental aggregates on time-windows
+      case SlidingGroupWindow(_, time, size, slide) =>
         val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
-        (timeFieldPos, Some(preTumblingSize))
-
-      case EventTimeSlidingGroupWindow(_, time, _, _) =>
-        val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
-        (timeFieldPos, None)
+        size match {
+          case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+            // pre-tumble incremental aggregates on time-windows
+            val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+            val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+            (timeFieldPos, Some(preTumblingSize))
+          case _ => (timeFieldPos, None)
+        }

       case _ =>
         throw new UnsupportedOperationException(s"$window is currently not supported on batch")
@@ -296,7 +307,7 @@ object AggregateUtil {
     val genFunction = generator.generateAggregations(
       "DataSetAggregatePrepareMapHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -349,31 +360,32 @@
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       groupings: Array[Int],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       isParserCaseSensitive: Boolean)
     : RichGroupReduceFunction[Row, Row] = {

     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)

     val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
       groupings,
       aggregates,
-      inputType,
+      physicalInputRowType,
       Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))

     val keysAndAggregatesArity = groupings.length + namedAggregates.length

     window match {
-      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
         // sliding time-window for partial aggregations
         val genFunction = generator.generateAggregations(
           "DataSetAggregatePrepareMapHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggregates.indices.map(_ + groupings.length).toArray,
@@ -433,7 +445,7 @@ object AggregateUtil {
     : FlatMapFunction[Row, Row] = {

     window match {
-      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
         new DataSetSlideTimeWindowAggFlatMapFunction(
           inputType.getArity - 1,
           asLong(size),
@@ -458,7 +470,8 @@ object AggregateUtil {
       generator: CodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       outputType: RelDataType,
       groupings: Array[Int],
       properties: Seq[NamedWindowProperty],
@@ -468,7 +481,7 @@
     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)

     val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
@@ -476,7 +489,7 @@
     val genPreAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      physicalInputTypes,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -493,7 +506,7 @@
     val genFinalAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      physicalInputTypes,
       aggregates,
       aggFieldIndexes,
       aggMapping,
@@ -510,7 +523,7 @@
     val keysAndAggregatesArity = groupings.length + namedAggregates.length

     window match {
-      case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
         // tumbling time window
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
@@ -532,13 +545,13 @@
             endPos,
             outputType.getFieldCount)
         }
-      case EventTimeTumblingGroupWindow(_, _, size) =>
+      case TumblingGroupWindow(_, _, size) =>
         // tumbling count window
         new DataSetTumbleCountWindowAggReduceGroupFunction(
           genFinalAggFunction,
           asLong(size))

-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         new DataSetSessionWindowAggReduceGroupFunction(
           genFinalAggFunction,
           asLong(gap),
           isInputCombined)

-      case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
+      case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
         val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
         if (doAllSupportPartialMerge(aggregates)) {
           // for partial aggregations
@@ -570,7 +583,7 @@
             asLong(size))
         }

-      case EventTimeSlidingGroupWindow(_, _, size, _) =>
+      case SlidingGroupWindow(_, _, size, _) =>
         new DataSetSlideWindowAggReduceGroupFunction(
           genFinalAggFunction,
           keysAndAggregatesArity,
@@ -608,13 +621,14 @@
       generator: CodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       groupings: Array[Int]): MapPartitionFunction[Row, Row] = {

     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)

     val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -622,18 +636,18 @@
     val keysAndAggregatesArity = groupings.length + namedAggregates.length

     window match {
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo = createDataSetAggregateBufferDataType(
           groupings,
           aggregates,
-          inputType,
+          physicalInputRowType,
           Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))

         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggMapping,
@@ -679,14 +693,15 @@
       generator: CodeGenerator,
       window: LogicalWindow,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-      inputType: RelDataType,
+      physicalInputRowType: RelDataType,
+      physicalInputTypes: Seq[TypeInformation[_]],
       groupings: Array[Int])
     : GroupCombineFunction[Row, Row] = {

     val needRetract = false
     val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
       namedAggregates.map(_.getKey),
-      inputType,
+      physicalInputRowType,
       needRetract)

     val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -695,18 +710,18 @@

     window match {
-      case EventTimeSessionGroupWindow(_, _, gap) =>
+      case SessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo = createDataSetAggregateBufferDataType(
           groupings,
           aggregates,
-          inputType,
+          physicalInputRowType,
           Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))

         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
           generator,
-          inputType,
+          physicalInputTypes,
           aggregates,
           aggFieldIndexes,
           aggMapping,
@@ -742,6 +757,7 @@
       generator: CodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
       groupings: Array[Int],
       inGroupingSet: Boolean): (Option[DataSetPreAggFunction],
@@ -786,7 +802,7 @@
       val genPreAggFunction = generator.generateAggregations(
         "DataSetAggregatePrepareMapHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggregates.indices.map(_ + groupings.length).toArray,
@@ -813,7 +829,7 @@
       val genFinalAggFunction = generator.generateAggregations(
         "DataSetAggregateFinalHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggOutFields,
@@ -837,7 +853,7 @@
       val genFunction = generator.generateAggregations(
         "DataSetAggregateHelper",
         generator,
-        inputType,
+        inputFieldTypeInfo,
         aggregates,
         aggInFields,
         aggOutFields,
@@ -914,6 +930,7 @@
       generator: CodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
+      inputFieldTypeInfo: Seq[TypeInformation[_]],
       outputType: RelDataType,
       needMerge: Boolean)
     : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
@@ -931,7 +948,7 @@
     val genFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
       generator,
-      inputType,
+      inputFieldTypeInfo,
       aggregates,
       aggFields,
       aggMapping,
@@ -1047,12 +1064,9 @@
   private def isTimeWindow(window: LogicalWindow) = {
     window match {
-      case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
-      case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
-      case ProcessingTimeSessionGroupWindow(_, _) => true
-      case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
-      case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
-      case EventTimeSessionGroupWindow(_, _, _) => true
+      case TumblingGroupWindow(_, _, size) => isTimeIntervalLiteral(size)
+      case SlidingGroupWindow(_, _, size, _) => isTimeIntervalLiteral(size)
+      case SessionGroupWindow(_, _, _) => true
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 03ca02c..ef97e71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList, ArrayList => JArrayList}

 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
@@ -39,8 +39,8 @@ import org.slf4j.LoggerFactory
  */
 class RowTimeBoundedRangeOver(
   genAggregations: GeneratedAggregationsFunction,
-  aggregationStateType: RowTypeInfo,
-  inputRowType: RowTypeInfo,
+  aggregationStateType: TypeInformation[Row],
+  inputRowType: TypeInformation[Row],
   precedingOffset: Long)
   extends ProcessFunction[Row, Row]
     with Compiler[GeneratedAggregations] {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 4a9a14c..7169cf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory
 class RowTimeBoundedRowsOver(
   genAggregations: GeneratedAggregationsFunction,
   aggregationStateType: RowTypeInfo,
-  inputRowType: RowTypeInfo,
+  inputRowType: TypeInformation[Row],
   precedingOffset: Long)
   extends ProcessFunction[Row, Row]
     with Compiler[GeneratedAggregations] {

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
new file mode 100644
index 0000000..8466cdf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.java.tuple.Tuple2
+
+/**
+ * Defines logical time attributes for a [[TableSource]]. Time attributes can be used for
+ * indicating, accessing, and working with Flink's event-time or processing-time. A
+ * [[TableSource]] that implements this interface can define names and positions of rowtime
+ * and proctime attributes in the rows it produces.
+ */
+trait DefinedTimeAttributes {
+
+  /**
+    * Defines a name and position (starting at 0) of rowtime attribute that represents Flink's
+    * event-time. Null if no rowtime should be available. If the position is within the arity of
+    * the result row, the logical attribute will overwrite the physical attribute. If the position
+    * is higher than the result row, the time attribute will be appended logically.
+    */
+  def getRowtimeAttribute: Tuple2[Int, String]
+
+  /**
+    * Defines a name and position (starting at 0) of proctime attribute that represents Flink's
+    * processing-time. Null if no proctime should be available. If the position is within the arity
+    * of the result row, the logical attribute will overwrite the physical attribute. If the
+    * position is higher than the result row, the time attribute will be appended logically.
+    */
+  def getProctimeAttribute: Tuple2[Int, String]
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
new file mode 100644
index 0000000..31dcb5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.base.{SqlTimestampComparator, SqlTimestampSerializer}
+
+/**
+ * Type information for indicating event or processing time. However, it behaves like a
+ * regular SQL timestamp.
+ */
+class TimeIndicatorTypeInfo(val isEventTime: Boolean)
+  extends SqlTimeTypeInfo[Timestamp](
+    classOf[Timestamp],
+    SqlTimestampSerializer.INSTANCE,
+    classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
+
+  override def toString: String = s"TimeIndicatorTypeInfo"
+}
+
+object TimeIndicatorTypeInfo {
+
+  val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
+  val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 40f0cf2..9896a8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.typeutils

-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.table.validate._
@@ -29,6 +29,7 @@ object TypeCheckUtils {
    * SQL type but NOT vice versa.
   */
  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: TimeIndicatorTypeInfo => false
     case _: BasicTypeInfo[_] => false
     case _: SqlTimeTypeInfo[_] => false
     case _: TimeIntervalTypeInfo[_] => false
@@ -64,6 +65,8 @@ object TypeCheckUtils {

   def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO

+  def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
+
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
     case _ => false
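
Two details in TimeIndicatorRelDataType above are easy to miss: equals() deliberately also accepts a plain BasicSqlType, so a time indicator remains interchangeable with a regular TIMESTAMP during validation, while hashCode() is offset, presumably so that hash-based bookkeeping can still keep the two apart (see the inline comment in the patch). A minimal standalone analogue of the pattern, using simplified stand-in classes rather than the actual Calcite types:

    // Base type: equality by name only.
    class SqlType(val name: String) {
      override def equals(other: Any): Boolean = other match {
        case that: SqlType => name == that.name
        case _ => false
      }
      override def hashCode(): Int = name.hashCode
    }

    // Marker subtype: still equal to the plain type, but hashes differently.
    class TimeIndicatorType(name: String, val isEventTime: Boolean) extends SqlType(name) {
      override def equals(other: Any): Boolean = other match {
        case that: TimeIndicatorType => super.equals(that) && isEventTime == that.isEventTime
        case that: SqlType => super.equals(that) // interchangeable with a plain type
        case _ => false
      }
      override def hashCode(): Int = super.hashCode() + 42 // differentiates from the plain type
    }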
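
The MapRunner change from `= null` to `= _` is purely idiomatic: `_` initializes a var to its type's default value (null for reference types), which is the preferred Scala spelling for "assigned later", e.g. in open(). A tiny illustration of the idiom:

    // `slot` is default-initialized to null via `_`, then filled in later.
    class LazySlot[T >: Null] {
      private var slot: T = _
      def set(v: T): Unit = slot = v
      def get: Option[T] = Option(slot)
    }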
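
The AggregateUtil entry points now take the physical input type in up to three forms: a RelDataType, a TypeInformation[Row], and a Seq[TypeInformation[_]] of per-field types for generateAggregations. A hypothetical helper, not part of the patch, showing how a caller could derive the per-field sequence from a RowTypeInfo:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // Expand a RowTypeInfo into the Seq[TypeInformation[_]] shape expected by
    // the new `inputFieldTypeInfo` / `physicalInputTypes` parameters.
    def fieldTypes(rowType: RowTypeInfo): Seq[TypeInformation[_]] =
      (0 until rowType.getArity).map(i => rowType.getTypeAt(i))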
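
With the EventTime*/ProcessingTime* window classes collapsed into single TumblingGroupWindow, SlidingGroupWindow, and SessionGroupWindow variants, the time-versus-count distinction now hangs entirely on the window size expression, as the rewritten isTimeWindow and the Literal matches above show. The same discrimination spelled out as a small sketch (Literal and TimeIntervalTypeInfo are the flink-table classes the patch itself matches on):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.table.expressions.{Expression, Literal}
    import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

    // Decide time window vs. count window from the size literal's result type.
    def describeSize(size: Expression): String = size match {
      case Literal(millis: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
        s"time window of $millis ms"
      case Literal(rows: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
        s"count window of $rows rows"
      case _ => "unsupported size expression"
    }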
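
RowTimeBoundedRangeOver and RowTimeBoundedRowsOver now accept any TypeInformation[Row] rather than the concrete RowTypeInfo. Nothing in these process functions appears to need field names or arity; type information is enough to declare serializable state, for example:

    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row

    // A TypeInformation[Row] suffices wherever only serializers are needed,
    // such as keyed-state declaration inside a ProcessFunction.
    def rowStateDescriptor(name: String, rowType: TypeInformation[Row]): ValueStateDescriptor[Row] =
      new ValueStateDescriptor[Row](name, rowType)

Accepting the supertype keeps the functions usable with any future row-shaped type information, not just RowTypeInfo.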
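
A sketch of how a source might implement the new DefinedTimeAttributes trait; the class and field positions are hypothetical, but the position semantics follow the trait's scaladoc (a position within the physical arity replaces that field, a position beyond it appends the attribute):

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.table.sources.DefinedTimeAttributes

    class OrderEventsAttributes(physicalArity: Int) extends DefinedTimeAttributes {
      // Replace physical field 0 with the logical "rowtime" attribute.
      override def getRowtimeAttribute: JTuple2[Int, String] = new JTuple2(0, "rowtime")
      // Position == arity, i.e. beyond the physical row: "proctime" is appended.
      override def getProctimeAttribute: JTuple2[Int, String] =
        new JTuple2(physicalArity, "proctime")
    }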
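
Finally, the two typeutils changes fit together: TimeIndicatorTypeInfo serializes and compares exactly like a SQL timestamp (it reuses SqlTimestampSerializer and SqlTimestampComparator), and TypeCheckUtils.isAdvanced explicitly classifies it as a plain atomic type. A quick check, assuming the patched flink-table on the classpath:

    import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}

    object TimeIndicatorCheck extends App {
      val rowtime = TimeIndicatorTypeInfo.ROWTIME_INDICATOR
      println(rowtime.isEventTime)                // true (PROCTIME_INDICATOR: false)
      println(TypeCheckUtils.isAdvanced(rowtime)) // false: handled like a SQL timestamp
    }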