http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index 3ae949e..470d006 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -30,6 +30,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import scala.collection.JavaConverters._ @@ -51,29 +52,29 @@ class FlinkLogicalTableSourceScan( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - val fieldCnt = fieldNames.length + val fields = fieldNames.zip(fieldTypes) - val rowtime = tableSource match { + val withRowtime = tableSource match { case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute - Some((fieldCnt, rowtimeAttribute)) + fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) case _ => - None + fields } - val proctime = tableSource match { + val withProctime = tableSource match { case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute - Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) + withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR) case _ => - None + withRowtime } + val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip + flinkTypeFactory.buildLogicalRowType( - fieldNames, - fieldTypes, - rowtime, - proctime) + fieldNamesWithIndicators, + fieldTypesWithIndicators) } override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala index 2075689..7dfcbc5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala @@ -87,7 +87,7 @@ class DataStreamWindowJoinRule val (windowBounds, remainCondition) = WindowJoinUtil.extractWindowBoundsFromPredicate( joinInfo.getRemaining(join.getCluster.getRexBuilder), - leftRowSchema.logicalArity, + leftRowSchema.arity, join.getRowType, join.getCluster.getRexBuilder, TableConfig.DEFAULT) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala index 70054b4..b7021e2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala @@ -18,27 +18,14 @@ package org.apache.flink.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String], - val rowtime: Option[(Int, String)], - val proctime: Option[(Int, String)], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) { - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - - flinkTypeFactory.buildLogicalRowType( - fieldNames, - fieldTypes, - rowtime, - proctime) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala index 752b00e..df56ae6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo abstract class FlinkTable[T]( val typeInfo: TypeInformation[T], @@ -49,12 +50,15 @@ abstract class FlinkTable[T]( typeInfo match { case cType: CompositeType[_] => // it is ok to leave out fields - if (fieldNames.length > cType.getArity) { + if (fieldIndexes.count(_ >= 0) > cType.getArity) { throw new TableException( s"Arity of type (" + cType.getFieldNames.deep + ") " + "must not be greater than number of field names " + fieldNames.deep + ".") } - fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) + fieldIndexes.map { + case TimeIndicatorTypeInfo.ROWTIME_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case TimeIndicatorTypeInfo.PROCTIME_MARKER => TimeIndicatorTypeInfo.PROCTIME_INDICATOR + case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]} case aType: AtomicType[_] => if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { throw new TableException( @@ -65,7 +69,7 @@ abstract class FlinkTable[T]( override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes, None, None) + flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala index ccbe44d..ad0f552 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala @@ -18,14 +18,10 @@ package org.apache.flink.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType} -import org.apache.calcite.rel.core.AggregateCall -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle} +import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.functions.TimeMaterializationSqlFunction import org.apache.flink.types.Row import scala.collection.JavaConversions._ @@ -35,127 +31,35 @@ import scala.collection.JavaConversions._ */ class RowSchema(private val logicalRowType: RelDataType) { - private lazy val physicalRowFields: Seq[RelDataTypeField] = logicalRowType.getFieldList filter { - field => !FlinkTypeFactory.isTimeIndicatorType(field.getType) - } - - private lazy val physicalRowType: RelDataType = new RelRecordType(physicalRowFields) - - private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = physicalRowFields map { f => - FlinkTypeFactory.toTypeInfo(f.getType) - } - - private lazy val physicalRowFieldNames: Seq[String] = physicalRowFields.map(_.getName) + private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = + logicalRowType.getFieldList map { f => FlinkTypeFactory.toTypeInfo(f.getType) } private lazy val physicalRowTypeInfo: TypeInformation[Row] = new RowTypeInfo( - physicalRowFieldTypes.toArray, physicalRowFieldNames.toArray) - - private lazy val indexMapping: Array[Int] = generateIndexMapping - - private lazy val inputRefUpdater = new RexInputRefUpdater() - - private def generateIndexMapping: Array[Int] = { - val mapping = new Array[Int](logicalRowType.getFieldCount) - var countTimeIndicators = 0 - var i = 0 - while (i < logicalRowType.getFieldCount) { - val t = logicalRowType.getFieldList.get(i).getType - if (FlinkTypeFactory.isTimeIndicatorType(t)) { - countTimeIndicators += 1 - // no mapping - mapping(i) = -1 - } else { - mapping(i) = i - countTimeIndicators - } - i += 1 - } - mapping - } - - private class RexInputRefUpdater extends RexShuttle { - - override def visitInputRef(inputRef: RexInputRef): RexNode = { - new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType) - } - - override def visitCall(call: RexCall): RexNode = call.getOperator match { - // we leave time indicators unchanged yet - // the index becomes invalid but right now we are only - // interested in the type of the input reference - case TimeMaterializationSqlFunction => call - case _ => super.visitCall(call) - } - } - - /** - * Returns the arity of the logical record. - */ - def logicalArity: Int = logicalRowType.getFieldCount - - /** - * Returns the arity of the physical record. - */ - def physicalArity: Int = physicalTypeInfo.getArity - - /** - * Returns a logical [[RelDataType]] including logical fields (i.e. time indicators). - */ - def logicalType: RelDataType = logicalRowType - - /** - * Returns a physical [[RelDataType]] with no logical fields (i.e. time indicators). - */ - def physicalType: RelDataType = physicalRowType - - /** - * Returns a physical [[TypeInformation]] of row with no logical fields (i.e. time indicators). - */ - def physicalTypeInfo: TypeInformation[Row] = physicalRowTypeInfo - - /** - * Returns [[TypeInformation]] of the row's fields with no logical fields (i.e. time indicators). - */ - def physicalFieldTypeInfo: Seq[TypeInformation[_]] = physicalRowFieldTypes + physicalRowFieldTypes.toArray, fieldNames.toArray) /** - * Returns the logical fields names including logical fields (i.e. time indicators). + * Returns the arity of the schema. */ - def logicalFieldNames: Seq[String] = logicalRowType.getFieldNames + def arity: Int = logicalRowType.getFieldCount /** - * Returns the physical fields names with no logical fields (i.e. time indicators). + * Returns the [[RelDataType]] of the schema */ - def physicalFieldNames: Seq[String] = physicalRowFieldNames + def relDataType: RelDataType = logicalRowType /** - * Converts logical indices to physical indices based on this schema. + * Returns the [[TypeInformation]] of of the schema */ - def mapIndex(logicalIndex: Int): Int = { - val mappedIndex = indexMapping(logicalIndex) - if (mappedIndex < 0) { - throw new TableException("Invalid access to a logical field.") - } else { - mappedIndex - } - } + def typeInfo: TypeInformation[Row] = physicalRowTypeInfo /** - * Converts logical indices of a aggregate call to physical ones. + * Returns the [[TypeInformation]] of fields of the schema */ - def mapAggregateCall(logicalAggCall: AggregateCall): AggregateCall = { - logicalAggCall.copy( - logicalAggCall.getArgList.map(mapIndex(_).asInstanceOf[Integer]), - if (logicalAggCall.filterArg < 0) { - logicalAggCall.filterArg - } else { - mapIndex(logicalAggCall.filterArg) - } - ) - } + def fieldTypeInfos: Seq[TypeInformation[_]] = physicalRowFieldTypes /** - * Converts logical field references of a [[RexNode]] to physical ones. + * Returns the fields names */ - def mapRexNode(logicalRexNode: RexNode): RexNode = logicalRexNode.accept(inputRefUpdater) + def fieldNames: Seq[String] = logicalRowType.getFieldNames } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala index 408381d..dc1f31a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api.{TableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, TableSource} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo class StreamTableSourceTable[T]( override val tableSource: TableSource[T], @@ -36,41 +37,38 @@ class StreamTableSourceTable[T]( val fieldNames = TableEnvironment.getFieldNames(tableSource).toList val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - val fieldCnt = fieldNames.length + val fields = fieldNames.zip(fieldTypes) - val rowtime = tableSource match { - case nullTimeSource : DefinedRowtimeAttribute - if nullTimeSource.getRowtimeAttribute == null => - None - case emptyStringTimeSource: DefinedRowtimeAttribute - if emptyStringTimeSource.getRowtimeAttribute.trim.equals("") => - throw TableException("The name of the rowtime attribute must not be empty.") - case timeSource: DefinedRowtimeAttribute => + val withRowtime = tableSource match { + case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null => + fields + case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") => + throw TableException("The name of the rowtime attribute must not be empty.") + case timeSource: DefinedRowtimeAttribute => val rowtimeAttribute = timeSource.getRowtimeAttribute - Some((fieldCnt, rowtimeAttribute)) + fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) case _ => - None + fields } - val proctime = tableSource match { - case nullTimeSource : DefinedProctimeAttribute - if nullTimeSource.getProctimeAttribute == null => - None - case emptyStringTimeSource: DefinedProctimeAttribute - if emptyStringTimeSource.getProctimeAttribute.trim.equals("") => - throw TableException("The name of the proctime attribute must not be empty.") - case timeSource: DefinedProctimeAttribute => + val withProctime = tableSource match { + case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null => + withRowtime + case timeSource: DefinedProctimeAttribute + if timeSource.getProctimeAttribute.trim.equals("") => + throw TableException("The name of the rowtime attribute must not be empty.") + case timeSource: DefinedProctimeAttribute => val proctimeAttribute = timeSource.getProctimeAttribute - Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) + withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR) case _ => - None + withRowtime } + val (fieldNamesWithIndicators, fieldTypesWithIndicators) = withProctime.unzip + flinkTypeFactory.buildLogicalRowType( - fieldNames, - fieldTypes, - rowtime, - proctime) + fieldNamesWithIndicators, + fieldTypesWithIndicators) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala index 7c96437..6b3aa44 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala @@ -19,7 +19,9 @@ package org.apache.flink.table.runtime import java.lang.{Boolean => JBool} +import java.sql.Timestamp +import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala deleted file mode 100644 index cb8f695..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.runtime - -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.configuration.Configuration -import org.apache.flink.table.codegen.Compiler -import org.apache.flink.table.runtime.types.CRow -import org.apache.flink.types.Row -import org.slf4j.LoggerFactory - -/** - * MapRunner with [[CRow]] output. - */ -class CRowOutputMapRunner( - name: String, - code: String, - @transient var returnType: TypeInformation[CRow]) - extends RichMapFunction[Any, CRow] - with ResultTypeQueryable[CRow] - with Compiler[MapFunction[Any, Row]] { - - val LOG = LoggerFactory.getLogger(this.getClass) - - private var function: MapFunction[Any, Row] = _ - private var outCRow: CRow = _ - - override def open(parameters: Configuration): Unit = { - LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") - val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) - LOG.debug("Instantiating MapFunction.") - function = clazz.newInstance() - outCRow = new CRow(null, true) - } - - override def map(in: Any): CRow = { - outCRow.row = function.map(in) - outCRow - } - - override def getProducedType: TypeInformation[CRow] = returnType -} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala new file mode 100644 index 0000000..c80f291 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputProcessRunner.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +/** + * ProcessRunner with [[CRow]] output. + */ +class CRowOutputProcessRunner( + name: String, + code: String, + @transient var returnType: TypeInformation[CRow]) + extends ProcessFunction[Any, CRow] + with ResultTypeQueryable[CRow] + with Compiler[ProcessFunction[Any, Row]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: ProcessFunction[Any, Row] = _ + private var cRowWrapper: CRowWrappingCollector = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating ProcessFunction.") + function = clazz.newInstance() + + this.cRowWrapper = new CRowWrappingCollector() + this.cRowWrapper.setChange(true) + } + + override def processElement( + in: Any, + ctx: ProcessFunction[Any, CRow]#Context, + out: Collector[CRow]): Unit = { + + // remove timestamp from stream record + val tc = out.asInstanceOf[TimestampedCollector[_]] + tc.eraseTimestamp() + + cRowWrapper.out = out + function.processElement(in, ctx.asInstanceOf[ProcessFunction[Any, Row]#Context], cRowWrapper) + } + + override def getProducedType: TypeInformation[CRow] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala new file mode 100644 index 0000000..00961f0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import java.sql.Timestamp + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]]. + */ +class TimestampSetterProcessFunction( + val rowtimeIdx: Int, + @transient var returnType: TypeInformation[CRow]) + extends ProcessFunction[CRow, CRow] + with ResultTypeQueryable[CRow] { + + override def processElement( + in: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp]) + out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp) + out.collect(in) + } + + override def getProducedType: TypeInformation[CRow] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala new file mode 100644 index 0000000..8f12c30 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import java.sql.Timestamp + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * Wraps a ProcessFunction and sets a Timestamp field of a CRow as + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. + */ +class WrappingTimestampSetterProcessFunction[OUT]( + function: MapFunction[CRow, OUT], + rowtimeIdx: Int) + extends ProcessFunction[CRow, OUT] { + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + function match { + case f: RichMapFunction[_, _] => + f.setRuntimeContext(getRuntimeContext) + f.open(parameters) + case _ => + } + } + + override def processElement( + in: CRow, + ctx: ProcessFunction[CRow, OUT]#Context, + out: Collector[OUT]): Unit = { + + val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp]) + out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp) + + out.collect(function.map(in)) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index c9f98e3..52105e3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin import org.apache.flink.table.api.{StreamQueryConfig, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.codegen.{AggregationCodeGenerator, CodeGenerator} +import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.aggfunctions._ @@ -66,7 +66,7 @@ object AggregateUtil { * @param inputType Physical type of the row. * @param inputTypeInfo Physical type information of the row. * @param inputFieldTypeInfo Physical type information of the row's fields. - * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType + * @param rowTimeIdx The index of the rowtime field or None in case of processing time. * @param isPartitioned It is a tag that indicate whether the input is partitioned * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause */ @@ -77,7 +77,7 @@ object AggregateUtil { inputTypeInfo: TypeInformation[Row], inputFieldTypeInfo: Seq[TypeInformation[_]], queryConfig: StreamQueryConfig, - isRowTimeType: Boolean, + rowTimeIdx: Option[Int], isPartitioned: Boolean, isRowsClause: Boolean) : ProcessFunction[CRow, CRow] = { @@ -111,13 +111,14 @@ object AggregateUtil { needReset = false ) - if (isRowTimeType) { + if (rowTimeIdx.isDefined) { if (isRowsClause) { // ROWS unbounded over process function new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, CRowTypeInfo(inputTypeInfo), + rowTimeIdx.get, queryConfig) } else { // RANGE unbounded over process function @@ -125,6 +126,7 @@ object AggregateUtil { genFunction, aggregationStateType, CRowTypeInfo(inputTypeInfo), + rowTimeIdx.get, queryConfig) } } else { @@ -207,7 +209,7 @@ object AggregateUtil { * @param inputFieldTypeInfo Physical type information of the row's fields. * @param precedingOffset the preceding offset * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause - * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType + * @param rowTimeIdx The index of the rowtime field or None in case of processing time. * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] */ private[flink] def createBoundedOverProcessFunction( @@ -219,7 +221,7 @@ object AggregateUtil { precedingOffset: Long, queryConfig: StreamQueryConfig, isRowsClause: Boolean, - isRowTimeType: Boolean) + rowTimeIdx: Option[Int]) : ProcessFunction[CRow, CRow] = { val needRetract = true @@ -253,13 +255,14 @@ object AggregateUtil { needReset = true ) - if (isRowTimeType) { + if (rowTimeIdx.isDefined) { if (isRowsClause) { new RowTimeBoundedRowsOver( genFunction, aggregationStateType, inputRowType, precedingOffset, + rowTimeIdx.get, queryConfig) } else { new RowTimeBoundedRangeOver( @@ -267,6 +270,7 @@ object AggregateUtil { aggregationStateType, inputRowType, precedingOffset, + rowTimeIdx.get, queryConfig) } } else { @@ -588,7 +592,7 @@ object AggregateUtil { window match { case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) => // tumbling time window - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + val (startPos, endPos, _) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for incremental aggregations new DataSetTumbleTimeWindowAggReduceCombineFunction( @@ -615,7 +619,7 @@ object AggregateUtil { asLong(size)) case SessionGroupWindow(_, _, gap) => - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + val (startPos, endPos, _) = computeWindowPropertyPos(properties) new DataSetSessionWindowAggReduceGroupFunction( genFinalAggFunction, keysAndAggregatesArity, @@ -625,7 +629,7 @@ object AggregateUtil { isInputCombined) case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) => - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + val (startPos, endPos, _) = computeWindowPropertyPos(properties) if (doAllSupportPartialMerge(aggregates)) { // for partial aggregations new DataSetSlideWindowAggReduceCombineFunction( @@ -951,10 +955,11 @@ object AggregateUtil { : AllWindowFunction[Row, CRow, DataStreamWindow] = { if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) new IncrementalAggregateAllTimeWindowFunction( startPos, endPos, + timePos, finalRowArity) .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]] } else { @@ -975,12 +980,13 @@ object AggregateUtil { WindowFunction[Row, CRow, Tuple, DataStreamWindow] = { if (isTimeWindow(window)) { - val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) + val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) new IncrementalAggregateTimeWindowFunction( numGroupingKeys, numAggregates, startPos, endPos, + timePos, finalRowArity) .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]] } else { @@ -1136,25 +1142,31 @@ object AggregateUtil { } } - private[flink] def computeWindowStartEndPropertyPos( - properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int]) = { + private[flink] def computeWindowPropertyPos( + properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = { - val propPos = properties.foldRight((None: Option[Int], None: Option[Int], 0)) { - (p, x) => p match { + val propPos = properties.foldRight( + (None: Option[Int], None: Option[Int], None: Option[Int], 0)) { + case (p, (s, e, t, i)) => p match { case NamedWindowProperty(_, prop) => prop match { - case WindowStart(_) if x._1.isDefined => + case WindowStart(_) if s.isDefined => throw new TableException("Duplicate WindowStart property encountered. This is a bug.") case WindowStart(_) => - (Some(x._3), x._2, x._3 - 1) - case WindowEnd(_) if x._2.isDefined => + (Some(i), e, t, i - 1) + case WindowEnd(_) if e.isDefined => throw new TableException("Duplicate WindowEnd property encountered. This is a bug.") case WindowEnd(_) => - (x._1, Some(x._3), x._3 - 1) + (s, Some(i), t, i - 1) + case RowtimeAttribute(_) if t.isDefined => + throw new TableException( + "Duplicate Window rowtime property encountered. This is a bug.") + case RowtimeAttribute(_) => + (s, e, Some(i), i - 1) } } } - (propPos._1, propPos._2) + (propPos._1, propPos._2, propPos._3) } private def transformToAggregateFunctions( http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index fabf200..2160ef5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -78,7 +78,10 @@ class DataSetSessionWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + collector = new RowTimeWindowPropertyCollector( + finalRowWindowStartPos, + finalRowWindowEndPos, + None) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala index 56ed08a..e4b9458 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -68,7 +68,10 @@ class DataSetSlideWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + collector = new RowTimeWindowPropertyCollector( + finalRowWindowStartPos, + finalRowWindowEndPos, + None) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 8af2c2e..b4f7585 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -67,7 +67,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos, None) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala index 711cc05..3c2e858 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala @@ -29,13 +29,15 @@ import org.apache.flink.util.Collector * * Computes the final aggregate value from incrementally computed aggregates. * - * @param windowStartPos the start position of window - * @param windowEndPos the end position of window + * @param windowStartOffset the offset of the window start property + * @param windowEndOffset the offset of the window end property + * @param windowRowtimeOffset the offset of the window rowtime property * @param finalRowArity The arity of the final output row. */ class IncrementalAggregateAllTimeWindowFunction( - private val windowStartPos: Option[Int], - private val windowEndPos: Option[Int], + private val windowStartOffset: Option[Int], + private val windowEndOffset: Option[Int], + private val windowRowtimeOffset: Option[Int], private val finalRowArity: Int) extends IncrementalAggregateAllWindowFunction[TimeWindow]( finalRowArity) { @@ -43,7 +45,10 @@ class IncrementalAggregateAllTimeWindowFunction( private var collector: CRowTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new CRowTimeWindowPropertyCollector( + windowStartOffset, + windowEndOffset, + windowRowtimeOffset) super.open(parameters) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index 809bbfd..1950230 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -29,15 +29,19 @@ import org.apache.flink.util.Collector /** * Computes the final aggregate value from incrementally computed aggreagtes. * - * @param windowStartPos the start position of window - * @param windowEndPos the end position of window - * @param finalRowArity The arity of the final output row + * @param numGroupingKey the number of grouping keys + * @param numAggregates the number of aggregates + * @param windowStartOffset the offset of the window start property + * @param windowEndOffset the offset of the window end property + * @param windowRowtimeOffset the offset of the window rowtime property + * @param finalRowArity The arity of the final output row. */ class IncrementalAggregateTimeWindowFunction( private val numGroupingKey: Int, private val numAggregates: Int, - private val windowStartPos: Option[Int], - private val windowEndPos: Option[Int], + private val windowStartOffset: Option[Int], + private val windowEndOffset: Option[Int], + private val windowRowtimeOffset: Option[Int], private val finalRowArity: Int) extends IncrementalAggregateWindowFunction[TimeWindow]( numGroupingKey, @@ -47,7 +51,10 @@ class IncrementalAggregateTimeWindowFunction( private var collector: CRowTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new CRowTimeWindowPropertyCollector( + windowStartOffset, + windowEndOffset, + windowRowtimeOffset) super.open(parameters) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index 8f2ec98..ab3dc1d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import java.util.{ArrayList, List => JList} import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -123,6 +124,9 @@ class ProcTimeBoundedRangeOver( return } + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() + // we consider the original timestamp of events // that have registered this time trigger 1 ms ago http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala index 2d0b14b..1e12060 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala @@ -23,10 +23,11 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.util.{Collector, Preconditions} - import java.util.ArrayList import java.util.Collections +import org.apache.flink.streaming.api.operators.TimestampedCollector + /** * ProcessFunction to sort on processing time and additional attributes. @@ -75,7 +76,10 @@ class ProcTimeSortProcessFunction( timestamp: Long, ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - + + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() + val iter = bufferedEvents.get.iterator() // insert all rows into the sort buffer http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index 1a207bb..ceb986d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -17,13 +17,16 @@ */ package org.apache.flink.table.runtime.aggregate +import java.sql.Timestamp import java.util.{ArrayList => JArrayList, List => JList} +import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -44,6 +47,7 @@ class RowTimeBoundedRangeOver( aggregationStateType: RowTypeInfo, inputRowType: CRowTypeInfo, precedingOffset: Long, + rowTimeIdx: Int, queryConfig: StreamQueryConfig) extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { @@ -114,7 +118,7 @@ class RowTimeBoundedRangeOver( registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering timestamp for trigger calculation - val triggeringTs = ctx.timestamp + val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) val lastTriggeringTs = lastTriggeringTsState.value @@ -166,6 +170,9 @@ class RowTimeBoundedRangeOver( return } + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() + // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala index a4b1076..678a3b7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala @@ -17,14 +17,17 @@ */ package org.apache.flink.table.runtime.aggregate +import java.sql.Timestamp import java.util import java.util.{List => JList} +import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} @@ -45,6 +48,7 @@ class RowTimeBoundedRowsOver( aggregationStateType: RowTypeInfo, inputRowType: CRowTypeInfo, precedingOffset: Long, + rowTimeIdx: Int, queryConfig: StreamQueryConfig) extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { @@ -123,7 +127,7 @@ class RowTimeBoundedRowsOver( registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) // triggering timestamp for trigger calculation - val triggeringTs = ctx.timestamp + val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) val lastTriggeringTs = lastTriggeringTsState.value // check if the data is expired, if not, save the data and register event time timer @@ -175,6 +179,9 @@ class RowTimeBoundedRowsOver( return } + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() + // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala index 737f32c..fd58678 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala @@ -17,6 +17,8 @@ */ package org.apache.flink.table.runtime.aggregate +import java.sql.Timestamp + import org.apache.flink.api.common.state.ValueState import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.common.state.MapState @@ -28,18 +30,22 @@ import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} - import java.util.Collections -import java.util.{List => JList, ArrayList => JArrayList} +import java.util.{ArrayList => JArrayList, List => JList} + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.streaming.api.operators.TimestampedCollector /** * ProcessFunction to sort on event-time and possibly addtional secondary sort attributes. * * @param inputRowType The data type of the input data. + * @param rowtimeIdx The index of the rowtime field. * @param rowComparator A comparator to sort rows. */ class RowTimeSortProcessFunction( private val inputRowType: CRowTypeInfo, + private val rowtimeIdx: Int, private val rowComparator: Option[CollectionRowComparator]) extends ProcessFunction[CRow, CRow] { @@ -84,7 +90,7 @@ class RowTimeSortProcessFunction( val input = inputC.row // timestamp of the processed row - val rowtime = ctx.timestamp + val rowtime = SqlFunctions.toLong(input.getField(rowtimeIdx).asInstanceOf[Timestamp]) val lastTriggeringTs = lastTriggeringTsState.value @@ -105,13 +111,15 @@ class RowTimeSortProcessFunction( } } } - - + override def onTimer( timestamp: Long, ctx: ProcessFunction[CRow, CRow]#OnTimerContext, out: Collector[CRow]): Unit = { - + + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() + // gets all rows for the triggering timestamps val inputs: JList[Row] = dataState.get(timestamp) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala index f38ba93..04b63a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -17,14 +17,16 @@ */ package org.apache.flink.table.runtime.aggregate +import java.sql.Timestamp import java.util import java.util.{List => JList} +import org.apache.calcite.runtime.SqlFunctions import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.configuration.Configuration import org.apache.flink.types.Row import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.util.Collector import org.apache.flink.api.common.state._ import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.streaming.api.operators.TimestampedCollector @@ -45,6 +47,7 @@ abstract class RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], inputType: TypeInformation[CRow], + rowTimeIdx: Int, queryConfig: StreamQueryConfig) extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { @@ -108,7 +111,7 @@ abstract class RowTimeUnboundedOver( // register state-cleanup timer registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) - val timestamp = ctx.timestamp() + val timestamp = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) val curWatermark = ctx.timerService().currentWatermark() // discard late record @@ -158,8 +161,8 @@ abstract class RowTimeUnboundedOver( return } - Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]]) - val collector = out.asInstanceOf[TimestampedCollector[CRow]] + // remove timestamp set outside of ProcessFunction. + out.asInstanceOf[TimestampedCollector[_]].eraseTimestamp() val keyIterator = rowMapState.keys.iterator if (keyIterator.hasNext) { @@ -188,10 +191,9 @@ abstract class RowTimeUnboundedOver( while (!sortedTimestamps.isEmpty) { val curTimestamp = sortedTimestamps.removeFirst() val curRowList = rowMapState.get(curTimestamp) - collector.setAbsoluteTimestamp(curTimestamp) // process the same timestamp datas, the mechanism is different according ROWS or RANGE - processElementsWithSameTimestamp(curRowList, lastAccumulator, collector) + processElementsWithSameTimestamp(curRowList, lastAccumulator, out) rowMapState.remove(curTimestamp) } @@ -250,11 +252,13 @@ class RowTimeUnboundedRowsOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], inputType: TypeInformation[CRow], + rowTimeIdx: Int, queryConfig: StreamQueryConfig) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, inputType, + rowTimeIdx, queryConfig) { override def processElementsWithSameTimestamp( @@ -266,7 +270,6 @@ class RowTimeUnboundedRowsOver( while (i < curRowList.size) { val curRow = curRowList.get(i) - var j = 0 // copy forwarded fields to output row function.setForwardedFields(curRow, output.row) @@ -290,11 +293,13 @@ class RowTimeUnboundedRangeOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], inputType: TypeInformation[CRow], + rowTimeIdx: Int, queryConfig: StreamQueryConfig) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, inputType, + rowTimeIdx, queryConfig) { override def processElementsWithSameTimestamp( http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala index 5f83f1d..d62c7b9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala @@ -60,6 +60,8 @@ object SortUtil { inputTypeInfo: TypeInformation[Row], execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = { + val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex + val collectionRowComparator = if (collationSort.getFieldCollations.size() > 1) { val rowComp = createRowComparator( @@ -76,6 +78,7 @@ object SortUtil { new RowTimeSortProcessFunction( inputCRowType, + rowtimeIdx, collectionRowComparator) } @@ -139,7 +142,7 @@ object SortUtil { } new RowComparator( - new RowSchema(inputType).physicalArity, + new RowSchema(inputType).arity, sortFields.toArray, fieldComps.toArray, new Array[TypeSerializer[AnyRef]](0), // not required because we only compare objects. http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 0c8ae00..4ec5239 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -29,7 +29,8 @@ import org.apache.flink.util.Collector */ abstract class TimeWindowPropertyCollector[T]( windowStartOffset: Option[Int], - windowEndOffset: Option[Int]) + windowEndOffset: Option[Int], + windowRowtimeOffset: Option[Int]) extends Collector[T] { var wrappedCollector: Collector[T] = _ @@ -55,20 +56,32 @@ abstract class TimeWindowPropertyCollector[T]( SqlFunctions.internalToTimestamp(windowEnd)) } + if (windowRowtimeOffset.isDefined) { + output.setField( + lastFieldPos + windowRowtimeOffset.get, + SqlFunctions.internalToTimestamp(windowEnd - 1)) + } + wrappedCollector.collect(record) } override def close(): Unit = wrappedCollector.close() } -class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int]) - extends TimeWindowPropertyCollector[Row](startOffset, endOffset) { +class RowTimeWindowPropertyCollector( + startOffset: Option[Int], + endOffset: Option[Int], + rowtimeOffset: Option[Int]) + extends TimeWindowPropertyCollector[Row](startOffset, endOffset, rowtimeOffset) { override def getRow(record: Row): Row = record } -class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int]) - extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) { +class CRowTimeWindowPropertyCollector( + startOffset: Option[Int], + endOffset: Option[Int], + rowtimeOffset: Option[Int]) + extends TimeWindowPropertyCollector[CRow](startOffset, endOffset, rowtimeOffset) { override def getRow(record: CRow): Row = record.row } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 379b8d2..b566113 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -418,8 +418,8 @@ object WindowJoinUtil { Some(rightType)) val conversion = generator.generateConverterResultExpression( - returnType.physicalTypeInfo, - returnType.physicalType.getFieldNames.asScala) + returnType.typeInfo, + returnType.fieldNames) // if other condition is none, then output the result directly val body = otherCondition match { @@ -429,9 +429,8 @@ object WindowJoinUtil { |${generator.collectorTerm}.collect(${conversion.resultTerm}); |""".stripMargin case Some(remainCondition) => - // map logical field accesses to physical accesses - val physicalCondition = returnType.mapRexNode(remainCondition) - val genCond = generator.generateExpression(physicalCondition) + // generate code for remaining condition + val genCond = generator.generateExpression(remainCondition) s""" |${genCond.code} |if (${genCond.resultTerm}) { @@ -445,7 +444,7 @@ object WindowJoinUtil { ruleDescription, classOf[FlatJoinFunction[Row, Row, Row]], body, - returnType.physicalTypeInfo) + returnType.typeInfo) } } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala index 083f1eb..e0e054b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala @@ -40,6 +40,9 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean) object TimeIndicatorTypeInfo { + val ROWTIME_MARKER: Int = -1 + val PROCTIME_MARKER: Int = -2 + val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true) val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala index a5a1319..d20002a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SortTest.scala @@ -41,7 +41,7 @@ class SortTest extends TableTestBase { unaryNode("DataStreamSort", streamTableNode(0), term("orderBy", "proctime ASC", "c ASC")), - term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c")) + term("select", "a", "PROCTIME(proctime) AS proctime", "c")) streamUtil.verifySql(sqlQuery, expected) } @@ -57,7 +57,7 @@ class SortTest extends TableTestBase { unaryNode("DataStreamSort", streamTableNode(0), term("orderBy", "rowtime ASC, c ASC")), - term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c")) + term("select", "a", "rowtime", "c")) streamUtil.verifySql(sqlQuery, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala index 5d4386c..6967061 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala @@ -43,7 +43,7 @@ class TableSourceTest extends TableTestBase { unaryNode( "DataStreamCalc", "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])", - term("select", "TIME_MATERIALIZATION(addTime) AS addTime", "id", "name", "val") + term("select", "addTime", "id", "name", "val") ) util.verifyTable(t, expected) } @@ -90,7 +90,7 @@ class TableSourceTest extends TableTestBase { unaryNode( "DataStreamCalc", "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])", - term("select", "TIME_MATERIALIZATION(pTime) AS pTime", "id", "name", "val") + term("select", "PROCTIME(pTime) AS pTime", "id", "name", "val") ) util.verifyTable(t, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index b17debe..ab80c65 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -48,7 +48,7 @@ class TimeIndicatorConversionTest extends TableTestBase { val expected = unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "FLOOR(TIME_MATERIALIZATION(rowtime)", "FLAG(DAY)) AS rowtime"), + term("select", "FLOOR(CAST(rowtime)", "FLAG(DAY)) AS rowtime"), term("where", ">(long, 0)") ) @@ -65,8 +65,8 @@ class TimeIndicatorConversionTest extends TableTestBase { val expected = unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long", "int", - "TIME_MATERIALIZATION(proctime) AS proctime") + term("select", "rowtime", "long", "int", + "PROCTIME(proctime) AS proctime") ) util.verifyTable(result, expected) @@ -84,7 +84,7 @@ class TimeIndicatorConversionTest extends TableTestBase { val expected = unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime"), + term("select", "rowtime"), term("where", ">(rowtime, 1990-12-02 12:11:11)") ) @@ -107,7 +107,7 @@ class TimeIndicatorConversionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "long", "TIME_MATERIALIZATION(rowtime) AS rowtime") + term("select", "long", "CAST(rowtime) AS rowtime") ), term("groupBy", "rowtime"), term("select", "rowtime", "COUNT(long) AS TMP_0") @@ -134,7 +134,7 @@ class TimeIndicatorConversionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime", "long") + term("select", "CAST(rowtime) AS rowtime", "long") ), term("groupBy", "long"), term("select", "long", "MIN(rowtime) AS TMP_0") @@ -159,16 +159,13 @@ class TimeIndicatorConversionTest extends TableTestBase { "DataStreamCorrelate", streamTableNode(0), term("invocation", - s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"), + s"${func.functionIdentifier}(CAST($$0):TIMESTAMP(3) NOT NULL, PROCTIME($$3), '')"), term("function", func), term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " + "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"), term("joinType", "INNER") ), - term("select", - "TIME_MATERIALIZATION(rowtime) AS rowtime", - "TIME_MATERIALIZATION(proctime) AS proctime", - "s") + term("select", "rowtime", "PROCTIME(proctime) AS proctime", "s") ) util.verifyTable(result, expected) @@ -219,7 +216,7 @@ class TimeIndicatorConversionTest extends TableTestBase { streamTableNode(0), term("union all", "rowtime", "long", "int") ), - term("select", "TIME_MATERIALIZATION(rowtime) AS rowtime") + term("select", "rowtime") ) util.verifyTable(result, expected) @@ -287,7 +284,7 @@ class TimeIndicatorConversionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "TIME_MATERIALIZATION(proctime) AS proctime", "long") + term("select", "PROCTIME(proctime) AS proctime", "long") ), term("groupBy", "proctime"), term("select", "proctime", "COUNT(long) AS EXPR$0") @@ -312,7 +309,7 @@ class TimeIndicatorConversionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "long", "TIME_MATERIALIZATION(proctime) AS proctime") + term("select", "long", "PROCTIME(proctime) AS proctime") ), term("groupBy", "long"), term("select", "long", "MIN(proctime) AS EXPR$0") @@ -368,7 +365,7 @@ class TimeIndicatorConversionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "long", "rowtime", "TIME_MATERIALIZATION(rowtime) AS $f2") + term("select", "long", "rowtime", "CAST(rowtime) AS rowtime0") ), term("groupBy", "long"), term( @@ -377,7 +374,7 @@ class TimeIndicatorConversionTest extends TableTestBase { 'w$, 'rowtime, 100.millis)), - term("select", "long", "MIN($f2) AS EXPR$0") + term("select", "long", "MIN(rowtime0) AS EXPR$0") ), term("select", "EXPR$0", "long") ) http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala index 80ff55e..04aada6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -46,12 +46,10 @@ class HarnessTestBase { UserDefinedFunctionUtils.serialize(new IntSumWithRetractAggFunction) protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]]( - INT_TYPE_INFO, LONG_TYPE_INFO, - INT_TYPE_INFO, STRING_TYPE_INFO, LONG_TYPE_INFO), - Array("a", "b", "c", "d", "e")) + Array("rowtime", "a", "b")) protected val SumRowType = new RowTypeInfo(Array[TypeInformation[_]]( LONG_TYPE_INFO, @@ -103,13 +101,13 @@ class HarnessTestBase { | | org.apache.flink.table.functions.AggregateFunction baseClass0 = | (org.apache.flink.table.functions.AggregateFunction) fmin; - | output.setField(5, baseClass0.getValue( + | output.setField(3, baseClass0.getValue( | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) | accs.getField(0))); | | org.apache.flink.table.functions.AggregateFunction baseClass1 = | (org.apache.flink.table.functions.AggregateFunction) fmax; - | output.setField(6, baseClass1.getValue( + | output.setField(4, baseClass1.getValue( | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) | accs.getField(1))); | } @@ -121,12 +119,12 @@ class HarnessTestBase { | fmin.accumulate( | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) | accs.getField(0)), - | (java.lang.Long) input.getField(4)); + | (java.lang.Long) input.getField(2)); | | fmax.accumulate( | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) | accs.getField(1)), - | (java.lang.Long) input.getField(4)); + | (java.lang.Long) input.getField(2)); | } | | public void retract( @@ -136,12 +134,12 @@ class HarnessTestBase { | fmin.retract( | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator) | accs.getField(0)), - | (java.lang.Long) input.getField(4)); + | (java.lang.Long) input.getField(2)); | | fmax.retract( | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator) | accs.getField(1)), - | (java.lang.Long) input.getField(4)); + | (java.lang.Long) input.getField(2)); | } | | public org.apache.flink.types.Row createAccumulators() { @@ -166,12 +164,10 @@ class HarnessTestBase { | output.setField(0, input.getField(0)); | output.setField(1, input.getField(1)); | output.setField(2, input.getField(2)); - | output.setField(3, input.getField(3)); - | output.setField(4, input.getField(4)); | } | | public org.apache.flink.types.Row createOutputRow() { - | return new org.apache.flink.types.Row(7); + | return new org.apache.flink.types.Row(5); | } | |/******* This test does not use the following methods *******/ @@ -326,7 +322,7 @@ object HarnessTestBase { /** * Return 0 for equal Rows and non zero for different rows */ - class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable { + class RowResultSortComparator() extends Comparator[Object] with Serializable { override def compare(o1: Object, o2: Object): Int = { http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala index 6c24c5d..065b7bc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala @@ -154,7 +154,7 @@ class JoinHarnessTest extends HarnessTestBase{ expectedOutput.add(new StreamRecord( CRow(Row.of(2: JInt, "bbb2", 2: JInt, "Hello2"), true), 25)) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -227,7 +227,7 @@ class JoinHarnessTest extends HarnessTestBase{ expectedOutput.add(new StreamRecord( CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala index 04214f9..dd14d7e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala @@ -91,7 +91,7 @@ class NonWindowHarnessTest extends HarnessTestBase { expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1)) expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1)) - verify(expectedOutput, result, new RowResultSortComparator(6)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } @@ -150,7 +150,7 @@ class NonWindowHarnessTest extends HarnessTestBase { expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10)) expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10)) - verify(expectedOutput, result, new RowResultSortComparator(0)) + verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() }