[FLINK-5884] [table] Integrate time indicators for Table API & SQL. This closes #3808.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/495f104b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/495f104b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/495f104b Branch: refs/heads/master Commit: 495f104b439096dc7eea5141bfe0de0283c5cc62 Parents: 28ab737 Author: twalthr <twal...@apache.org> Authored: Thu Mar 2 16:06:55 2017 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat May 6 01:51:31 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 25 +- .../table/api/StreamTableEnvironment.scala | 79 ++-- .../flink/table/api/TableEnvironment.scala | 101 ++++- .../flink/table/api/scala/expressionDsl.scala | 18 +- .../apache/flink/table/api/scala/windows.scala | 2 +- .../org/apache/flink/table/api/table.scala | 14 +- .../org/apache/flink/table/api/windows.scala | 357 +++++++-------- .../calcite/FlinkCalciteSqlValidator.scala | 12 +- .../flink/table/calcite/FlinkPlannerImpl.scala | 6 +- .../flink/table/calcite/FlinkTypeFactory.scala | 132 +++++- .../calcite/RelTimeIndicatorConverter.scala | 222 ++++++++++ .../flink/table/codegen/CodeGenerator.scala | 99 +++-- .../table/codegen/calls/FunctionGenerator.scala | 10 - .../table/expressions/ExpressionUtils.scala | 39 ++ .../apache/flink/table/expressions/call.scala | 16 +- .../table/expressions/fieldExpression.scala | 48 +- .../TimeMaterializationSqlFunction.scala | 41 ++ .../functions/TimeModeIndicatorFunctions.scala | 137 ------ .../flink/table/plan/ProjectionTranslator.scala | 31 +- .../table/plan/logical/LogicalWindow.scala | 14 +- .../flink/table/plan/logical/groupWindows.scala | 280 ++++-------- .../flink/table/plan/logical/operators.scala | 40 +- .../flink/table/plan/nodes/CommonCalc.scala | 37 +- .../table/plan/nodes/CommonCorrelate.scala | 32 +- .../flink/table/plan/nodes/CommonScan.scala | 24 +- 
.../flink/table/plan/nodes/OverAggregate.scala | 35 +- .../plan/nodes/PhysicalTableSourceScan.scala | 6 +- .../table/plan/nodes/dataset/BatchScan.scala | 12 +- .../nodes/dataset/BatchTableSourceScan.scala | 14 +- .../plan/nodes/dataset/DataSetAggregate.scala | 8 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 21 +- .../plan/nodes/dataset/DataSetCorrelate.scala | 9 +- .../nodes/dataset/DataSetWindowAggregate.scala | 68 +-- .../nodes/datastream/DataStreamAggregate.scala | 178 ++++---- .../plan/nodes/datastream/DataStreamCalc.scala | 22 +- .../nodes/datastream/DataStreamCorrelate.scala | 22 +- .../datastream/DataStreamOverAggregate.scala | 112 ++--- .../plan/nodes/datastream/DataStreamRel.scala | 1 - .../plan/nodes/datastream/DataStreamScan.scala | 10 +- .../plan/nodes/datastream/DataStreamUnion.scala | 14 +- .../nodes/datastream/DataStreamValues.scala | 21 +- .../plan/nodes/datastream/StreamScan.scala | 32 +- .../datastream/StreamTableSourceScan.scala | 58 ++- .../nodes/logical/FlinkLogicalOverWindow.scala | 2 +- .../logical/FlinkLogicalTableSourceScan.scala | 6 +- .../common/WindowStartEndPropertiesRule.scala | 4 +- .../datastream/DataStreamAggregateRule.scala | 5 +- .../rules/datastream/DataStreamCalcRule.scala | 4 +- .../datastream/DataStreamCorrelateRule.scala | 6 +- .../DataStreamLogicalWindowAggregateRule.scala | 56 ++- .../DataStreamOverAggregateRule.scala | 5 +- .../rules/datastream/DataStreamScanRule.scala | 4 +- .../rules/datastream/DataStreamUnionRule.scala | 3 +- .../rules/datastream/DataStreamValuesRule.scala | 3 +- .../table/plan/schema/DataStreamTable.scala | 14 + .../flink/table/plan/schema/FlinkTable.scala | 7 +- .../flink/table/plan/schema/RowSchema.scala | 152 +++++++ .../plan/schema/TimeIndicatorRelDataType.scala | 49 +++ .../apache/flink/table/runtime/MapRunner.scala | 2 +- .../table/runtime/aggregate/AggregateUtil.scala | 190 ++++---- .../aggregate/RowTimeBoundedRangeOver.scala | 6 +- .../aggregate/RowTimeBoundedRowsOver.scala | 2 +- 
.../table/sources/DefinedTimeAttributes.scala | 47 ++ .../table/typeutils/TimeIndicatorTypeInfo.scala | 45 ++ .../flink/table/typeutils/TypeCheckUtils.scala | 5 +- .../flink/table/validate/FunctionCatalog.scala | 10 +- .../api/java/batch/TableEnvironmentITCase.java | 9 - .../flink/table/TableEnvironmentTest.scala | 55 ++- .../scala/batch/TableEnvironmentITCase.scala | 10 - .../scala/batch/sql/WindowAggregateTest.scala | 18 +- .../scala/batch/table/FieldProjectionTest.scala | 36 +- .../api/scala/batch/table/GroupWindowTest.scala | 121 ++--- .../table/api/scala/stream/sql/SqlITCase.scala | 150 ++++--- .../scala/stream/sql/WindowAggregateTest.scala | 179 ++++---- .../scala/stream/table/AggregationsITCase.scala | 12 +- .../api/scala/stream/table/CalcITCase.scala | 16 - .../scala/stream/table/GroupWindowTest.scala | 440 ++++++------------- .../scala/stream/table/OverWindowITCase.scala | 12 +- .../api/scala/stream/table/OverWindowTest.scala | 101 ++--- .../GroupWindowStringExpressionTest.scala | 6 +- .../OverWindowStringExpressionTest.scala | 16 +- .../datastream/DataStreamAggregateITCase.scala | 22 +- 82 files changed, 2368 insertions(+), 1921 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 00cf11c..3eb2ffc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -194,7 +194,30 @@ abstract class BatchTableEnvironment( protected def 
registerDataSetInternal[T]( name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = { - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataSet.getType, fields) + val (fieldNames, fieldIndexes) = getFieldInfo[T]( + dataSet.getType, + fields, + ignoreTimeAttributes = true) + + // validate and extract time attributes + val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields) + + // don't allow proctime on batch + proctime match { + case Some(_) => + throw new ValidationException( + "A proctime attribute is not allowed in a batch environment. " + + "Working with processing-time on batch would lead to non-deterministic results.") + case _ => // ok + } + // rowtime must not extend the schema of a batch table + rowtime match { + case Some((idx, _)) if idx >= dataSet.getType.getArity => + throw new ValidationException( + "A rowtime attribute must be defined on an existing field in a batch environment.") + case _ => // ok + } + val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames) registerTableInternal(name, dataSetTable) } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index f532c5b..d1f2fb5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.Expression import org.apache.flink.table.plan.nodes.FlinkConventions @@ -87,47 +88,6 @@ abstract class StreamTableEnvironment( protected def createUniqueTableName(): String = "_DataStreamTable_" + nameCntr.getAndIncrement() /** - * Returns field names and field positions for a given [[TypeInformation]]. - * - * Field names are automatically extracted for - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * The method fails if inputType is not a - * [[org.apache.flink.api.common.typeutils.CompositeType]]. - * - * @param inputType The TypeInformation extract the field names and positions from. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. - */ - override protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]) - : (Array[String], Array[Int]) = { - val fieldInfo = super.getFieldInfo(inputType) - if (fieldInfo._1.contains("rowtime")) { - throw new TableException("'rowtime' ia a reserved field name in stream environment.") - } - fieldInfo - } - - /** - * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of - * [[Expression]]. - * - * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. - * @param exprs The expressions that define the field names. - * @tparam A The type of the TypeInformation. - * @return A tuple of two arrays holding the field names and corresponding field positions. 
- */ - override protected[flink] def getFieldInfo[A]( - inputType: TypeInformation[A], - exprs: Array[Expression]) - : (Array[String], Array[Int]) = { - val fieldInfo = super.getFieldInfo(inputType, exprs) - if (fieldInfo._1.contains("rowtime")) { - throw new TableException("'rowtime' is a reserved field name in stream environment.") - } - fieldInfo - } - - /** * Registers an external [[StreamTableSource]] in this [[TableEnvironment]]'s catalog. * Registered tables can be referenced in SQL queries. * @@ -145,6 +105,7 @@ abstract class StreamTableEnvironment( "StreamTableEnvironment") } } + /** * Writes a [[Table]] to a [[TableSink]]. * @@ -185,7 +146,9 @@ abstract class StreamTableEnvironment( val dataStreamTable = new DataStreamTable[T]( dataStream, fieldIndexes, - fieldNames + fieldNames, + None, + None ) registerTableInternal(name, dataStreamTable) } @@ -200,15 +163,26 @@ abstract class StreamTableEnvironment( * @tparam T The type of the [[DataStream]]. */ protected def registerDataStreamInternal[T]( - name: String, - dataStream: DataStream[T], - fields: Array[Expression]): Unit = { + name: String, + dataStream: DataStream[T], + fields: Array[Expression]) + : Unit = { + + val (fieldNames, fieldIndexes) = getFieldInfo[T]( + dataStream.getType, + fields, + ignoreTimeAttributes = false) + + // validate and extract time attributes + val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields) + - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields) val dataStreamTable = new DataStreamTable[T]( dataStream, fieldIndexes, - fieldNames + fieldNames, + rowtime, + proctime ) registerTableInternal(name, dataStreamTable) } @@ -259,7 +233,10 @@ abstract class StreamTableEnvironment( // 1. decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) - // 2. normalize the logical plan + // 2. 
convert time indicators + val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder) + + // 3. normalize the logical plan val normRuleSet = getNormRuleSet val normalizedPlan = if (normRuleSet.iterator().hasNext) { runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet) @@ -267,7 +244,7 @@ abstract class StreamTableEnvironment( decorPlan } - // 3. optimize the logical Flink plan + // 4. optimize the logical Flink plan val logicalOptRuleSet = getLogicalOptRuleSet val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify() val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) { @@ -276,7 +253,7 @@ abstract class StreamTableEnvironment( normalizedPlan } - // 4. optimize the physical Flink plan + // 5. optimize the physical Flink plan val physicalOptRuleSet = getPhysicalOptRuleSet val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { @@ -285,7 +262,7 @@ abstract class StreamTableEnvironment( logicalPlan } - // 5. decorate the optimized plan + // 6. 
decorate the optimized plan val decoRuleSet = getDecoRuleSet val decoratedPlan = if (decoRuleSet.iterator().hasNext) { runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, physicalPlan, physicalPlan.getTraitSet) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 06c405e..4c72e8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -52,6 +52,9 @@ import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction, AggregateFunction} +import org.apache.flink.table.expressions._ +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} +import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets @@ -598,70 +601,94 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of - * [[Expression]]. + * [[Expression]]. It does not handle time attributes but considers them in indices, if + * ignore flag is not false. 
* * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. * @param exprs The expressions that define the field names. + * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions. * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ protected[flink] def getFieldInfo[A]( - inputType: TypeInformation[A], - exprs: Array[Expression]): (Array[String], Array[Int]) = { + inputType: TypeInformation[A], + exprs: Array[Expression], + ignoreTimeAttributes: Boolean) + : (Array[String], Array[Int]) = { TableEnvironment.validateType(inputType) + val filteredExprs = if (ignoreTimeAttributes) { + exprs.map { + case ta: TimeAttribute => ta.expression + case e@_ => e + } + } else { + exprs + } + val indexedNames: Array[(Int, String)] = inputType match { case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] => throw new TableException( "An input of GenericTypeInfo<Row> cannot be converted to Table. 
" + "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[A] => - if (exprs.length != 1) { - throw new TableException("Table of atomic type can only have a single field.") - } - exprs.map { - case UnresolvedFieldReference(name) => (0, name) + filteredExprs.zipWithIndex flatMap { + case (UnresolvedFieldReference(name), idx) => + if (idx > 0) { + throw new TableException("Table of atomic type can only have a single field.") + } + Some((0, name)) + case (_: TimeAttribute, _) if ignoreTimeAttributes => + None case _ => throw new TableException("Field reference expression requested.") } case t: TupleTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) + filteredExprs.zipWithIndex flatMap { + case (UnresolvedFieldReference(name), idx) => + Some((idx, name)) case (Alias(UnresolvedFieldReference(origName), name, _), _) => val idx = t.getFieldIndex(origName) if (idx < 0) { throw new TableException(s"$origName is not a field of type $t") } - (idx, name) + Some((idx, name)) + case (_: TimeAttribute, _) => + None case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } case c: CaseClassTypeInfo[A] => - exprs.zipWithIndex.map { - case (UnresolvedFieldReference(name), idx) => (idx, name) + filteredExprs.zipWithIndex flatMap { + case (UnresolvedFieldReference(name), idx) => + Some((idx, name)) case (Alias(UnresolvedFieldReference(origName), name, _), _) => val idx = c.getFieldIndex(origName) if (idx < 0) { throw new TableException(s"$origName is not a field of type $c") } - (idx, name) + Some((idx, name)) + case (_: TimeAttribute, _) => + None case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } case p: PojoTypeInfo[A] => - exprs.map { + filteredExprs flatMap { case (UnresolvedFieldReference(name)) => val idx = p.getFieldIndex(name) if (idx < 0) { throw new TableException(s"$name is not a field of type 
$p") } - (idx, name) + Some((idx, name)) case Alias(UnresolvedFieldReference(origName), name, _) => val idx = p.getFieldIndex(origName) if (idx < 0) { throw new TableException(s"$origName is not a field of type $p") } - (idx, name) + Some((idx, name)) + case _: TimeAttribute => + None case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } @@ -795,6 +822,42 @@ abstract class TableEnvironment(val config: TableConfig) { Some(mapFunction) } + /** + * Checks for at most one rowtime and proctime attribute. + * Returns the time attributes. + * + * @return rowtime attribute and proctime attribute + */ + protected def validateAndExtractTimeAttributes( + fieldNames: Seq[String], + fieldIndices: Seq[Int], + exprs: Array[Expression]) + : (Option[(Int, String)], Option[(Int, String)]) = { + + var rowtime: Option[(Int, String)] = None + var proctime: Option[(Int, String)] = None + + exprs.zipWithIndex.foreach { + case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) => + if (rowtime.isDefined) { + throw new TableException( + "The rowtime attribute can only be defined once in a table schema.") + } else { + rowtime = Some(idx, name) + } + case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) => + if (proctime.isDefined) { + throw new TableException( + "The proctime attribute can only be defined once in a table schema.") + } else { + proctime = Some(idx, name) + } + case _ => + // do nothing + } + + (rowtime, proctime) + } } /** @@ -803,6 +866,10 @@ abstract class TableEnvironment(val config: TableConfig) { */ object TableEnvironment { + // default names that can be used in in TableSources etc. + val DEFAULT_ROWTIME_ATTRIBUTE = "rowtime" + val DEFAULT_PROCTIME_ATTRIBUTE = "proctime" + /** * Returns a [[JavaBatchTableEnv]] for a Java [[JavaBatchExecEnv]]. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index cc58ff5..6d15212 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -625,7 +625,7 @@ trait ImplicitExpressionOperations { */ def millis = milli - // row interval type + // Row interval type /** * Creates an interval of rows. @@ -634,6 +634,8 @@ trait ImplicitExpressionOperations { */ def rows = toRowInterval(expr) + // Advanced type helper functions + /** * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and * returns it's value. @@ -680,6 +682,20 @@ trait ImplicitExpressionOperations { * @return the first and only element of an array with a single element */ def element() = ArrayElement(expr) + + // Schema definition + + /** + * Declares a field as the rowtime attribute for indicating, accessing, and working in + * Flink's event time. + */ + def rowtime = RowtimeAttribute(expr) + + /** + * Declares a field as the proctime attribute for indicating, accessing, and working in + * Flink's processing time. 
+ */ + def proctime = ProctimeAttribute(expr) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala index 5e70440..d0430c2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala @@ -18,8 +18,8 @@ package org.apache.flink.table.api.scala +import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize} import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap} /** * Helper object for creating a tumbling window. 
Tumbling windows are consecutive, non-overlapping http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala index 87dde0a..dd8265b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala @@ -20,12 +20,12 @@ package org.apache.flink.table.api import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} +import org.apache.flink.table.calcite.FlinkRelBuilder +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference} -import org.apache.flink.table.plan.logical.Minus import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.plan.ProjectionTranslator._ -import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.plan.logical.{Minus, _} import org.apache.flink.table.sinks.TableSink import _root_.scala.collection.JavaConverters._ @@ -1015,13 +1015,7 @@ class WindowGroupedTable( val projectsOnAgg = replaceAggregationsAndProperties( fields, table.tableEnv, aggNames, propNames) - val projectFields = (table.tableEnv, window) match { - // event time can be arbitrary field in batch environment - case (_: BatchTableEnvironment, w: EventTimeWindow) => - extractFieldReferences(fields ++ groupKeys ++ Seq(w.timeField)) - case (_, _) => - extractFieldReferences(fields ++ 
groupKeys) - } + val projectFields = extractFieldReferences(fields ++ groupKeys :+ window.timeField) new Table(table.tableEnv, Project( http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala index 80260f7..11ef360 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala @@ -149,7 +149,7 @@ class OverWindowWithOrderBy( * A window specification. * * Window groups rows based on time or row-count intervals. It is a general way to group the - * elements, which is very helpful for both groupby-aggregations and over-aggregations to + * elements, which is very helpful for both groupBy-aggregations and over-aggregations to * compute aggregates on groups of elements. * * Infinite streaming tables can only be grouped into time or row intervals. Hence window grouping @@ -157,111 +157,73 @@ class OverWindowWithOrderBy( * * For finite batch tables, window provides shortcuts for time-based groupBy. * - * @param alias The expression of alias for this Window */ -abstract class Window(val alias: Expression) { +abstract class Window(val alias: Expression, val timeField: Expression) { /** * Converts an API class to a logical window for planning. */ private[flink] def toLogicalWindow: LogicalWindow + } +// ------------------------------------------------------------------------------------------------ +// Tumbling windows +// ------------------------------------------------------------------------------------------------ + /** - * A window specification without alias. + * Tumbling window. 
+ * + * For streaming tables you can specify grouping by a event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param size the size of the window either as time or row-count interval. */ -abstract class WindowWithoutAlias { +class TumbleWithSize(size: Expression) { /** - * Assigns an alias for this window that the following `groupBy()` and `select()` clause can - * refer to. `select()` statement can access window properties such as window start or end time. + * Tumbling window. * - * @param alias alias for this window - * @return this window - */ - def as(alias: Expression): Window - - /** - * Assigns an alias for this window that the following `groupBy()` and `select()` clause can - * refer to. `select()` statement can access window properties such as window start or end time. + * For streaming tables you can specify grouping by a event-time or processing-time attribute. * - * @param alias alias for this window - * @return this window + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param size the size of the window either as time or row-count interval. */ - def as(alias: String): Window = as(ExpressionParser.parseExpression(alias)) -} - -/** - * A predefined specification of window on processing-time - */ -abstract class ProcTimeWindowWithoutAlias extends WindowWithoutAlias { + def this(size: String) = this(ExpressionParser.parseExpression(size)) /** * Specifies the time attribute on which rows are grouped. * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * For streaming tables you can specify grouping by a event-time or processing-time attribute. * - * For batch tables, refer to a timestamp or long attribute. + * For batch tables you can specify grouping on a timestamp or long attribute. 
* - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a predefined window on event-time + * @param timeField time attribute for streaming and batch tables + * @return a tumbling window on event-time */ - def on(timeField: Expression): WindowWithoutAlias + def on(timeField: Expression): TumbleWithSizeOnTime = + new TumbleWithSizeOnTime(timeField, size) /** * Specifies the time attribute on which rows are grouped. * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * For streaming tables you can specify grouping by a event-time or processing-time attribute. * - * For batch tables, refer to a timestamp or long attribute. + * For batch tables you can specify grouping on a timestamp or long attribute. * - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a predefined window on event-time + * @param timeField time attribute for streaming and batch tables + * @return a tumbling window on event-time */ - def on(timeField: String): WindowWithoutAlias = + def on(timeField: String): TumbleWithSizeOnTime = on(ExpressionParser.parseExpression(timeField)) } /** - * A window operating on event-time. - * - * For streaming tables call on('rowtime) to specify grouping by event-time. - * Otherwise rows are grouped by processing-time. - * - * For batch tables, refer to a timestamp or long attribute. - * - * @param timeField time mode for streaming tables and time attribute for batch tables + * Tumbling window on time. */ -abstract class EventTimeWindow(alias: Expression, val timeField: Expression) extends Window(alias) - -// ------------------------------------------------------------------------------------------------ -// Tumbling windows -// ------------------------------------------------------------------------------------------------ - -/** - * A partial specification of a tumbling window. 
- * - * @param size the size of the window either a time or a row-count interval. - */ -class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias { - - def this(size: String) = this(ExpressionParser.parseExpression(size)) - - /** - * Specifies the time attribute on which rows are grouped. - * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. - * Otherwise rows are grouped by processing-time. - * - * For batch tables, refer to a timestamp or long attribute. - * - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a predefined window on event-time - */ - override def on(timeField: Expression): WindowWithoutAlias = - new TumbleWithoutAlias(timeField, size) +class TumbleWithSizeOnTime(time: Expression, size: Expression) { /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can @@ -270,15 +232,9 @@ class TumbleWithSize(size: Expression) extends ProcTimeWindowWithoutAlias { * @param alias alias for this window * @return this window */ - override def as(alias: Expression) = new TumblingWindow(alias, size) -} - -/** - * A tumbling window on event-time without alias. - */ -class TumbleWithoutAlias( - time: Expression, - size: Expression) extends WindowWithoutAlias { + def as(alias: Expression): TumbleWithSizeOnTimeWithAlias = { + new TumbleWithSizeOnTimeWithAlias(alias, time, size) + } /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can @@ -287,31 +243,28 @@ class TumbleWithoutAlias( * @param alias alias for this window * @return this window */ - override def as(alias: Expression): Window = new TumblingEventTimeWindow(alias, time, size) -} - -/** - * Tumbling window on processing-time. - * - * @param alias the alias of the window. - * @param size the size of the window either a time or a row-count interval. 
- */ -class TumblingWindow(alias: Expression, size: Expression) extends Window(alias) { - - override private[flink] def toLogicalWindow: LogicalWindow = - ProcessingTimeTumblingGroupWindow(alias, size) + def as(alias: String): TumbleWithSizeOnTimeWithAlias = { + as(ExpressionParser.parseExpression(alias)) + } } /** - * Tumbling window on event-time. + * Tumbling window on time with alias. Fully specifies a window. */ -class TumblingEventTimeWindow( +class TumbleWithSizeOnTimeWithAlias( alias: Expression, - time: Expression, - size: Expression) extends EventTimeWindow(alias, time) { + timeField: Expression, + size: Expression) + extends Window( + alias, + timeField) { - override private[flink] def toLogicalWindow: LogicalWindow = - EventTimeTumblingGroupWindow(alias, time, size) + /** + * Converts an API class to a logical window for planning. + */ + override private[flink] def toLogicalWindow: LogicalWindow = { + TumblingGroupWindow(alias, timeField, size) + } } // ------------------------------------------------------------------------------------------------ @@ -319,16 +272,16 @@ class TumblingEventTimeWindow( // ------------------------------------------------------------------------------------------------ /** - * A partially specified sliding window. + * Partially specified sliding window. * - * @param size the size of the window either a time or a row-count interval. + * @param size the size of the window either as time or row-count interval. */ class SlideWithSize(size: Expression) { /** - * A partially specified sliding window. + * Partially specified sliding window. * - * @param size the size of the window either a time or a row-count interval. + * @param size the size of the window either as time or row-count interval. */ def this(size: String) = this(ExpressionParser.parseExpression(size)) @@ -343,9 +296,9 @@ class SlideWithSize(size: Expression) { * windows. * * @param slide the slide of the window either as time or row-count interval. 
- * @return a predefined sliding window. + * @return a partially specified sliding window */ - def every(slide: Expression): SlideWithSlide = new SlideWithSlide(size, slide) + def every(slide: Expression): SlideWithSizeAndSlide = new SlideWithSizeAndSlide(size, slide) /** * Specifies the window's slide as time or row-count interval. @@ -358,48 +311,54 @@ class SlideWithSize(size: Expression) { * windows. * * @param slide the slide of the window either as time or row-count interval. - * @return a predefined sliding window. + * @return a partially specified sliding window */ - def every(slide: String): WindowWithoutAlias = every(ExpressionParser.parseExpression(slide)) + def every(slide: String): SlideWithSizeAndSlide = every(ExpressionParser.parseExpression(slide)) } /** - * A partially defined sliding window. + * Partially specified sliding window. + * + * For streaming tables you can specify grouping by an event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param size the size of the window either as time or row-count interval. + * @param slide the slide of the window either as time or row-count interval. */ -class SlideWithSlide( - size: Expression, - slide: Expression) extends ProcTimeWindowWithoutAlias { +class SlideWithSizeAndSlide(size: Expression, slide: Expression) { + /** * Specifies the time attribute on which rows are grouped. * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * For streaming tables you can specify grouping by an event-time or processing-time attribute. * - * For batch tables, refer to a timestamp or long attribute. + * For batch tables you can specify grouping on a timestamp or long attribute. * - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return a predefined Sliding window on event-time.
+ * @param timeField time attribute for streaming and batch tables + * @return a sliding window on time */ - override def on(timeField: Expression): SlideWithoutAlias = - new SlideWithoutAlias(timeField, size, slide) + def on(timeField: Expression): SlideWithSizeAndSlideOnTime = + new SlideWithSizeAndSlideOnTime(timeField, size, slide) /** - * Assigns an alias for this window that the following `groupBy()` and `select()` clause can - * refer to. `select()` statement can access window properties such as window start or end time. + * Specifies the time attribute on which rows are grouped. * - * @param alias alias for this window - * @return this window + * For streaming tables you can specify grouping by an event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param timeField time attribute for streaming and batch tables + * @return a sliding window on time */ - override def as(alias: Expression): Window = new SlidingWindow(alias, size, slide) + def on(timeField: String): SlideWithSizeAndSlideOnTime = + on(ExpressionParser.parseExpression(timeField)) } /** - * A partially defined sliding window on event-time without alias. + * Sliding window on time. */ -class SlideWithoutAlias( - timeField: Expression, - size: Expression, - slide: Expression) extends WindowWithoutAlias { +class SlideWithSizeAndSlideOnTime(timeField: Expression, size: Expression, slide: Expression) { + /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can * refer to. `select()` statement can access window properties such as window start or end time.
@@ -407,39 +366,40 @@ class SlideWithoutAlias( * @param alias alias for this window * @return this window */ - override def as(alias: Expression): Window = - new SlidingEventTimeWindow(alias, timeField, size, slide) -} + def as(alias: Expression): SlideWithSizeAndSlideOnTimeWithAlias = { + new SlideWithSizeAndSlideOnTimeWithAlias(alias, timeField, size, slide) + } -/** - * A sliding window on processing-time. - * - * @param alias the alias of the window. - * @param size the size of the window either a time or a row-count interval. - * @param slide the interval by which the window slides. - */ -class SlidingWindow( - alias: Expression, - size: Expression, - slide: Expression) - extends Window(alias) { - - override private[flink] def toLogicalWindow: LogicalWindow = - ProcessingTimeSlidingGroupWindow(alias, size, slide) + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. + * + * @param alias alias for this window + * @return this window + */ + def as(alias: String): SlideWithSizeAndSlideOnTimeWithAlias = { + as(ExpressionParser.parseExpression(alias)) + } } /** - * A sliding window on event-time. + * Sliding window on time with alias. Fully specifies a window. */ -class SlidingEventTimeWindow( +class SlideWithSizeAndSlideOnTimeWithAlias( alias: Expression, timeField: Expression, size: Expression, slide: Expression) - extends EventTimeWindow(alias, timeField) { + extends Window( + alias, + timeField) { - override private[flink] def toLogicalWindow: LogicalWindow = - EventTimeSlidingGroupWindow(alias, timeField, size, slide) + /** + * Converts an API class to a logical window for planning. 
+ */ + override private[flink] def toLogicalWindow: LogicalWindow = { + SlidingGroupWindow(alias, timeField, size, slide) + } } // ------------------------------------------------------------------------------------------------ @@ -447,42 +407,59 @@ class SlidingEventTimeWindow( // ------------------------------------------------------------------------------------------------ /** - * A partially defined session window. + * Partially specified session window. + * + * For streaming tables you can specify grouping by an event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param gap the time interval of inactivity before a window is closed. */ -class SessionWithGap(gap: Expression) extends ProcTimeWindowWithoutAlias { +class SessionWithGap(gap: Expression) { + /** + * Partially specified session window. + * + * For streaming tables you can specify grouping by an event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param gap the time interval of inactivity before a window is closed. + */ def this(gap: String) = this(ExpressionParser.parseExpression(gap)) /** * Specifies the time attribute on which rows are grouped. * - * For streaming tables call [[on('rowtime)]] to specify grouping by event-time. Otherwise rows - * are grouped by processing-time. + * For streaming tables you can specify grouping by an event-time or processing-time attribute. * - * For batch tables, refer to a timestamp or long attribute. + * For batch tables you can specify grouping on a timestamp or long attribute.
* - * @param timeField time mode for streaming tables and time attribute for batch tables - * @return an on event-time session window on event-time + * @param timeField time attribute for streaming and batch tables + * @return a session window on time */ - override def on(timeField: Expression): SessionWithoutAlias = - new SessionWithoutAlias(timeField, gap) + def on(timeField: Expression): SessionWithGapOnTime = + new SessionWithGapOnTime(timeField, gap) /** - * Assigns an alias for this window that the following `groupBy()` and `select()` clause can - * refer to. `select()` statement can access window properties such as window start or end time. + * Specifies the time attribute on which rows are grouped. * - * @param alias alias for this window - * @return this window + * For streaming tables you can specify grouping by an event-time or processing-time attribute. + * + * For batch tables you can specify grouping on a timestamp or long attribute. + * + * @param timeField time attribute for streaming and batch tables + * @return a session window on time */ - override def as(alias: Expression): Window = new SessionWindow(alias, gap) + def on(timeField: String): SessionWithGapOnTime = + on(ExpressionParser.parseExpression(timeField)) } /** - * A partially defined session window on event-time without alias. + * Session window on time. */ -class SessionWithoutAlias( - timeField: Expression, - gap: Expression) extends WindowWithoutAlias { +class SessionWithGapOnTime(timeField: Expression, gap: Expression) { + /** * Assigns an alias for this window that the following `groupBy()` and `select()` clause can * refer to. `select()` statement can access window properties such as window start or end time. @@ -490,29 +467,37 @@ class SessionWithoutAlias( * @param alias alias for this window * @return this window */ - override def as(alias: Expression): Window = new SessionEventTimeWindow(alias, timeField, gap) -} - -/** - * A session window on processing-time.
- * - * @param gap the time interval of inactivity before a window is closed. - */ -class SessionWindow(alias: Expression, gap: Expression) extends Window(alias) { + def as(alias: Expression): SessionWithGapOnTimeWithAlias = { + new SessionWithGapOnTimeWithAlias(alias, timeField, gap) + } - override private[flink] def toLogicalWindow: LogicalWindow = - ProcessingTimeSessionGroupWindow(alias, gap) + /** + * Assigns an alias for this window that the following `groupBy()` and `select()` clause can + * refer to. `select()` statement can access window properties such as window start or end time. + * + * @param alias alias for this window + * @return this window + */ + def as(alias: String): SessionWithGapOnTimeWithAlias = { + as(ExpressionParser.parseExpression(alias)) + } } /** - * A session window on event-time. + * Session window on time with alias. Fully specifies a window. */ -class SessionEventTimeWindow( +class SessionWithGapOnTimeWithAlias( alias: Expression, timeField: Expression, gap: Expression) - extends EventTimeWindow(alias, timeField) { + extends Window( + alias, + timeField) { - override private[flink] def toLogicalWindow: LogicalWindow = - EventTimeSessionGroupWindow(alias, timeField, gap) + /** + * Converts an API class to a logical window for planning. 
+ */ + override private[flink] def toLogicalWindow: LogicalWindow = { + SessionGroupWindow(alias, timeField, gap) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala index b4a3c42..2bdf360 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkCalciteSqlValidator.scala @@ -21,8 +21,8 @@ package org.apache.flink.table.calcite import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.sql.validate.{SqlConformance, SqlValidatorImpl} -import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable} +import org.apache.calcite.sql._ +import org.apache.calcite.sql.validate.{SqlConformanceEnum, SqlValidatorImpl} /** * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]]. 
@@ -30,8 +30,12 @@ import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable} class FlinkCalciteSqlValidator( opTab: SqlOperatorTable, catalogReader: CalciteCatalogReader, - typeFactory: JavaTypeFactory) extends SqlValidatorImpl( - opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) { + factory: JavaTypeFactory) + extends SqlValidatorImpl( + opTab, + catalogReader, + factory, + SqlConformanceEnum.DEFAULT) { override def getLogicalSourceRowType( sourceRowType: RelDataType, http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index 09e3277..beb2436 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -107,7 +107,11 @@ class FlinkPlannerImpl( // we disable automatic flattening in order to let composite types pass without modification // we might enable it again once Calcite has better support for structured types // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) - root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) + + // TableEnvironment.optimize will execute the following + // root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) + // convert time indicators + // root = root.withRel(RelTimeIndicatorConverter.convert(root.rel, rexBuilder)) root } catch { case e: RelConversionException => throw TableException(e.getMessage) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 7762ff8..001011b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -20,25 +20,25 @@ package org.apache.flink.table.calcite import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} +import org.apache.calcite.rel.`type`._ import org.apache.calcite.sql.SqlIntervalQualifier -import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName} import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType} -import org.apache.flink.table.typeutils.TimeIntervalTypeInfo -import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName +import org.apache.flink.table.plan.schema._ +import 
org.apache.flink.table.typeutils.TypeCheckUtils.isSimple +import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo} import org.apache.flink.types.Row -import scala.collection.mutable import scala.collection.JavaConverters._ +import scala.collection.mutable /** * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] @@ -65,6 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp createSqlIntervalType( new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) + case TimeIndicatorTypeInfo.ROWTIME_INDICATOR => + createRowtimeIndicatorType() + + case TimeIndicatorTypeInfo.PROCTIME_INDICATOR => + createProctimeIndicatorType() + case _ => createSqlType(sqlType) } @@ -77,23 +83,75 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } /** + * Creates an indicator type for processing-time, but with properties similar to a SQL timestamp. + */ + def createProctimeIndicatorType(): RelDataType = { + val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + canonize( + new TimeIndicatorRelDataType( + getTypeSystem, + originalType.asInstanceOf[BasicSqlType], + isEventTime = false) + ) + } + + /** + * Creates an indicator type for event-time, but with properties similar to a SQL timestamp.
+ */ + def createRowtimeIndicatorType(): RelDataType = { + val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + canonize( + new TimeIndicatorRelDataType( + getTypeSystem, + originalType.asInstanceOf[BasicSqlType], + isEventTime = true) + ) + } + + /** * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory * * @param fieldNames field names * @param fieldTypes field types, every element is Flink's [[TypeInformation]] - * @return a struct type with the input fieldNames and input fieldTypes + * @param rowtime optional system field to indicate event-time; the index determines the index + * in the final record and might replace an existing field + * @param proctime optional system field to indicate processing-time; the index determines the + * index in the final record and might replace an existing field + * @return a struct type with the input fieldNames, input fieldTypes, and system fields */ - def buildRowDataType( - fieldNames: Array[String], - fieldTypes: Array[TypeInformation[_]]) + def buildLogicalRowType( + fieldNames: Seq[String], + fieldTypes: Seq[TypeInformation[_]], + rowtime: Option[(Int, String)], + proctime: Option[(Int, String)]) : RelDataType = { - val rowDataTypeBuilder = builder - fieldNames - .zip(fieldTypes) - .foreach { f => - rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true) + val logicalRowTypeBuilder = builder + + val fields = fieldNames.zip(fieldTypes) + + var totalNumberOfFields = fields.length + if (rowtime.isDefined) { + totalNumberOfFields += 1 + } + if (proctime.isDefined) { + totalNumberOfFields += 1 + } + + var addedTimeAttributes = 0 + for (i <- 0 until totalNumberOfFields) { + if (rowtime.isDefined && rowtime.get._1 == i) { + logicalRowTypeBuilder.add(rowtime.get._2, createRowtimeIndicatorType()) + addedTimeAttributes += 1 + } else if (proctime.isDefined && proctime.get._1 == i) { + logicalRowTypeBuilder.add(proctime.get._2, createProctimeIndicatorType()) + 
addedTimeAttributes += 1 + } else { + val field = fields(i - addedTimeAttributes) + logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true) } - rowDataTypeBuilder.build + } + + logicalRowTypeBuilder.build } override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = { @@ -178,6 +236,7 @@ object FlinkTypeFactory { /** * Converts a Calcite logical record into a Flink type information. */ + @deprecated("Use the RowSchema class instead because it handles both logical and physical rows.") def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = { // convert to type information val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => @@ -188,6 +247,36 @@ object FlinkTypeFactory { new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray) } + def isProctimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match { + case ti: TimeIndicatorRelDataType if !ti.isEventTime => true + case _ => false + } + + def isProctimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match { + case ti: TimeIndicatorTypeInfo if !ti.isEventTime => true + case _ => false + } + + def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match { + case ti: TimeIndicatorRelDataType if ti.isEventTime => true + case _ => false + } + + def isRowtimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match { + case ti: TimeIndicatorTypeInfo if ti.isEventTime => true + case _ => false + } + + def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match { + case ti: TimeIndicatorRelDataType => true + case _ => false + } + + def isTimeIndicatorType(typeInfo: TypeInformation[_]): Boolean = typeInfo match { + case ti: TimeIndicatorTypeInfo => true + case _ => false + } + def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { case BOOLEAN => BOOLEAN_TYPE_INFO case TINYINT => 
BYTE_TYPE_INFO @@ -199,6 +288,15 @@ object FlinkTypeFactory { case VARCHAR | CHAR => STRING_TYPE_INFO case DECIMAL => BIG_DEC_TYPE_INFO + // time indicators + case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] => + val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType] + if (indicator.isEventTime) { + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + } else { + TimeIndicatorTypeInfo.PROCTIME_INDICATOR + } + // temporal types case DATE => SqlTimeTypeInfo.DATE case TIME => SqlTimeTypeInfo.TIME http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala new file mode 100644 index 0000000..fa2e3ee --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.calcite + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl, RelRecordType, StructKind} +import org.apache.calcite.rel.logical._ +import org.apache.calcite.rel.{RelNode, RelShuttleImpl} +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory.isTimeIndicatorType +import org.apache.flink.table.functions.TimeMaterializationSqlFunction +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType + +import scala.collection.JavaConversions._ + +/** + * Traverses a [[RelNode]] tree and converts fields with [[TimeIndicatorRelDataType]] type. If a + * time attribute is accessed for a calculation, it will be materialized. Forwarding is allowed in + * some cases, but not all. + */ +class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttleImpl { + + override def visit(project: LogicalProject): RelNode = { + // visit children and update inputs + val updatedProject = super.visit(project).asInstanceOf[LogicalProject] + + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + updatedProject.getInput.getRowType.getFieldList.map(_.getType)) + val newProjects = updatedProject.getProjects.map(_.accept(materializer)) + + // copy project + updatedProject.copy( + updatedProject.getTraitSet, + updatedProject.getInput, + newProjects, + buildRowType(updatedProject.getRowType.getFieldNames, newProjects.map(_.getType)) + ) + } + + override def visit(filter: LogicalFilter): RelNode = { + // visit children and update inputs + val updatedFilter = super.visit(filter).asInstanceOf[LogicalFilter] + + // check if 
input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + updatedFilter.getInput.getRowType.getFieldList.map(_.getType)) + val newCondition = updatedFilter.getCondition.accept(materializer) + + // copy filter + updatedFilter.copy( + updatedFilter.getTraitSet, + updatedFilter.getInput, + newCondition + ) + } + + override def visit(union: LogicalUnion): RelNode = { + // visit children and update inputs + val updatedUnion = super.visit(union).asInstanceOf[LogicalUnion] + + // make sure that time indicator types match + val inputTypes = updatedUnion.getInputs.map(_.getRowType) + + val head = inputTypes.head.getFieldList.map(_.getType) + + val isValid = inputTypes.forall { t => + val fieldTypes = t.getFieldList.map(_.getType) + + fieldTypes.zip(head).forall { case (l, r) => + // check if time indicators match + if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { + val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime + leftTime == rightTime + } + // one side is not an indicator + else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { + false + } + // uninteresting types + else { + true + } + } + } + + if (!isValid) { + throw new ValidationException( + "Union fields with time attributes have different types.") + } + + updatedUnion + } + + override def visit(other: RelNode): RelNode = other match { + case scan: LogicalTableFunctionScan if + stack.size() > 0 && stack.peek().isInstanceOf[LogicalCorrelate] => + // visit children and update inputs + val updatedScan = super.visit(scan).asInstanceOf[LogicalTableFunctionScan] + + val correlate = stack.peek().asInstanceOf[LogicalCorrelate] + + // check if input field contains time indicator type + // materialize field if no time indicator is present 
anymore + // if input field is already materialized, change to timestamp type + val materializer = new RexTimeIndicatorMaterializer( + rexBuilder, + correlate.getInputs.get(0).getRowType.getFieldList.map(_.getType)) + val newCall = updatedScan.getCall.accept(materializer) + + // copy scan + updatedScan.copy( + updatedScan.getTraitSet, + updatedScan.getInputs, + newCall, + updatedScan.getElementType, + updatedScan.getRowType, + updatedScan.getColumnMappings + ) + + case _ => + super.visit(other) + } + + private def buildRowType(names: Seq[String], types: Seq[RelDataType]): RelDataType = { + val fields = names.zipWithIndex.map { case (name, idx) => + new RelDataTypeFieldImpl(name, idx, types(idx)) + } + new RelRecordType(StructKind.FULLY_QUALIFIED, fields) + } +} + +class RexTimeIndicatorMaterializer( + private val rexBuilder: RexBuilder, + private val input: Seq[RelDataType]) + extends RexShuttle { + + val timestamp = rexBuilder + .getTypeFactory + .asInstanceOf[FlinkTypeFactory] + .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP) + + override def visitInputRef(inputRef: RexInputRef): RexNode = { + // reference is interesting + if (isTimeIndicatorType(inputRef.getType)) { + val resolvedRefType = input(inputRef.getIndex) + // input is a valid time indicator + if (isTimeIndicatorType(resolvedRefType)) { + inputRef + } + // input has been materialized + else { + new RexInputRef(inputRef.getIndex, resolvedRefType) + } + } + // reference is a regular field + else { + super.visitInputRef(inputRef) + } + } + + override def visitCall(call: RexCall): RexNode = { + val updatedCall = super.visitCall(call).asInstanceOf[RexCall] + + // skip materialization for special operators + updatedCall.getOperator match { + case SqlStdOperatorTable.SESSION | SqlStdOperatorTable.HOP | SqlStdOperatorTable.TUMBLE => + return updatedCall + + case _ => // do nothing + } + + // materialize operands with time indicators + val materializedOperands = updatedCall.getOperands.map { o => + if 
(isTimeIndicatorType(o.getType)) { + rexBuilder.makeCall(TimeMaterializationSqlFunction, o) + } else { + o + } + } + + // remove time indicator return type + if (isTimeIndicatorType(updatedCall.getType)) { + updatedCall.clone(timestamp, materializedOperands) + } else { + updatedCall.clone(updatedCall.getType, materializedOperands) + } + } +} + +object RelTimeIndicatorConverter { + + def convert(rootRel: RelNode, rexBuilder: RexBuilder): RelNode = { + val converter = new RelTimeIndicatorConverter(rexBuilder) + rootRel.accept(converter) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 5bb3b0e..25addbc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -23,7 +23,6 @@ import java.lang.{Iterable => JIterable} import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.avatica.util.DateTimeUtils -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.`type`.SqlTypeName._ @@ -42,8 +41,8 @@ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} import org.apache.flink.table.codegen.Indenter.toISC import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ -import org.apache.flink.table.functions.{AggregateFunction, FunctionContext, UserDefinedFunction} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import 
org.apache.flink.table.functions.{AggregateFunction, FunctionContext, TimeMaterializationSqlFunction, UserDefinedFunction} import org.apache.flink.table.runtime.TableFunctionCollector import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.types.Row @@ -59,19 +58,18 @@ import scala.collection.mutable * @param nullableInput input(s) can be null. * @param input1 type information about the first input of the Function * @param input2 type information about the second input if the Function is binary - * @param input1PojoFieldMapping additional mapping information if input1 is a POJO (POJO types - * have no deterministic field order). - * @param input2PojoFieldMapping additional mapping information if input2 is a POJO (POJO types - * have no deterministic field order). - * + * @param input1FieldMapping additional mapping information for input1 + * (e.g. POJO types have no deterministic field order and some input fields might not be read) + * @param input2FieldMapping additional mapping information for input2 + * (e.g. 
POJO types have no deterministic field order and some input fields might not be read) */ class CodeGenerator( - config: TableConfig, - nullableInput: Boolean, - input1: TypeInformation[_ <: Any], - input2: Option[TypeInformation[_ <: Any]] = None, - input1PojoFieldMapping: Option[Array[Int]] = None, - input2PojoFieldMapping: Option[Array[Int]] = None) + config: TableConfig, + nullableInput: Boolean, + input1: TypeInformation[_ <: Any], + input2: Option[TypeInformation[_ <: Any]] = None, + input1FieldMapping: Option[Array[Int]] = None, + input2FieldMapping: Option[Array[Int]] = None) extends RexVisitor[GeneratedExpression] { // check if nullCheck is enabled when inputs can be null @@ -82,7 +80,7 @@ class CodeGenerator( // check for POJO input1 mapping input1 match { case pt: PojoTypeInfo[_] => - input1PojoFieldMapping.getOrElse( + input1FieldMapping.getOrElse( throw new CodeGenException("No input mapping is specified for input1 of type POJO.")) case _ => // ok } @@ -90,11 +88,24 @@ class CodeGenerator( // check for POJO input2 mapping input2 match { case Some(pt: PojoTypeInfo[_]) => - input2PojoFieldMapping.getOrElse( + input2FieldMapping.getOrElse( throw new CodeGenException("No input mapping is specified for input2 of type POJO.")) case _ => // ok } + private val input1Mapping = input1FieldMapping match { + case Some(mapping) => mapping + case _ => (0 until input1.getArity).toArray + } + + private val input2Mapping = input2FieldMapping match { + case Some(mapping) => mapping + case _ => input2 match { + case Some(input) => (0 until input.getArity).toArray + case _ => Array[Int]() + } + } + /** * A code generator for generating unary Flink * [[org.apache.flink.api.common.functions.Function]]s with one input. @@ -102,15 +113,15 @@ class CodeGenerator( * @param config configuration that determines runtime behavior * @param nullableInput input(s) can be null. 
* @param input type information about the input of the Function - * @param inputPojoFieldMapping additional mapping information necessary if input is a - * POJO (POJO types have no deterministic field order). + * @param inputFieldMapping additional mapping information necessary for input + * (e.g. POJO types have no deterministic field order and some input fields might not be read) */ def this( config: TableConfig, nullableInput: Boolean, input: TypeInformation[Any], - inputPojoFieldMapping: Array[Int]) = - this(config, nullableInput, input, None, Some(inputPojoFieldMapping)) + inputFieldMapping: Array[Int]) = + this(config, nullableInput, input, None, Some(inputFieldMapping)) /** * A code generator for generating Flink input formats. @@ -249,7 +260,7 @@ class CodeGenerator( * @param name Class name of the function. * Does not need to be unique but has to be a valid Java class identifier. * @param generator The code generator instance - * @param inputType Input row type + * @param physicalInputTypes Physical input row types * @param aggregates All aggregate functions * @param aggFields Indexes of the input fields for all aggregate functions * @param aggMapping The mapping of aggregates to output fields @@ -270,7 +281,7 @@ class CodeGenerator( def generateAggregations( name: String, generator: CodeGenerator, - inputType: RelDataType, + physicalInputTypes: Seq[TypeInformation[_]], aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]], aggFields: Array[Array[Int]], aggMapping: Array[Int], @@ -295,8 +306,7 @@ class CodeGenerator( val accTypes = accTypeClasses.map(_.getCanonicalName) // get java classes of input fields - val javaClasses = inputType.getFieldList - .map(f => FlinkTypeFactory.toTypeInfo(f.getType).getTypeClass) + val javaClasses = physicalInputTypes.map(t => t.getTypeClass) // get parameter lists for aggregation functions val parameters = aggFields.map { inFields => val fields = for (f <- inFields) yield @@ -844,12 +854,12 @@ class CodeGenerator( 
returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { - val input1AccessExprs = for (i <- 0 until input1.getArity) - yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping) + val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) + yield generateInputAccess(input1, input1Term, i, input1Mapping) val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity) - yield generateInputAccess(ti, input2Term, i, input2PojoFieldMapping) + case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) + yield generateInputAccess(ti, input2Term, i, input2Mapping) case None => Seq() // add nothing } @@ -861,14 +871,14 @@ class CodeGenerator( */ def generateCorrelateAccessExprs: (Seq[GeneratedExpression], Seq[GeneratedExpression]) = { val input1AccessExprs = for (i <- 0 until input1.getArity) - yield generateInputAccess(input1, input1Term, i, input1PojoFieldMapping) + yield generateInputAccess(input1, input1Term, i, input1Mapping) val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity) + case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) // use generateFieldAccess instead of generateInputAccess to avoid the generated table // function's field access code is put on the top of function body rather than // the while loop - yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping) + yield generateFieldAccess(ti, input2Term, i, input2Mapping) case None => throw new CodeGenException("Type information of input2 must not be null.") } (input1AccessExprs, input2AccessExprs) @@ -1123,11 +1133,11 @@ class CodeGenerator( override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = { // if inputRef index is within size of input1 we work with input1, input2 otherwise val input = if (inputRef.getIndex < input1.getArity) { - (input1, input1Term, input1PojoFieldMapping) + (input1, 
input1Term, input1Mapping) } else { (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term, - input2PojoFieldMapping) + input2Mapping) } val index = if (input._2 == input1Term) { @@ -1146,7 +1156,7 @@ class CodeGenerator( refExpr.resultType, refExpr.resultTerm, index, - input1PojoFieldMapping) + input1Mapping) val resultTerm = newName("result") val nullTerm = newName("isNull") @@ -1302,6 +1312,11 @@ class CodeGenerator( throw new CodeGenException("Dynamic parameter references are not supported yet.") override def visitCall(call: RexCall): GeneratedExpression = { + // time materialization is not implemented yet + if (call.getOperator == TimeMaterializationSqlFunction) { + throw new CodeGenException("Access to time attributes is not possible yet.") + } + val operands = call.getOperands.map(_.accept(this)) val resultType = FlinkTypeFactory.toTypeInfo(call.getType) @@ -1546,7 +1561,7 @@ class CodeGenerator( inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, - pojoFieldMapping: Option[Array[Int]]) + fieldMapping: Array[Int]) : GeneratedExpression = { // if input has been used before, we can reuse the code that // has already been generated @@ -1558,9 +1573,9 @@ class CodeGenerator( // generate input access and unboxing if necessary case None => val expr = if (nullableInput) { - generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping) + generateNullableInputFieldAccess(inputType, inputTerm, index, fieldMapping) } else { - generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping) + generateFieldAccess(inputType, inputTerm, index, fieldMapping) } reusableInputUnboxingExprs((inputTerm, index)) = expr @@ -1574,7 +1589,7 @@ class CodeGenerator( inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, - pojoFieldMapping: Option[Array[Int]]) + fieldMapping: Array[Int]) : GeneratedExpression = { val resultTerm = newName("result") val nullTerm = newName("isNull") @@ -1582,7 +1597,7 @@ class 
CodeGenerator( val fieldType = inputType match { case ct: CompositeType[_] => val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) { - pojoFieldMapping.get(index) + fieldMapping(index) } else { index @@ -1593,7 +1608,7 @@ class CodeGenerator( } val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType) val defaultValue = primitiveDefaultValue(fieldType) - val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping) + val fieldAccessExpr = generateFieldAccess(inputType, inputTerm, index, fieldMapping) val inputCheckCode = s""" @@ -1617,12 +1632,12 @@ class CodeGenerator( inputType: TypeInformation[_], inputTerm: String, index: Int, - pojoFieldMapping: Option[Array[Int]]) + fieldMapping: Array[Int]) : GeneratedExpression = { inputType match { case ct: CompositeType[_] => - val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && pojoFieldMapping.nonEmpty) { - pojoFieldMapping.get(index) + val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) { + fieldMapping(index) } else { index http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index d9f394b..64280c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.GenericTypeInfo import 
org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression} -import org.apache.flink.table.functions.{EventTimeExtractor, ProcTimeExtractor} import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} import scala.collection.mutable @@ -496,15 +495,6 @@ object FunctionGenerator { ) ) - // generate a constant for time indicator functions. - // this is a temporary solution and will be removed when FLINK-5884 is implemented. - case ProcTimeExtractor | EventTimeExtractor => - Some(new CallGenerator { - override def generate(codeGenerator: CodeGenerator, operands: Seq[GeneratedExpression]) = { - GeneratedExpression("0L", "false", "", SqlTimeTypeInfo.TIMESTAMP) - } - }) - // built-in scalar function case _ => sqlFunctions.get((sqlOperator, operandTypes)) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala index 4b5781f..08abc8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala @@ -28,10 +28,49 @@ import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime} object ExpressionUtils { + private[flink] def 
isTimeIntervalLiteral(expr: Expression): Boolean = expr match { + case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true + case _ => false + } + + private[flink] def isRowCountLiteral(expr: Expression): Boolean = expr match { + case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) => true + case _ => false + } + + private[flink] def isTimeAttribute(expr: Expression): Boolean = expr match { + case r: ResolvedFieldReference if FlinkTypeFactory.isTimeIndicatorType(r.resultType) => true + case _ => false + } + + private[flink] def isRowtimeAttribute(expr: Expression): Boolean = expr match { + case r: ResolvedFieldReference if FlinkTypeFactory.isRowtimeIndicatorType(r.resultType) => true + case _ => false + } + + private[flink] def isProctimeAttribute(expr: Expression): Boolean = expr match { + case r: ResolvedFieldReference if FlinkTypeFactory.isProctimeIndicatorType(r.resultType) => + true + case _ => false + } + + private[flink] def toTime(expr: Expression): FlinkTime = expr match { + case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => + FlinkTime.milliseconds(value) + case _ => throw new IllegalArgumentException() + } + + private[flink] def toLong(expr: Expression): Long = expr match { + case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value + case _ => throw new IllegalArgumentException() + } + private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match { case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) => Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala index 
5f7204a..13f8a11 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala @@ -110,18 +110,11 @@ case class OverCall( val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava // assemble order by key - val orderKey = orderBy match { - case _: RowTime => - new RexFieldCollation(relBuilder.call(EventTimeExtractor), Set[SqlKind]().asJava) - case _: ProcTime => - new RexFieldCollation(relBuilder.call(ProcTimeExtractor), Set[SqlKind]().asJava) - case _ => - throw new ValidationException("Invalid OrderBy expression.") - } + val orderKey = new RexFieldCollation(orderBy.toRexNode, Set[SqlKind]().asJava) val orderKeys = ImmutableList.of(orderKey) // assemble partition by keys - val partitionKeys = partitionBy.map(_.toRexNode(relBuilder)).asJava + val partitionKeys = partitionBy.map(_.toRexNode).asJava // assemble bounds val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo] @@ -249,6 +242,11 @@ case class OverCall( return ValidationFailure("Preceding and following must be of same interval type.") } + // check time field + if (!ExpressionUtils.isTimeAttribute(orderBy)) { + return ValidationFailure("Ordering must be defined on a time attribute.") + } + ValidationSuccess } }