[FLINK-7337] [table] Refactor internal handling of time indicator attributes.
- Expand physical Row schema for time indicators. - Refactor computation of logical schema of tables to import. - Refactor operators to use time attribute in Row instead of StreamRecord timestamp. This closes #4488. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93d0ae4a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93d0ae4a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93d0ae4a Branch: refs/heads/master Commit: 93d0ae4a9f059da4bd2b720f7503da0f9c0a8c7c Parents: 6ed5815 Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Aug 4 02:20:56 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Wed Aug 23 10:09:21 2017 +0200 ---------------------------------------------------------------------- .../table/api/StreamTableEnvironment.scala | 155 ++++- .../flink/table/api/TableEnvironment.scala | 8 +- .../flink/table/calcite/FlinkTypeFactory.scala | 37 +- .../calcite/RelTimeIndicatorConverter.scala | 67 +-- .../flink/table/codegen/CodeGenerator.scala | 78 ++- .../table/codegen/calls/ScalarOperators.scala | 44 +- .../table/functions/ProctimeSqlFunction.scala | 41 ++ .../TimeMaterializationSqlFunction.scala | 41 -- .../flink/table/plan/nodes/CommonCalc.scala | 6 +- .../table/plan/nodes/CommonCorrelate.scala | 20 +- .../plan/nodes/PhysicalTableSourceScan.scala | 4 +- .../nodes/dataset/BatchTableSourceScan.scala | 4 +- .../plan/nodes/datastream/DataStreamCalc.scala | 12 +- .../nodes/datastream/DataStreamCorrelate.scala | 6 +- .../datastream/DataStreamGroupAggregate.scala | 34 +- .../DataStreamGroupWindowAggregate.scala | 84 +-- .../datastream/DataStreamOverAggregate.scala | 149 ++--- .../plan/nodes/datastream/DataStreamScan.scala | 2 +- .../plan/nodes/datastream/DataStreamSort.scala | 20 +- .../plan/nodes/datastream/DataStreamUnion.scala | 6 +- .../nodes/datastream/DataStreamValues.scala | 12 +- .../nodes/datastream/DataStreamWindowJoin.scala | 16 +-
.../plan/nodes/datastream/StreamScan.scala | 36 +- .../datastream/StreamTableSourceScan.scala | 23 +- .../logical/FlinkLogicalTableSourceScan.scala | 23 +- .../datastream/DataStreamWindowJoinRule.scala | 2 +- .../table/plan/schema/DataStreamTable.scala | 13 - .../flink/table/plan/schema/FlinkTable.scala | 10 +- .../flink/table/plan/schema/RowSchema.scala | 124 +--- .../plan/schema/StreamTableSourceTable.scala | 48 +- .../runtime/CRowInputTupleOutputMapRunner.scala | 2 + .../table/runtime/CRowOutputMapRunner.scala | 60 -- .../table/runtime/CRowOutputProcessRunner.scala | 72 +++ .../TimestampSetterProcessFunction.scala | 52 ++ ...WrappingTimestampSetterProcessFunction.scala | 61 ++ .../table/runtime/aggregate/AggregateUtil.scala | 54 +- ...SetSessionWindowAggReduceGroupFunction.scala | 5 +- ...taSetSlideWindowAggReduceGroupFunction.scala | 5 +- ...TumbleTimeWindowAggReduceGroupFunction.scala | 2 +- ...rementalAggregateAllTimeWindowFunction.scala | 15 +- ...IncrementalAggregateTimeWindowFunction.scala | 19 +- .../aggregate/ProcTimeBoundedRangeOver.scala | 4 + .../aggregate/ProcTimeSortProcessFunction.scala | 8 +- .../aggregate/RowTimeBoundedRangeOver.scala | 9 +- .../aggregate/RowTimeBoundedRowsOver.scala | 9 +- .../aggregate/RowTimeSortProcessFunction.scala | 20 +- .../aggregate/RowTimeUnboundedOver.scala | 19 +- .../table/runtime/aggregate/SortUtil.scala | 5 +- .../aggregate/TimeWindowPropertyCollector.scala | 23 +- .../table/runtime/join/WindowJoinUtil.scala | 11 +- .../table/typeutils/TimeIndicatorTypeInfo.scala | 3 + .../flink/table/api/stream/sql/SortTest.scala | 4 +- .../api/stream/table/TableSourceTest.scala | 4 +- .../plan/TimeIndicatorConversionTest.scala | 29 +- .../table/runtime/harness/HarnessTestBase.scala | 22 +- .../table/runtime/harness/JoinHarnessTest.scala | 4 +- .../runtime/harness/NonWindowHarnessTest.scala | 4 +- .../runtime/harness/OverWindowHarnessTest.scala | 586 ++++++++----------- .../SortProcessFunctionHarnessTest.scala | 129 ++-- 
.../runtime/stream/table/TableSinkITCase.scala | 140 ++++- 60 files changed, 1362 insertions(+), 1143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 7328b2a..c4e1450 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -23,10 +23,8 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.plan.hep.HepMatchOrder -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelNode, RelVisitor} -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} -import org.apache.calcite.sql.SqlKind +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType} +import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction @@ -38,19 +36,19 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.calcite.RelTimeIndicatorConverter +import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter} import 
org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.nodes.FlinkConventions -import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait, _} +import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable} import org.apache.flink.table.plan.util.UpdatingPlanChecker import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner} +import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner, WrappingTimestampSetterProcessFunction} import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} -import org.apache.flink.table.typeutils.TypeCheckUtils +import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils} import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -270,11 +268,11 @@ abstract class StreamTableEnvironment( * valid Java class identifier. 
*/ private def getConversionMapperWithChanges[OUT]( - physicalTypeInfo: TypeInformation[CRow], - schema: RowSchema, - requestedTypeInfo: TypeInformation[OUT], - functionName: String): - MapFunction[CRow, OUT] = { + physicalTypeInfo: TypeInformation[CRow], + schema: RowSchema, + requestedTypeInfo: TypeInformation[OUT], + functionName: String): + MapFunction[CRow, OUT] = { requestedTypeInfo match { @@ -356,9 +354,7 @@ abstract class StreamTableEnvironment( val dataStreamTable = new DataStreamTable[T]( dataStream, fieldIndexes, - fieldNames, - None, - None + fieldNames ) registerTableInternal(name, dataStreamTable) } @@ -393,12 +389,14 @@ abstract class StreamTableEnvironment( s"But is: ${execEnv.getStreamTimeCharacteristic}") } + // adjust field indexes and field names + val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime) + val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime) + val dataStreamTable = new DataStreamTable[T]( dataStream, - fieldIndexes, - fieldNames, - rowtime, - proctime + indexesWithIndicatorFields, + namesWithIndicatorFields ) registerTableInternal(name, dataStreamTable) } @@ -502,6 +500,63 @@ abstract class StreamTableEnvironment( } /** + * Injects markers for time indicator fields into the field indexes. + * + * @param fieldIndexes The field indexes into which the time indicators markers are injected. + * @param rowtime An optional rowtime indicator + * @param proctime An optional proctime indicator + * @return An adjusted array of field indexes. 
+ */ + private def adjustFieldIndexes( + fieldIndexes: Array[Int], + rowtime: Option[(Int, String)], + proctime: Option[(Int, String)]): Array[Int] = { + + // inject rowtime field + val withRowtime = rowtime match { + case Some(rt) => fieldIndexes.patch(rt._1, Seq(TimeIndicatorTypeInfo.ROWTIME_MARKER), 0) + case _ => fieldIndexes + } + + // inject proctime field + val withProctime = proctime match { + case Some(pt) => withRowtime.patch(pt._1, Seq(TimeIndicatorTypeInfo.PROCTIME_MARKER), 0) + case _ => withRowtime + } + + withProctime + } + + /** + * Injects names of time indicator fields into the list of field names. + * + * @param fieldNames The array of field names into which the time indicator field names are + * injected. + * @param rowtime An optional rowtime indicator + * @param proctime An optional proctime indicator + * @return An adjusted array of field names. + */ + private def adjustFieldNames( + fieldNames: Array[String], + rowtime: Option[(Int, String)], + proctime: Option[(Int, String)]): Array[String] = { + + // inject rowtime field + val withRowtime = rowtime match { + case Some(rt) => fieldNames.patch(rt._1, Seq(rowtime.get._2), 0) + case _ => fieldNames + } + + // inject proctime field + val withProctime = proctime match { + case Some(pt) => withRowtime.patch(pt._1, Seq(proctime.get._2), 0) + case _ => withRowtime + } + + withProctime + } + + /** * Returns the decoration rule set for this environment * including a custom RuleSet configuration. 
*/ @@ -632,10 +687,21 @@ abstract class StreamTableEnvironment( val relNode = table.getRelNode val dataStreamPlan = optimize(relNode, updatesAsRetraction) - // we convert the logical row type to the output row type - val convertedOutputType = RelTimeIndicatorConverter.convertOutputType(relNode) - - translate(dataStreamPlan, convertedOutputType, queryConfig, withChangeFlag) + // zip original field names with optimized field types + val fieldTypes = relNode.getRowType.getFieldList.asScala + .zip(dataStreamPlan.getRowType.getFieldList.asScala) + // get name of original plan and type of optimized plan + .map(x => (x._1.getName, x._2.getType)) + // add field indexes + .zipWithIndex + // build new field types + .map(x => new RelDataTypeFieldImpl(x._1._1, x._2, x._1._2)) + + // build a record type from list of field types + val rowType = new RelRecordType( + fieldTypes.toList.asInstanceOf[List[RelDataTypeField]].asJava) + + translate(dataStreamPlan, rowType, queryConfig, withChangeFlag) } /** @@ -684,12 +750,41 @@ abstract class StreamTableEnvironment( val rootParallelism = plan.getParallelism - conversion match { - case mapFunction: MapFunction[CRow, A] => - plan.map(mapFunction) - .returns(tpe) - .name(s"to: ${tpe.getTypeClass.getSimpleName}") - .setParallelism(rootParallelism) + val rowtimeFields = logicalType.getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + + if (rowtimeFields.isEmpty) { + // no rowtime field to set + conversion match { + case mapFunction: MapFunction[CRow, A] => + plan.map(mapFunction) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .setParallelism(rootParallelism) + } + } else if (rowtimeFields.size == 1) { + // set the only rowtime field as event-time timestamp for DataStream + val mapFunction = conversion match { + case mapFunction: MapFunction[CRow, A] => mapFunction + case _ => new MapFunction[CRow, A] { + override def map(cRow: CRow): A = cRow.asInstanceOf[A] + } + } + + plan.process( 
+ new WrappingTimestampSetterProcessFunction[A]( + mapFunction, + rowtimeFields.head.getIndex)) + .returns(tpe) + .name(s"to: ${tpe.getTypeClass.getSimpleName}") + .setParallelism(rootParallelism) + + } else { + throw new TableException( + s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " + + s"the table that should be converted to a DataStream.\n" + + s"Please select the rowtime field that should be used as event-time timestamp for the " + + s"DataStream by casting all other fields to TIMESTAMP.") } } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 3bca156..b647c51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -731,15 +731,15 @@ abstract class TableEnvironment(val config: TableConfig) { // validate that at least the field types of physical and logical type match // we do that here to make sure that plan translation was correct - if (schema.physicalTypeInfo != inputTypeInfo) { + if (schema.typeInfo != inputTypeInfo) { throw TableException( s"The field types of physical and logical row types do not match. " + - s"Physical type is [${schema.physicalTypeInfo}], Logical type is [${inputTypeInfo}]. " + + s"Physical type is [${schema.typeInfo}], Logical type is [${inputTypeInfo}]. " + s"This is a bug and should not happen. 
Please file an issue.") } - val fieldTypes = schema.physicalFieldTypeInfo - val fieldNames = schema.physicalFieldNames + val fieldTypes = schema.fieldTypeInfos + val fieldNames = schema.fieldNames // validate requested type if (requestedTypeInfo.getArity != fieldTypes.length) { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index dbefe20..637e8cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -172,45 +172,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp * * @param fieldNames field names * @param fieldTypes field types, every element is Flink's [[TypeInformation]] - * @param rowtime optional system field to indicate event-time; the index determines the index - * in the final record. If the index is smaller than the number of specified - * fields, it shifts all following fields. - * @param proctime optional system field to indicate processing-time; the index determines the - * index in the final record. If the index is smaller than the number of - * specified fields, it shifts all following fields. 
* @return a struct type with the input fieldNames, input fieldTypes, and system fields */ def buildLogicalRowType( fieldNames: Seq[String], - fieldTypes: Seq[TypeInformation[_]], - rowtime: Option[(Int, String)], - proctime: Option[(Int, String)]) + fieldTypes: Seq[TypeInformation[_]]) : RelDataType = { val logicalRowTypeBuilder = builder val fields = fieldNames.zip(fieldTypes) - - var totalNumberOfFields = fields.length - if (rowtime.isDefined) { - totalNumberOfFields += 1 - } - if (proctime.isDefined) { - totalNumberOfFields += 1 - } - - var addedTimeAttributes = 0 - for (i <- 0 until totalNumberOfFields) { - if (rowtime.isDefined && rowtime.get._1 == i) { - logicalRowTypeBuilder.add(rowtime.get._2, createRowtimeIndicatorType()) - addedTimeAttributes += 1 - } else if (proctime.isDefined && proctime.get._1 == i) { - logicalRowTypeBuilder.add(proctime.get._2, createProctimeIndicatorType()) - addedTimeAttributes += 1 - } else { - val field = fields(i - addedTimeAttributes) - logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true)) - } - } + fields.foreach(f => { + // time indicators are not nullable + val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2) + logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable)) + }) logicalRowTypeBuilder.build } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index eb14291..717a1af 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.calcite -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType} +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core._ import org.apache.calcite.rel.logical._ import org.apache.calcite.rel.{RelNode, RelShuttle} @@ -26,8 +26,8 @@ import org.apache.calcite.rex._ import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType -import org.apache.flink.table.functions.TimeMaterializationSqlFunction +import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, _} +import org.apache.flink.table.functions.ProctimeSqlFunction import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType @@ -242,9 +242,13 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { case lp: LogicalProject => val projects = lp.getProjects.zipWithIndex.map { case (expr, idx) => if (isTimeIndicatorType(expr.getType) && refIndices.contains(idx)) { - rexBuilder.makeCall( - TimeMaterializationSqlFunction, - expr) + if (isRowtimeIndicatorType(expr.getType)) { + // cast rowtime indicator to regular timestamp + rexBuilder.makeAbstractCast(timestamp, expr) + } else { + // generate proctime access + rexBuilder.makeCall(ProctimeSqlFunction, expr) + } } else { expr } @@ -259,9 +263,17 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { case _ => val projects = input.getRowType.getFieldList.map { field => if (isTimeIndicatorType(field.getType) && refIndices.contains(field.getIndex)) { - rexBuilder.makeCall( - TimeMaterializationSqlFunction, - new 
RexInputRef(field.getIndex, field.getType)) + if (isRowtimeIndicatorType(field.getType)) { + // cast rowtime indicator to regular timestamp + rexBuilder.makeAbstractCast( + timestamp, + new RexInputRef(field.getIndex, field.getType)) + } else { + // generate proctime access + rexBuilder.makeCall( + ProctimeSqlFunction, + new RexInputRef(field.getIndex, field.getType)) + } } else { new RexInputRef(field.getIndex, field.getType) } @@ -311,19 +323,19 @@ object RelTimeIndicatorConverter { var needsConversion = false - // materialize all remaining time indicators + // materialize remaining proctime indicators val projects = convertedRoot.getRowType.getFieldList.map(field => - if (isTimeIndicatorType(field.getType)) { + if (isProctimeIndicatorType(field.getType)) { needsConversion = true rexBuilder.makeCall( - TimeMaterializationSqlFunction, + ProctimeSqlFunction, new RexInputRef(field.getIndex, field.getType)) } else { new RexInputRef(field.getIndex, field.getType) } ) - // add final conversion + // add final conversion if necessary if (needsConversion) { LogicalProject.create( convertedRoot, @@ -334,27 +346,6 @@ object RelTimeIndicatorConverter { } } - def convertOutputType(rootRel: RelNode): RelDataType = { - - val timestamp = rootRel - .getCluster - .getRexBuilder - .getTypeFactory - .asInstanceOf[FlinkTypeFactory] - .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false) - - // convert all time indicators types to timestamps - val fields = rootRel.getRowType.getFieldList.map { field => - if (isTimeIndicatorType(field.getType)) { - new RelDataTypeFieldImpl(field.getName, field.getIndex, timestamp) - } else { - field - } - } - - new RelRecordType(fields) - } - /** * Materializes time indicator accesses in an expression. 
* @@ -415,7 +406,13 @@ class RexTimeIndicatorMaterializer( case _ => updatedCall.getOperands.map { o => if (isTimeIndicatorType(o.getType)) { - rexBuilder.makeCall(TimeMaterializationSqlFunction, o) + if (isRowtimeIndicatorType(o.getType)) { + // cast rowtime indicator to regular timestamp + rexBuilder.makeAbstractCast(timestamp, o) + } else { + // generate proctime access + rexBuilder.makeCall(ProctimeSqlFunction, o) + } } else { o } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 28fea59..be55eac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -37,11 +37,12 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} +import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ -import org.apache.flink.table.codegen.calls.{BuiltInMethods, FunctionGenerator} import org.apache.flink.table.functions.sql.ScalarSqlFunctions import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils -import org.apache.flink.table.functions.{FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction} +import org.apache.flink.table.functions.{FunctionContext, ProctimeSqlFunction, UserDefinedFunction} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import 
org.apache.flink.table.typeutils.TypeCheckUtils._ import scala.collection.JavaConversions._ @@ -56,10 +57,11 @@ import scala.collection.mutable * @param nullableInput input(s) can be null. * @param input1 type information about the first input of the Function * @param input2 type information about the second input if the Function is binary - * @param input1FieldMapping additional mapping information for input1 - * (e.g. POJO types have no deterministic field order and some input fields might not be read) - * @param input2FieldMapping additional mapping information for input2 - * (e.g. POJO types have no deterministic field order and some input fields might not be read) + * @param input1FieldMapping additional mapping information for input1. + * POJO types have no deterministic field order and some input fields might not be read. + * The input1FieldMapping is also used to inject time indicator attributes. + * @param input2FieldMapping additional mapping information for input2. + * POJO types have no deterministic field order and some input fields might not be read. */ abstract class CodeGenerator( config: TableConfig, @@ -245,16 +247,23 @@ abstract class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { - val input1AccessExprs = input1Mapping.map { idx => - generateInputAccess(input1, input1Term, idx) + + val input1AccessExprs = input1Mapping.map { + case TimeIndicatorTypeInfo.ROWTIME_MARKER => + // attribute is a rowtime indicator. Access event-time timestamp in StreamRecord. + generateRowtimeAccess() + case TimeIndicatorTypeInfo.PROCTIME_MARKER => + // attribute is proctime indicator. + // We use a null literal and generate a timestamp when we need it. + generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR) + case idx => + // regular attribute. Access attribute in input data type. 
+ generateInputAccess(input1, input1Term, idx) } val input2AccessExprs = input2 match { case Some(ti) => - input2Mapping.map { idx => - generateInputAccess(ti, input2Term, idx) - }.toSeq - + input2Mapping.map(idx => generateInputAccess(ti, input2Term, idx)).toSeq case None => Seq() // add nothing } @@ -724,10 +733,8 @@ abstract class CodeGenerator( override def visitCall(call: RexCall): GeneratedExpression = { // special case: time materialization - if (call.getOperator == TimeMaterializationSqlFunction) { - return generateRecordTimestamp( - FlinkTypeFactory.isRowtimeIndicatorType(call.getOperands.get(0).getType) - ) + if (call.getOperator == ProctimeSqlFunction) { + return generateProctimeTimestamp() } val resultType = FlinkTypeFactory.toTypeInfo(call.getType) @@ -967,10 +974,10 @@ abstract class CodeGenerator( generateArrayElement(this, array) case ScalarSqlFunctions.CONCAT => - generateConcat(BuiltInMethods.CONCAT, operands) + generateConcat(this.nullCheck, operands) case ScalarSqlFunctions.CONCAT_WS => - generateConcat(BuiltInMethods.CONCAT_WS, operands) + generateConcatWs(operands) // advanced scalar functions case sqlOperator: SqlOperator => @@ -1216,8 +1223,14 @@ abstract class CodeGenerator( |""".stripMargin } else if (nullCheck) { s""" - |$resultTypeTerm $resultTerm = $unboxedFieldCode; |boolean $nullTerm = $fieldTerm == null; + |$resultTypeTerm $resultTerm; + |if ($nullTerm) { + | $resultTerm = $defaultValue; + |} + |else { + | $resultTerm = $unboxedFieldCode; + |} |""".stripMargin } else { s""" @@ -1270,27 +1283,32 @@ abstract class CodeGenerator( } } - private[flink] def generateRecordTimestamp(isEventTime: Boolean): GeneratedExpression = { + private[flink] def generateRowtimeAccess(): GeneratedExpression = { val resultTerm = newName("result") - val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + val nullTerm = newName("isNull") - val resultCode = if (isEventTime) { + val accessCode = s""" - |$resultTypeTerm $resultTerm; - |if 
($contextTerm.timestamp() == null) { + |Long $resultTerm = $contextTerm.timestamp(); + |if ($resultTerm == null) { | throw new RuntimeException("Rowtime timestamp is null. Please make sure that a proper " + | "TimestampAssigner is defined and the stream environment uses the EventTime time " + | "characteristic."); |} - |else { - | $resultTerm = $contextTerm.timestamp(); - |} - |""".stripMargin - } else { + |boolean $nullTerm = false; + """.stripMargin + + GeneratedExpression(resultTerm, nullTerm, accessCode, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) + } + + private[flink] def generateProctimeTimestamp(): GeneratedExpression = { + val resultTerm = newName("result") + val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + + val resultCode = s""" |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime(); |""".stripMargin - } GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 1ab927d..01e9dff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -17,8 +17,6 @@ */ package org.apache.flink.table.codegen.calls -import java.lang.reflect.Method - import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange} import org.apache.calcite.util.BuiltInMethod @@ -1026,14 +1024,48 @@ object ScalarOperators { } 
def generateConcat( - method: Method, - operands: Seq[GeneratedExpression]): GeneratedExpression = { + nullCheck: Boolean, + operands: Seq[GeneratedExpression]) + : GeneratedExpression = { - generateCallIfArgsNotNull(false, STRING_TYPE_INFO, operands) { - (terms) =>s"${qualifyMethod(method)}(${terms.mkString(", ")})" + generateCallIfArgsNotNull(nullCheck, STRING_TYPE_INFO, operands) { + (terms) =>s"${qualifyMethod(BuiltInMethods.CONCAT)}(${terms.mkString(", ")})" } } + def generateConcatWs(operands: Seq[GeneratedExpression]): GeneratedExpression = { + + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val defaultValue = primitiveDefaultValue(Types.STRING) + + val tempTerms = operands.tail.map(_ => newName("temp")) + + val operatorCode = + s""" + |${operands.map(_.code).mkString("\n")} + | + |String $resultTerm; + |boolean $nullTerm; + |if (${operands.head.nullTerm}) { + | $nullTerm = true; + | $resultTerm = $defaultValue; + |} else { + | ${operands.tail.zip(tempTerms).map { + case (o: GeneratedExpression, t: String) => + s"String $t;\n" + + s" if (${o.nullTerm}) $t = null; else $t = ${o.resultTerm};" + }.mkString("\n") + } + | $nullTerm = false; + | $resultTerm = ${qualifyMethod(BuiltInMethods.CONCAT_WS)} + | (${operands.head.resultTerm}, ${tempTerms.mkString(", ")}); + |} + |""".stripMargin + + GeneratedExpression(resultTerm, nullTerm, operatorCode, Types.STRING) + } + def generateMapGet( codeGenerator: CodeGenerator, map: GeneratedExpression, http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala new file mode 100644 index 0000000..4fb0378 --- 
/dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions + +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.validate.SqlMonotonicity + +/** + * Function that materializes a processing time attribute. + * After materialization the result can be used in regular arithmetical calculations. 
+ */ +object ProctimeSqlFunction + extends SqlFunction( + "PROCTIME", + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.TIMESTAMP), + InferTypes.RETURN_TYPE, + OperandTypes.family(SqlTypeFamily.TIMESTAMP), + SqlFunctionCategory.SYSTEM) { + + override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION + + override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = + SqlMonotonicity.INCREASING +} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala deleted file mode 100644 index d875026..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeMaterializationSqlFunction.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.functions - -import org.apache.calcite.sql._ -import org.apache.calcite.sql.`type`._ -import org.apache.calcite.sql.validate.SqlMonotonicity - -/** - * Function that materializes a time attribute to the metadata timestamp. After materialization - * the result can be used in regular arithmetical calculations. - */ -object TimeMaterializationSqlFunction - extends SqlFunction( - "TIME_MATERIALIZATION", - SqlKind.OTHER_FUNCTION, - ReturnTypes.explicit(SqlTypeName.TIMESTAMP), - InferTypes.RETURN_TYPE, - OperandTypes.family(SqlTypeFamily.TIMESTAMP), - SqlFunctionCategory.SYSTEM) { - - override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION - - override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity = - SqlMonotonicity.INCREASING -} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 3e355ff..2f1871b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -42,8 +42,8 @@ trait CommonCalc { GeneratedFunction[T, Row] = { val projection = generator.generateResultExpression( - returnSchema.physicalTypeInfo, - returnSchema.physicalFieldNames, + returnSchema.typeInfo, + returnSchema.fieldNames, calcProjection) // only projection @@ -80,7 +80,7 @@ trait CommonCalc { ruleDescription, functionClass, body, - returnSchema.physicalTypeInfo) + returnSchema.typeInfo) } private[flink] def conditionToString( 
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 96aaf3e..7c01fde 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -53,12 +53,10 @@ trait CommonCorrelate { functionClass: Class[T]): GeneratedFunction[T, Row] = { - val physicalRexCall = inputSchema.mapRexNode(rexCall) - val functionGenerator = new FunctionCodeGenerator( config, false, - inputSchema.physicalTypeInfo, + inputSchema.typeInfo, Some(udtfTypeInfo), None, pojoFieldMapping) @@ -69,7 +67,7 @@ trait CommonCorrelate { .addReusableConstructor(classOf[TableFunctionCollector[_]]) .head - val call = functionGenerator.generateExpression(physicalRexCall) + val call = functionGenerator.generateExpression(rexCall) var body = s""" |${call.resultTerm}.setCollector($collectorTerm); @@ -90,8 +88,8 @@ trait CommonCorrelate { } val outerResultExpr = functionGenerator.generateResultExpression( input1AccessExprs ++ input2NullExprs, - returnSchema.physicalTypeInfo, - returnSchema.physicalFieldNames) + returnSchema.typeInfo, + returnSchema.fieldNames) body += s""" |boolean hasOutput = $collectorTerm.isCollected(); @@ -108,7 +106,7 @@ trait CommonCorrelate { ruleDescription, functionClass, body, - returnSchema.physicalTypeInfo) + returnSchema.typeInfo) } /** @@ -126,7 +124,7 @@ trait CommonCorrelate { val generator = new CollectorCodeGenerator( config, false, - inputSchema.physicalTypeInfo, + inputSchema.typeInfo, Some(udtfTypeInfo), None, pojoFieldMapping) @@ -135,8 
+133,8 @@ trait CommonCorrelate { val crossResultExpr = generator.generateResultExpression( input1AccessExprs ++ input2AccessExprs, - returnSchema.physicalTypeInfo, - returnSchema.physicalFieldNames) + returnSchema.typeInfo, + returnSchema.fieldNames) val collectorCode = if (condition.isEmpty) { s""" @@ -148,7 +146,7 @@ trait CommonCorrelate { // adjust indicies of InputRefs to adhere to schema expected by generator val changeInputRefIndexShuttle = new RexShuttle { override def visitInputRef(inputRef: RexInputRef): RexNode = { - new RexInputRef(inputSchema.physicalArity + inputRef.getIndex, inputRef.getType) + new RexInputRef(inputSchema.arity + inputRef.getIndex, inputRef.getType) } } // Run generateExpression to add init statements (ScalarFunctions) of condition to generator. http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala index dc7a0d6..5872d8c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala @@ -39,9 +39,7 @@ abstract class PhysicalTableSourceScan( val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildLogicalRowType( TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType), - None, - None) + TableEnvironment.getFieldTypes(tableSource.getReturnType)) } override def explainTerms(pw: RelWriter): RelWriter = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index fb291e4..74aac43 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -42,9 +42,7 @@ class BatchTableSourceScan( val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] flinkTypeFactory.buildLogicalRowType( TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType), - None, - None) + TableEnvironment.getFieldTypes(tableSource.getReturnType)) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala index 2e00330..45e6902 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala @@ -52,7 +52,7 @@ class DataStreamCalc( with CommonCalc with DataStreamRel { 
- override def deriveRowType(): RelDataType = schema.logicalType + override def deriveRowType(): RelDataType = schema.relDataType override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { new DataStreamCalc( @@ -100,7 +100,7 @@ class DataStreamCalc( val condition = if (calcProgram.getCondition != null) { val materializedCondition = RelTimeIndicatorConverter.convertExpression( calcProgram.expandLocalRef(calcProgram.getCondition), - inputSchema.logicalType, + inputSchema.relDataType, cluster.getRexBuilder) Some(materializedCondition) } else { @@ -110,12 +110,8 @@ class DataStreamCalc( // filter out time attributes val projection = calcProgram.getProjectList.asScala .map(calcProgram.expandLocalRef) - // time indicator fields must not be part of the code generation - .filter(expr => !FlinkTypeFactory.isTimeIndicatorType(expr.getType)) - // update indices - .map(expr => inputSchema.mapRexNode(expr)) - val generator = new FunctionCodeGenerator(config, false, inputSchema.physicalTypeInfo) + val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo) val genFunction = generateFunction( generator, @@ -132,7 +128,7 @@ class DataStreamCalc( val processFunc = new CRowProcessRunner( genFunction.name, genFunction.code, - CRowTypeInfo(schema.physicalTypeInfo)) + CRowTypeInfo(schema.typeInfo)) inputDataStream .process(processFunc) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala index b7165cd..18ab2a3 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala @@ -50,7 +50,7 @@ class DataStreamCorrelate( with CommonCorrelate with DataStreamRel { - override def deriveRowType() = schema.logicalType + override def deriveRowType() = schema.relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamCorrelate( @@ -78,7 +78,7 @@ class DataStreamCorrelate( super.explainTerms(pw) .item("invocation", scan.getCall) .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName) - .item("rowType", schema.logicalType) + .item("rowType", schema.relDataType) .item("joinType", joinType) .itemIf("condition", condition.orNull, condition.isDefined) } @@ -130,7 +130,7 @@ class DataStreamCorrelate( .process(processFunc) // preserve input parallelism to ensure that acc and retract messages remain in order .setParallelism(inputParallelism) - .name(correlateOpName(rexCall, sqlFunction, schema.logicalType)) + .name(correlateOpName(rexCall, sqlFunction, schema.relDataType)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 12694fc..590d9be 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ 
-59,7 +59,7 @@ class DataStreamGroupAggregate( private val LOG = LoggerFactory.getLogger(this.getClass) - override def deriveRowType() = schema.logicalType + override def deriveRowType() = schema.relDataType override def needsUpdatesAsRetraction = true @@ -83,20 +83,20 @@ class DataStreamGroupAggregate( override def toString: String = { s"Aggregate(${ if (!groupings.isEmpty) { - s"groupBy: (${groupingToString(inputSchema.logicalType, groupings)}), " + s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), " } else { "" } }select:(${aggregationToString( - inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil)}))" + inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)}))" } override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) .itemIf("groupBy", groupingToString( - inputSchema.logicalType, groupings), !groupings.isEmpty) + inputSchema.relDataType, groupings), !groupings.isEmpty) .item("select", aggregationToString( - inputSchema.logicalType, groupings, getRowType, namedAggregates, Nil)) + inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil)) } override def translateToPlan( @@ -112,37 +112,29 @@ class DataStreamGroupAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val physicalNamedAggregates = namedAggregates.map { namedAggregate => - new CalcitePair[AggregateCall, String]( - inputSchema.mapAggregateCall(namedAggregate.left), - namedAggregate.right) - } - - val outRowType = CRowTypeInfo(schema.physicalTypeInfo) + val outRowType = CRowTypeInfo(schema.typeInfo) val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, - inputSchema.physicalTypeInfo) + inputSchema.typeInfo) val aggString = aggregationToString( - inputSchema.logicalType, + inputSchema.relDataType, groupings, getRowType, namedAggregates, Nil) - val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, groupings)}), " + + val 
keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.relDataType, groupings)}), " + s"select: ($aggString)" val nonKeyedAggOpName = s"select: ($aggString)" - val physicalGrouping = groupings.map(inputSchema.mapIndex) - val processFunction = AggregateUtil.createGroupAggregateFunction( generator, - physicalNamedAggregates, - inputSchema.logicalType, - inputSchema.physicalFieldTypeInfo, + namedAggregates, + inputSchema.relDataType, + inputSchema.fieldTypeInfos, groupings, queryConfig, DataStreamRetractionRules.isAccRetract(this), @@ -150,7 +142,7 @@ class DataStreamGroupAggregate( val result: DataStream[CRow] = // grouped / keyed aggregation - if (physicalGrouping.nonEmpty) { + if (groupings.nonEmpty) { inputDS .keyBy(groupings: _*) .process(processFunction) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index c4ffdb1..ac63be1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -32,11 +32,13 @@ import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.expressions.ExpressionUtils._ +import org.apache.flink.table.expressions.ResolvedFieldReference import org.apache.flink.table.plan.logical._ import 
org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._ import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.runtime.TimestampSetterProcessFunction import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -58,7 +60,7 @@ class DataStreamGroupWindowAggregate( private val LOG = LoggerFactory.getLogger(this.getClass) - override def deriveRowType(): RelDataType = schema.logicalType + override def deriveRowType(): RelDataType = schema.relDataType override def needsUpdatesAsRetraction = true @@ -84,14 +86,14 @@ class DataStreamGroupWindowAggregate( override def toString: String = { s"Aggregate(${ if (!grouping.isEmpty) { - s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " + s"groupBy: (${groupingToString(inputSchema.relDataType, grouping)}), " } else { "" } }window: ($window), " + s"select: (${ aggregationToString( - inputSchema.logicalType, + inputSchema.relDataType, grouping, getRowType, namedAggregates, @@ -101,13 +103,13 @@ class DataStreamGroupWindowAggregate( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .itemIf("groupBy", groupingToString(inputSchema.logicalType, grouping), !grouping.isEmpty) + .itemIf("groupBy", groupingToString(inputSchema.relDataType, grouping), !grouping.isEmpty) .item("window", window) .item( "select", aggregationToString( - inputSchema.logicalType, + inputSchema.relDataType, grouping, - schema.logicalType, + schema.relDataType, namedAggregates, namedProperties)) } @@ -118,14 +120,6 @@ class DataStreamGroupWindowAggregate( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) - val physicalNamedAggregates = namedAggregates.map { namedAggregate => - 
new CalcitePair[AggregateCall, String]( - inputSchema.mapAggregateCall(namedAggregate.left), - namedAggregate.right) - } - val physicalNamedProperties = namedProperties - .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType)) - val inputIsAccRetract = DataStreamRetractionRules.isAccRetract(input) if (inputIsAccRetract) { @@ -148,16 +142,30 @@ class DataStreamGroupWindowAggregate( "state size. You may specify a retention time of 0 to not clean up the state.") } - val outRowType = CRowTypeInfo(schema.physicalTypeInfo) + val timestampedInput = if (isRowtimeAttribute(window.timeAttribute)) { + // copy the window rowtime attribute into the StreamRecord timestamp field + val timeAttribute = window.timeAttribute.asInstanceOf[ResolvedFieldReference].name + val timeIdx = inputSchema.fieldNames.indexOf(timeAttribute) + + inputDS + .process( + new TimestampSetterProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo))) + .setParallelism(inputDS.getParallelism) + .name(s"time attribute: ($timeAttribute)") + } else { + inputDS + } + + val outRowType = CRowTypeInfo(schema.typeInfo) val aggString = aggregationToString( - inputSchema.logicalType, + inputSchema.relDataType, grouping, - schema.logicalType, + schema.relDataType, namedAggregates, namedProperties) - val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " + + val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.relDataType, grouping)}), " + s"window: ($window), " + s"select: ($aggString)" val nonKeyedAggOpName = s"window: ($window), select: ($aggString)" @@ -165,24 +173,22 @@ class DataStreamGroupWindowAggregate( val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, - inputSchema.physicalTypeInfo) + inputSchema.typeInfo) val needMerge = window match { case SessionGroupWindow(_, _, _) => true case _ => false } - val physicalGrouping = grouping.map(inputSchema.mapIndex) - // grouped / keyed aggregation - if (physicalGrouping.length 
> 0) { + if (grouping.length > 0) { val windowFunction = AggregateUtil.createAggregationGroupWindowFunction( window, - physicalGrouping.length, - physicalNamedAggregates.size, - schema.physicalArity, - physicalNamedProperties) + grouping.length, + namedAggregates.size, + schema.arity, + namedProperties) - val keyedStream = inputDS.keyBy(physicalGrouping: _*) + val keyedStream = timestampedInput.keyBy(grouping: _*) val windowedStream = createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] @@ -190,11 +196,11 @@ class DataStreamGroupWindowAggregate( val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( generator, - physicalNamedAggregates, - inputSchema.physicalType, - inputSchema.physicalFieldTypeInfo, - schema.physicalType, - physicalGrouping, + namedAggregates, + inputSchema.relDataType, + inputSchema.fieldTypeInfos, + schema.relDataType, + grouping, needMerge) windowedStream @@ -205,20 +211,20 @@ class DataStreamGroupWindowAggregate( else { val windowFunction = AggregateUtil.createAggregationAllWindowFunction( window, - schema.physicalArity, - physicalNamedProperties) + schema.arity, + namedProperties) val windowedStream = - createNonKeyedWindowedStream(queryConfig, window, inputDS) + createNonKeyedWindowedStream(queryConfig, window, timestampedInput) .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( generator, - physicalNamedAggregates, - inputSchema.physicalType, - inputSchema.physicalFieldTypeInfo, - schema.physicalType, + namedAggregates, + inputSchema.relDataType, + inputSchema.fieldTypeInfos, + schema.relDataType, Array[Int](), needMerge) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 34a7fd8..7bf342a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -50,7 +50,7 @@ class DataStreamOverAggregate( with DataStreamRel { private val LOG = LoggerFactory.getLogger(this.getClass) - override def deriveRowType(): RelDataType = schema.logicalType + override def deriveRowType(): RelDataType = schema.relDataType override def needsUpdatesAsRetraction = true @@ -78,15 +78,15 @@ class DataStreamOverAggregate( super.explainTerms(pw) .itemIf("partitionBy", - partitionToString(schema.logicalType, partitionKeys), partitionKeys.nonEmpty) + partitionToString(schema.relDataType, partitionKeys), partitionKeys.nonEmpty) .item("orderBy", - orderingToString(schema.logicalType, overWindow.orderKeys.getFieldCollations)) + orderingToString(schema.relDataType, overWindow.orderKeys.getFieldCollations)) .itemIf("rows", windowRange(logicWindow, overWindow, inputNode), overWindow.isRows) .itemIf("range", windowRange(logicWindow, overWindow, inputNode), !overWindow.isRows) .item( "select", aggregationToString( - inputSchema.logicalType, - schema.logicalType, + inputSchema.relDataType, + schema.relDataType, namedAggregates)) } @@ -134,67 +134,44 @@ class DataStreamOverAggregate( val generator = new AggregationCodeGenerator( tableEnv.getConfig, false, - inputSchema.physicalTypeInfo) + inputSchema.typeInfo) - val timeType = schema.logicalType + val timeType = schema.relDataType .getFieldList .get(orderKey.getFieldIndex) .getType - timeType match { - case _ 
if FlinkTypeFactory.isProctimeIndicatorType(timeType) => - // proc-time OVER window - if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { - // unbounded OVER window - createUnboundedAndCurrentRowOverWindow( - queryConfig, - generator, - inputDS, - isRowTimeType = false, - isRowsClause = overWindow.isRows) - } else if ( - overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded && - overWindow.upperBound.isCurrentRow) { - - // bounded OVER window - createBoundedAndCurrentRowOverWindow( - queryConfig, - generator, - inputDS, - isRowTimeType = false, - isRowsClause = overWindow.isRows) - } else { - throw new TableException( - "OVER RANGE FOLLOWING windows are not supported yet.") - } - - case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) => - // row-time OVER window - if (overWindow.lowerBound.isPreceding && - overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { - // unbounded OVER window - createUnboundedAndCurrentRowOverWindow( - queryConfig, - generator, - inputDS, - isRowTimeType = true, - isRowsClause = overWindow.isRows) - } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { - // bounded OVER window - createBoundedAndCurrentRowOverWindow( - queryConfig, - generator, - inputDS, - isRowTimeType = true, - isRowsClause = overWindow.isRows) - } else { - throw new TableException( - "OVER RANGE FOLLOWING windows are not supported yet.") - } - - case _ => - throw new TableException( - s"OVER windows can only be applied on time attributes.") + // identify window rowtime attribute + val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) { + Some(orderKey.getFieldIndex) + } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) { + None + } else { + throw new TableException(s"OVER windows can only be applied on time attributes.") + } + + if (overWindow.lowerBound.isPreceding && overWindow.lowerBound.isUnbounded && + 
overWindow.upperBound.isCurrentRow) { + // unbounded OVER window + createUnboundedAndCurrentRowOverWindow( + queryConfig, + generator, + inputDS, + rowTimeIdx, + isRowsClause = overWindow.isRows) + } else if ( + overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded && + overWindow.upperBound.isCurrentRow) { + + // bounded OVER window + createBoundedAndCurrentRowOverWindow( + queryConfig, + generator, + inputDS, + rowTimeIdx, + isRowsClause = overWindow.isRows) + } else { + throw new TableException("OVER RANGE FOLLOWING windows are not supported yet.") } } @@ -202,31 +179,26 @@ class DataStreamOverAggregate( queryConfig: StreamQueryConfig, generator: AggregationCodeGenerator, inputDS: DataStream[CRow], - isRowTimeType: Boolean, + rowTimeIdx: Option[Int], isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) - val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) + val partitionKeys: Array[Int] = overWindow.keys.toArray - val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map { - namedAggregate => - new CalcitePair[AggregateCall, String]( - schema.mapAggregateCall(namedAggregate.left), - namedAggregate.right) - } + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates // get the output types - val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val returnTypeInfo = CRowTypeInfo(schema.typeInfo) val processFunction = AggregateUtil.createUnboundedOverProcessFunction( generator, namedAggregates, - inputSchema.physicalType, - inputSchema.physicalTypeInfo, - inputSchema.physicalFieldTypeInfo, + inputSchema.relDataType, + inputSchema.typeInfo, + inputSchema.fieldTypeInfos, queryConfig, - isRowTimeType, + rowTimeIdx, partitionKeys.nonEmpty, isRowsClause) @@ -254,34 +226,29 @@ class DataStreamOverAggregate( queryConfig: StreamQueryConfig, generator: AggregationCodeGenerator, inputDS: DataStream[CRow], - isRowTimeType: 
Boolean, + rowTimeIdx: Option[Int], isRowsClause: Boolean): DataStream[CRow] = { val overWindow: Group = logicWindow.groups.get(0) - val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex) - val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map { - namedAggregate => - new CalcitePair[AggregateCall, String]( - schema.mapAggregateCall(namedAggregate.left), - namedAggregate.right) - } + val partitionKeys: Array[Int] = overWindow.keys.toArray + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates val precedingOffset = getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0) // get the output types - val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val returnTypeInfo = CRowTypeInfo(schema.typeInfo) val processFunction = AggregateUtil.createBoundedOverProcessFunction( generator, namedAggregates, - inputSchema.physicalType, - inputSchema.physicalTypeInfo, - inputSchema.physicalFieldTypeInfo, + inputSchema.relDataType, + inputSchema.typeInfo, + inputSchema.fieldTypeInfos, precedingOffset, queryConfig, isRowsClause, - isRowTimeType + rowTimeIdx ) val result: DataStream[CRow] = // partitioned aggregation @@ -318,18 +285,18 @@ class DataStreamOverAggregate( s"over: (${ if (!partitionKeys.isEmpty) { - s"PARTITION BY: ${partitionToString(inputSchema.logicalType, partitionKeys)}, " + s"PARTITION BY: ${partitionToString(inputSchema.relDataType, partitionKeys)}, " } else { "" } - }ORDER BY: ${orderingToString(inputSchema.logicalType, + }ORDER BY: ${orderingToString(inputSchema.relDataType, overWindow.orderKeys.getFieldCollations)}, " + s"${if (overWindow.isRows) "ROWS" else "RANGE"}" + s"${windowRange(logicWindow, overWindow, inputNode)}, " + s"select: (${ aggregationToString( - inputSchema.logicalType, - schema.logicalType, + inputSchema.relDataType, + schema.relDataType, namedAggregates) }))" } 
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index 424c6a2..9352efb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -43,7 +43,7 @@ class DataStreamScan( val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) - override def deriveRowType(): RelDataType = schema.logicalType + override def deriveRowType(): RelDataType = schema.relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamScan( http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala index a11e6c1..8f9942f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala @@ -53,7 +53,7 @@ class DataStreamSort( with CommonSort with DataStreamRel { - override def deriveRowType(): RelDataType = schema.logicalType + override def 
deriveRowType(): RelDataType = schema.relDataType override def copy( traitSet: RelTraitSet, @@ -75,13 +75,13 @@ class DataStreamSort( } override def toString: String = { - sortToString(schema.logicalType, sortCollation, sortOffset, sortFetch) + sortToString(schema.relDataType, sortCollation, sortOffset, sortFetch) } override def explainTerms(pw: RelWriter) : RelWriter = { sortExplainTerms( pw.input("input", getInput()), - schema.logicalType, + schema.relDataType, sortCollation, sortOffset, sortFetch) @@ -94,7 +94,7 @@ class DataStreamSort( val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) // need to identify time between others order fields. Time needs to be first sort element - val timeType = SortUtil.getFirstSortField(sortCollation, schema.logicalType).getType + val timeType = SortUtil.getFirstSortField(sortCollation, schema.relDataType).getType // time ordering needs to be ascending if (SortUtil.getFirstSortDirection(sortCollation) != Direction.ASCENDING) { @@ -141,15 +141,15 @@ class DataStreamSort( inputDS: DataStream[CRow], execCfg: ExecutionConfig): DataStream[CRow] = { - val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val returnTypeInfo = CRowTypeInfo(schema.typeInfo) // if the order has secondary sorting fields in addition to the proctime if (sortCollation.getFieldCollations.size() > 1) { val processFunction = SortUtil.createProcTimeSortFunction( sortCollation, - inputSchema.logicalType, - inputSchema.physicalTypeInfo, + inputSchema.relDataType, + inputSchema.typeInfo, execCfg) inputDS.keyBy(new NullByteKeySelector[CRow]) @@ -173,12 +173,12 @@ class DataStreamSort( inputDS: DataStream[CRow], execCfg: ExecutionConfig): DataStream[CRow] = { - val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val returnTypeInfo = CRowTypeInfo(schema.typeInfo) val processFunction = SortUtil.createRowTimeSortFunction( sortCollation, - inputSchema.logicalType, - inputSchema.physicalTypeInfo, + inputSchema.relDataType, 
+ inputSchema.typeInfo, execCfg) inputDS.keyBy(new NullByteKeySelector[CRow]) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala index 6f4980a..7258ec8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala @@ -38,7 +38,7 @@ class DataStreamUnion( extends BiRel(cluster, traitSet, leftNode, rightNode) with DataStreamRel { - override def deriveRowType() = schema.logicalType + override def deriveRowType() = schema.relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamUnion( @@ -55,7 +55,7 @@ class DataStreamUnion( } override def toString = { - s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))" + s"Union All(union: (${schema.fieldNames.mkString(", ")}))" } override def translateToPlan( @@ -68,6 +68,6 @@ class DataStreamUnion( } private def unionSelectionToString: String = { - schema.logicalFieldNames.mkString(", ") + schema.fieldNames.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala index 1476681..1ef9107 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala @@ -41,10 +41,10 @@ class DataStreamValues( schema: RowSchema, tuples: ImmutableList[ImmutableList[RexLiteral]], ruleDescription: String) - extends Values(cluster, schema.logicalType, tuples, traitSet) + extends Values(cluster, schema.relDataType, tuples, traitSet) with DataStreamRel { - override def deriveRowType() = schema.logicalType + override def deriveRowType() = schema.relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamValues( @@ -62,14 +62,14 @@ class DataStreamValues( val config = tableEnv.getConfig - val returnType = CRowTypeInfo(schema.physicalTypeInfo) + val returnType = CRowTypeInfo(schema.typeInfo) val generator = new InputFormatCodeGenerator(config) // generate code for every record val generatedRecords = getTuples.asScala.map { r => generator.generateResultExpression( - schema.physicalTypeInfo, - schema.physicalFieldNames, + schema.typeInfo, + schema.fieldNames, r.asScala) } @@ -77,7 +77,7 @@ class DataStreamValues( val generatedFunction = generator.generateValuesInputFormat( ruleDescription, generatedRecords.map(_.code), - schema.physicalTypeInfo) + schema.typeInfo) val inputFormat = new CRowValuesInputFormat( generatedFunction.name, http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala index 987947c..f8015b3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala @@ -54,7 +54,7 @@ class DataStreamWindowJoin( with CommonJoin with DataStreamRel { - override def deriveRowType(): RelDataType = schema.logicalType + override def deriveRowType(): RelDataType = schema.relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamWindowJoin( @@ -76,7 +76,7 @@ class DataStreamWindowJoin( override def toString: String = { joinToString( - schema.logicalType, + schema.relDataType, joinCondition, joinType, getExpressionString) @@ -85,7 +85,7 @@ class DataStreamWindowJoin( override def explainTerms(pw: RelWriter): RelWriter = { joinExplainTerms( super.explainTerms(pw), - schema.logicalType, + schema.relDataType, joinCondition, joinType, getExpressionString) @@ -117,8 +117,8 @@ class DataStreamWindowJoin( WindowJoinUtil.generateJoinFunction( config, joinType, - leftSchema.physicalTypeInfo, - rightSchema.physicalTypeInfo, + leftSchema.typeInfo, + rightSchema.typeInfo, schema, remainCondition, ruleDescription) @@ -160,13 +160,13 @@ class DataStreamWindowJoin( leftKeys: Array[Int], rightKeys: Array[Int]): DataStream[CRow] = { - val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo) + val returnTypeInfo = CRowTypeInfo(schema.typeInfo) val procInnerJoinFunc = new ProcTimeWindowInnerJoin( leftLowerBound, leftUpperBound, - leftSchema.physicalTypeInfo, - rightSchema.physicalTypeInfo, + leftSchema.typeInfo, + rightSchema.typeInfo, joinFunctionName, joinFunctionCode) 
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index 25e72fa..4aca856 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -18,14 +18,15 @@ package org.apache.flink.table.plan.nodes.datastream -import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.FunctionCodeGenerator import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row import org.apache.flink.table.plan.schema.FlinkTable -import org.apache.flink.table.runtime.CRowOutputMapRunner +import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import scala.collection.JavaConverters._ @@ -40,29 +41,42 @@ trait StreamScan extends CommonScan[CRow] with DataStreamRel { : DataStream[CRow] = { val inputType = input.getType - val internalType = CRowTypeInfo(schema.physicalTypeInfo) + val internalType = CRowTypeInfo(schema.typeInfo) // conversion if (needsConversion(input.getType, internalType)) { - val function = generatedConversionFunction( + val generator = new FunctionCodeGenerator( config, - classOf[MapFunction[Any, Row]], + false, inputType, - schema.physicalTypeInfo, - 
"DataStreamSourceConversion", - schema.physicalFieldNames, + None, Some(flinkTable.fieldIndexes)) - val mapFunc = new CRowOutputMapRunner( + val conversion = generator.generateConverterResultExpression( + schema.typeInfo, + schema.fieldNames) + + val body = + s""" + |${conversion.code} + |${generator.collectorTerm}.collect(${conversion.resultTerm}); + |""".stripMargin + + val function = generator.generateFunction( + "DataStreamSourceConversion", + classOf[ProcessFunction[Any, Row]], + body, + schema.typeInfo) + + val processFunc = new CRowOutputProcessRunner( function.name, function.code, internalType) val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" - // TODO we need a ProcessFunction here - input.map(mapFunc).name(opName).returns(internalType) + input.process(processFunc).name(opName).returns(internalType) } // no conversion necessary, forward else { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 72ecac5..663b276 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.sources._ import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.runtime.types.CRow import org.apache.flink.table.sources.{StreamTableSource, TableSource} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo /** Flink RelNode to read data 
from an external source defined by a [[StreamTableSource]]. */ class StreamTableSourceScan( @@ -46,29 +47,29 @@ class StreamTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - val fieldCnt = fieldNames.length + val fields = fieldNames.zip(fieldTypes) - val rowtime = tableSource match { + val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute - Some((fieldCnt, rowtimeAttribute)) + fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) case _ => - None + fields } - val proctime = tableSource match { + val withProctime = tableSource match { case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute - Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) + withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR) case _ => - None + withRowtime } + val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip + flinkTypeFactory.buildLogicalRowType( - fieldNames, - fieldTypes, - rowtime, - proctime) + fieldNamesWithIndicators, + fieldTypesWithIndicators) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {