[FLINK-5884] [table] Integrate time indicators for Table API & SQL. Continued
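For context, this commit continues wiring event-time (rowtime) and processing-time (proctime) indicators into the Table API schema handling. A minimal usage sketch of the feature follows; the environment, stream, and field names are illustrative only and not part of this diff:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    object TimeIndicatorExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
        val stream = env.fromElements((1000L, 1, "a"), (2000L, 2, "b"))
        // 'rt.rowtime replaces the Long field with an event-time attribute;
        // 'pt.proctime can only be appended at the end of the schema.
        val table = tEnv.fromDataStream(stream, 'rt.rowtime, 'b, 'c, 'pt.proctime)
      }
    }

For a real job, timestamps and watermarks would still have to be assigned on the stream before the rowtime attribute is meaningful.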
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24bf61ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24bf61ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24bf61ce Branch: refs/heads/master Commit: 24bf61ceb332f2db2dc4bab624b73beffae1160a Parents: 495f104 Author: Fabian Hueske <fhue...@apache.org> Authored: Thu May 4 18:05:27 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat May 6 01:51:54 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 25 +-- .../table/api/StreamTableEnvironment.scala | 86 ++++++++-- .../flink/table/api/TableEnvironment.scala | 64 +------- .../flink/table/calcite/FlinkTypeFactory.scala | 17 +- .../table/expressions/ExpressionParser.scala | 18 +- .../table/plan/logical/LogicalWindow.scala | 2 +- .../flink/table/plan/logical/groupWindows.scala | 11 +- .../flink/table/plan/nodes/CommonCalc.scala | 10 +- .../plan/nodes/PhysicalTableSourceScan.scala | 2 +- .../plan/nodes/dataset/DataSetAggregate.scala | 2 +- .../table/plan/nodes/dataset/DataSetCalc.scala | 5 +- .../datastream/DataStreamOverAggregate.scala | 5 +- .../datastream/StreamTableSourceScan.scala | 28 +--- .../logical/FlinkLogicalTableSourceScan.scala | 32 +++- .../plan/schema/StreamTableSourceTable.scala | 65 ++++++++ .../table/runtime/aggregate/AggregateUtil.scala | 3 +- .../table/sources/DefinedTimeAttributes.scala | 47 ++++-- .../flink/table/TableEnvironmentTest.scala | 52 ++---- .../api/scala/batch/table/GroupWindowTest.scala | 4 +- .../stream/StreamTableEnvironmentTest.scala | 164 +++++++++++++++++++ .../api/scala/stream/TableSourceTest.scala | 154 +++++++++++++++++ .../scala/stream/sql/WindowAggregateTest.scala | 59 ++++--- 22 files changed, 628 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 3eb2ffc..02c6063 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.explain.PlanJsonParser -import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets @@ -196,26 +196,11 @@ abstract class BatchTableEnvironment( val (fieldNames, fieldIndexes) = getFieldInfo[T]( dataSet.getType, - fields, - ignoreTimeAttributes = true) + fields) - // validate and extract time attributes - val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields) - - // don't allow proctime on batch - 
proctime match { - case Some(_) => - throw new ValidationException( - "A proctime attribute is not allowed in a batch environment. " + - "Working with processing-time on batch would lead to non-deterministic results.") - case _ => // ok - } - // rowtime must not extend the schema of a batch table - rowtime match { - case Some((idx, _)) if idx >= dataSet.getType.getArity => - throw new ValidationException( - "A rowtime attribute must be defined on an existing field in a batch environment.") - case _ => // ok + if (fields.exists(_.isInstanceOf[TimeAttribute])) { + throw new ValidationException( + ".rowtime and .proctime time indicators are not allowed in a batch environment.") } val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames) http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index d1f2fb5..dd2c09d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -26,19 +26,21 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{RuleSet, RuleSets} -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.explain.PlanJsonParser -import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.datastream.DataStreamRel import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable} +import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.table.sources.{StreamTableSource, TableSource} +import org.apache.flink.table.typeutils.TypeCheckUtils import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -99,7 +101,7 @@ abstract class StreamTableEnvironment( tableSource match { case streamTableSource: StreamTableSource[_] => - registerTableInternal(name, new TableSourceTable(streamTableSource)) + registerTableInternal(name, new StreamTableSourceTable(streamTableSource)) case _ => throw new TableException("Only StreamTableSource can be registered in " + "StreamTableEnvironment") @@ -168,14 +170,13 @@ abstract class StreamTableEnvironment( fields: Array[Expression]) : Unit = { - val (fieldNames, fieldIndexes) = getFieldInfo[T]( - dataStream.getType, - fields, - 
ignoreTimeAttributes = false) + val streamType = dataStream.getType - // validate and extract time attributes - val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields) + // get field names and types for all non-replaced fields + val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields) + // validate and extract time attributes + val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields) val dataStreamTable = new DataStreamTable[T]( dataStream, @@ -188,6 +189,71 @@ } /** + * Checks for at most one rowtime and proctime attribute. + * Returns the time attributes. + * + * @return rowtime attribute and proctime attribute + */ + private def validateAndExtractTimeAttributes( + streamType: TypeInformation[_], + exprs: Array[Expression]) + : (Option[(Int, String)], Option[(Int, String)]) = { + + val fieldTypes: Array[TypeInformation[_]] = streamType match { + case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray + case a: AtomicType[_] => Array(a) + } + + var fieldNames: List[String] = Nil + var rowtime: Option[(Int, String)] = None + var proctime: Option[(Int, String)] = None + + exprs.zipWithIndex.foreach { + case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) => + if (rowtime.isDefined) { + throw new TableException( + "The rowtime attribute can only be defined once in a table schema.") + } else { + // check type of field that is replaced + if (idx < fieldTypes.length && + !(TypeCheckUtils.isLong(fieldTypes(idx)) || + TypeCheckUtils.isTimePoint(fieldTypes(idx)))) { + throw new TableException( + "The rowtime attribute can only replace a field with a valid time type, such as " + + "Timestamp or Long.") + } + rowtime = Some(idx, name) + } + case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) => + if (proctime.isDefined) { + throw new TableException( + "The proctime attribute can only be defined once in a table schema.") + } else { + // check that proctime is only appended + if (idx < fieldTypes.length) { + throw new TableException( + "The proctime attribute can only be appended to the table schema and not replace " + + "an existing field. Please move it to the end of the schema.") + } + proctime = Some(idx, name) + } + case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames + } + + if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) { + throw new TableException( + "The rowtime attribute may not have the same name as another field.") + } + + if (proctime.isDefined && fieldNames.contains(proctime.get._2)) { + throw new TableException( + "The proctime attribute may not have the same name as another field.") + } + + (rowtime, proctime) + } + + /** * Returns the decoration rule set for this environment * including a custom RuleSet configuration. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 4c72e8f..9ed5000 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -601,50 +601,36 @@ abstract class TableEnvironment(val config: TableConfig) { /** * Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of - * [[Expression]]. It does not handle time attributes but considers them in indices, if - * ignore flag is not false. + * [[Expression]]. It does not handle time attributes but considers them in indices. * * @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated. - * @param exprs The expressions that define the field names. - * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions. + * @param exprs The expressions that define the field names. * @tparam A The type of the TypeInformation. * @return A tuple of two arrays holding the field names and corresponding field positions. */ protected[flink] def getFieldInfo[A]( inputType: TypeInformation[A], - exprs: Array[Expression], - ignoreTimeAttributes: Boolean) + exprs: Array[Expression]) : (Array[String], Array[Int]) = { TableEnvironment.validateType(inputType) - val filteredExprs = if (ignoreTimeAttributes) { - exprs.map { - case ta: TimeAttribute => ta.expression - case e@_ => e - } - } else { - exprs - } - val indexedNames: Array[(Int, String)] = inputType match { case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] => throw new TableException( "An input of GenericTypeInfo<Row> cannot be converted to Table. 
" + "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[A] => - filteredExprs.zipWithIndex flatMap { + exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => if (idx > 0) { throw new TableException("Table of atomic type can only have a single field.") } Some((0, name)) - case (_: TimeAttribute, _) if ignoreTimeAttributes => - None case _ => throw new TableException("Field reference expression requested.") } case t: TupleTypeInfo[A] => - filteredExprs.zipWithIndex flatMap { + exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => Some((idx, name)) case (Alias(UnresolvedFieldReference(origName), name, _), _) => @@ -659,7 +645,7 @@ abstract class TableEnvironment(val config: TableConfig) { "Field reference expression or alias on field expression expected.") } case c: CaseClassTypeInfo[A] => - filteredExprs.zipWithIndex flatMap { + exprs.zipWithIndex flatMap { case (UnresolvedFieldReference(name), idx) => Some((idx, name)) case (Alias(UnresolvedFieldReference(origName), name, _), _) => @@ -674,7 +660,7 @@ abstract class TableEnvironment(val config: TableConfig) { "Field reference expression or alias on field expression expected.") } case p: PojoTypeInfo[A] => - filteredExprs flatMap { + exprs flatMap { case (UnresolvedFieldReference(name)) => val idx = p.getFieldIndex(name) if (idx < 0) { @@ -822,42 +808,6 @@ abstract class TableEnvironment(val config: TableConfig) { Some(mapFunction) } - /** - * Checks for at most one rowtime and proctime attribute. - * Returns the time attributes. - * - * @return rowtime attribute and proctime attribute - */ - protected def validateAndExtractTimeAttributes( - fieldNames: Seq[String], - fieldIndices: Seq[Int], - exprs: Array[Expression]) - : (Option[(Int, String)], Option[(Int, String)]) = { - - var rowtime: Option[(Int, String)] = None - var proctime: Option[(Int, String)] = None - - exprs.zipWithIndex.foreach { - case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) => - if (rowtime.isDefined) { - throw new TableException( - "The rowtime attribute can only be defined once in a table schema.") - } else { - rowtime = Some(idx, name) - } - case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) => - if (proctime.isDefined) { - throw new TableException( - "The proctime attribute can only be defined once in a table schema.") - } else { - proctime = Some(idx, name) - } - case _ => - // do nothing - } - - (rowtime, proctime) - } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 001011b..9281ad8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -65,11 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp createSqlIntervalType( new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) - case TimeIndicatorTypeInfo.ROWTIME_INDICATOR => - createRowtimeIndicatorType() - - case TimeIndicatorTypeInfo.PROCTIME_INDICATOR => - createProctimeIndicatorType() + case 
TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] => + if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) { + createRowtimeIndicatorType() + } else { + createProctimeIndicatorType() + } case _ => createSqlType(sqlType) @@ -114,9 +115,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp * @param fieldNames field names * @param fieldTypes field types, every element is Flink's [[TypeInformation]] * @param rowtime optional system field to indicate event-time; the index determines the index - * in the final record and might replace an existing field + * in the final record. If the index is smaller than the number of specified + * fields, it shifts all following fields. * @param proctime optional system field to indicate processing-time; the index determines the - * index in the final record and might replace an existing field + * index in the final record. If the index is smaller than the number of + * specified fields, it shifts all following fields. * @return a struct type with the input fieldNames, input fieldTypes, and system fields */ def buildLogicalRowType( http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index c33f8fc..98580ba 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -98,11 +98,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row") lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range") lazy val ASIN: Keyword = Keyword("asin") + lazy val ROWTIME: Keyword = Keyword("rowtime") + lazy val PROCTIME: Keyword = Keyword("proctime") def functionIdent: ExpressionParser.Parser[String] = not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ not(SUM0) ~ not(STDDEV_POP) ~ not(STDDEV_SAMP) ~ not(VAR_POP) ~ not(VAR_SAMP) ~ not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~ + not(ROWTIME) ~ not(PROCTIME) ~ not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~> super.ident @@ -532,12 +535,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // alias - lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ { + lazy val alias: PackratParser[Expression] = timeIndicator | + logic ~ AS ~ fieldReference ^^ { case e ~ _ ~ name => Alias(e, name.name) } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ { case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name)) } | logic + // time indicators + + lazy val timeIndicator: PackratParser[Expression] = procTime | rowTime + + lazy val procTime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ { + case f ~ _ ~ _ => ProctimeAttribute(f) + } + + lazy val rowTime: PackratParser[Expression] = fieldReference ~ "." 
~ ROWTIME ^^ { + case f ~ _ ~ _ => RowtimeAttribute(f) + } + lazy val expression: PackratParser[Expression] = alias | failure("Invalid expression.") http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala index 92dc501..6161ef0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.expressions.{Expression, WindowReference} import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} /** - * Logical super class for all types of windows (group-windows and row-windows). + * Logical super class for group windows. * * @param aliasAttribute window alias * @param timeAttribute time field indicating event-time or processing-time http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala index 3e5de28..4a8fb52 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.logical import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral} import org.apache.flink.table.expressions._ -import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint +import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong} import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} // ------------------------------------------------------------------------------------------------ @@ -56,7 +56,8 @@ case class TumblingGroupWindow( case _: StreamTableEnvironment if !isTimeAttribute(timeField) => ValidationFailure( "Tumbling window expects a time attribute for grouping in a stream environment.") - case _: BatchTableEnvironment if isTimePoint(size.resultType) => + case _: BatchTableEnvironment + if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) => ValidationFailure( "Tumbling window expects a time attribute for grouping in a stream environment.") @@ -119,7 +120,8 @@ case class SlidingGroupWindow( case _: StreamTableEnvironment if !isTimeAttribute(timeField) => ValidationFailure( "Sliding window expects a time attribute for grouping in a stream environment.") - case _: BatchTableEnvironment if isTimePoint(size.resultType) => + case _: BatchTableEnvironment + if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) => ValidationFailure( "Sliding window expects a 
time attribute for grouping in a stream environment.") @@ -169,7 +171,8 @@ case class SessionGroupWindow( case _: StreamTableEnvironment if !isTimeAttribute(timeField) => ValidationFailure( "Session window expects a time attribute for grouping in a stream environment.") - case _: BatchTableEnvironment if isTimePoint(gap.resultType) => + case _: BatchTableEnvironment + if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) => ValidationFailure( "Session window expects a time attribute for grouping in a stream environment.") http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala index 5c35129..ff5ffb2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -34,11 +34,11 @@ import scala.collection.JavaConverters._ trait CommonCalc { private[flink] def functionBody( - generator: CodeGenerator, - inputSchema: RowSchema, - returnSchema: RowSchema, - calcProgram: RexProgram, - config: TableConfig) + generator: CodeGenerator, + inputSchema: RowSchema, + returnSchema: RowSchema, + calcProgram: RexProgram, + config: TableConfig) : String = { val expandedExpressions = calcProgram http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala index c18c3d1..dc7a0d6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala @@ -46,7 +46,7 @@ abstract class PhysicalTableSourceScan( override def explainTerms(pw: RelWriter): RelWriter = { val terms = super.explainTerms(pw) - .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", ")) + .item("fields", deriveRowType().getFieldNames.asScala.mkString(", ")) val sourceDesc = tableSource.explainSource() if (sourceDesc.nonEmpty) { http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala index c22dc54..b53081c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala @@ -90,8 +90,8 @@ class DataSetAggregate( override def translateToPlan(tableEnv: BatchTableEnvironment): 
DataSet[Row] = { - val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val input = inputNode.asInstanceOf[DataSetRel] + val inputDS = input.translateToPlan(tableEnv) val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala index 9e18082..5274fa1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala @@ -103,10 +103,7 @@ class DataSetCalc( body, rowTypeInfo) - val runner = new FlatMapRunner[Row, Row]( - genFunction.name, - genFunction.code, - genFunction.returnType) + val runner = calcMapFunction(genFunction) inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString)) } http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 8eb9d40..db31f32 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -241,10 +241,7 @@ class DataStreamOverAggregate( } val precedingOffset = - getLowerBoundary( - logicWindow, - overWindow, - input) + (if (isRowsClause) 1 else 0) + getLowerBoundary(logicWindow, overWindow, input) + (if (isRowsClause) 1 else 0) val processFunction = AggregateUtil.createBoundedOverProcessFunction( generator, http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 5dc3da8..e34e416 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable} -import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource} +import 
org.apache.flink.table.sources._ import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */ @@ -41,35 +41,23 @@ class StreamTableSourceScan( override def deriveRowType() = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - def removeIndex[T](idx: Int, l: List[T]): List[T] = { - if (l.size < idx) { - l - } else { - l.take(idx) ++ l.drop(idx + 1) - } - } + val fieldNames = TableEnvironment.getFieldNames(tableSource).toList + val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - var fieldNames = TableEnvironment.getFieldNames(tableSource).toList - var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList + val fieldCnt = fieldNames.length val rowtime = tableSource match { - case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null => + case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => val rowtimeAttribute = timeSource.getRowtimeAttribute - // remove physical field if it is overwritten by time attribute - fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames) - fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes) - Some((rowtimeAttribute.f0, rowtimeAttribute.f1)) + Some((fieldCnt, rowtimeAttribute)) case _ => None } val proctime = tableSource match { - case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null => + case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => val proctimeAttribute = timeSource.getProctimeAttribute - // remove physical field if it is overwritten by time attribute - fieldNames = removeIndex(proctimeAttribute.f0, fieldNames) - fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes) - Some((proctimeAttribute.f0, proctimeAttribute.f1)) + Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) case _ => None } http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index 53e7b31..a2777ec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.schema.TableSourceTable -import org.apache.flink.table.sources.TableSource +import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource} import scala.collection.JavaConverters._ @@ -47,11 +47,33 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + + val fieldNames = TableEnvironment.getFieldNames(tableSource).toList + val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList + + val fieldCnt = fieldNames.length + + val 
rowtime = tableSource match { + case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => + val rowtimeAttribute = timeSource.getRowtimeAttribute + Some((fieldCnt, rowtimeAttribute)) + case _ => + None + } + + val proctime = tableSource match { + case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => + val proctimeAttribute = timeSource.getProctimeAttribute + Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) + case _ => + None + } + flinkTypeFactory.buildLogicalRowType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType), - None, - None) + fieldNames, + fieldTypes, + rowtime, + proctime) } override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala new file mode 100644 index 0000000..75deca5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource} + +class StreamTableSourceTable[T]( + override val tableSource: TableSource[T], + override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + extends TableSourceTable[T](tableSource, statistic) { + + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + + val fieldNames = TableEnvironment.getFieldNames(tableSource).toList + val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList + + val fieldCnt = fieldNames.length + + val rowtime = tableSource match { + case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null => + val rowtimeAttribute = timeSource.getRowtimeAttribute + Some((fieldCnt, rowtimeAttribute)) + case _ => + None + } + + val proctime = tableSource match { + case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null => + val proctimeAttribute = timeSource.getProctimeAttribute + Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute)) + case _ => + None + } + + flinkTypeFactory.buildLogicalRowType( + fieldNames, + fieldTypes, + rowtime, + proctime) + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 07992cd..dfed34a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -81,7 +81,6 @@ object AggregateUtil { isRowsClause: Boolean) : ProcessFunction[Row, Row] = { - val needRetract = false val (aggFields, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), @@ -107,7 +106,7 @@ object AggregateUtil { None, None, outputArity, - needRetract, + needRetract = false, needMerge = false, needReset = false ) http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala index 8466cdf..6d87663 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala @@ -18,30 +18,43 @@ package org.apache.flink.table.sources -import org.apache.flink.api.java.tuple.Tuple2 - /** - * Defines logical time attributes for a [[TableSource]]. 
Time attributes can be used for - * indicating, accessing, and working with Flink's event-time or processing-time. A - * [[TableSource]] that implements this interface can define names and positions of rowtime - * and proctime attributes in the rows it produces. + * Defines a logical event-time attribute for a [[TableSource]]. + * The event-time attribute can be used for indicating, accessing, and working with Flink's + * event-time. + * + * A [[TableSource]] that implements this interface defines the name of + * the event-time attribute. The attribute will be added to the schema of the + * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]]. */ -trait DefinedTimeAttributes { +trait DefinedRowTimeAttribute { /** - * Defines a name and position (starting at 0) of rowtime attribute that represents Flink's - * event-time. Null if no rowtime should be available. If the position is within the arity of - * the result row, the logical attribute will overwrite the physical attribute. If the position - * is higher than the result row, the time attribute will be appended logically. + * Defines the name of the event-time attribute that represents Flink's + * event-time. Null if no rowtime should be available. + * + * The field will be appended to the schema provided by the [[TableSource]]. */ - def getRowtimeAttribute: Tuple2[Int, String] + def getRowtimeAttribute: String +} + +/** + * Defines a logical processing-time attribute for a [[TableSource]]. + * The processing-time attribute can be used for indicating, accessing, and working with Flink's + * processing-time. + * + * A [[TableSource]] that implements this interface defines the name of + * the processing-time attribute. The attribute will be added to the schema of the + * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]]. + */ +trait DefinedProcTimeAttribute { /** - * Defines a name and position (starting at 0) of proctime attribute that represents Flink's - * processing-time. Null if no proctime should be available. If the position is within the arity - * of the result row, the logical attribute will overwrite the physical attribute. If the - * position is higher than the result row, the time attribute will be appended logically. + * Defines the name of the processing-time attribute that represents Flink's + * processing-time. Null if no proctime should be available. + * + * The field will be appended to the schema provided by the [[TableSource]]. 
*/ - def getProctimeAttribute: Tuple2[Int, String] + def getProctimeAttribute: String } http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index faacc54..5247685 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -93,8 +93,7 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -108,8 +107,7 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -123,8 +121,7 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - ), - ignoreTimeAttributes = true) + )) } @Test @@ -135,8 +132,7 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("pf3"), UnresolvedFieldReference("pf1"), UnresolvedFieldReference("pf2") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -146,9 +142,7 @@ class TableEnvironmentTest extends TableTestBase { def testGetFieldInfoAtomicName1(): Unit = { val fieldInfo = tEnv.getFieldInfo( atomicType, - Array(UnresolvedFieldReference("name")), - ignoreTimeAttributes = true - ) + Array(UnresolvedFieldReference("name"))) fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1)) @@ -161,8 +155,7 @@ class TableEnvironmentTest extends TableTestBase { Array( UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2") - ), - ignoreTimeAttributes = true) + )) } @Test @@ -173,8 +166,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("f0"), "name1"), Alias(UnresolvedFieldReference("f1"), "name2"), Alias(UnresolvedFieldReference("f2"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -188,8 +180,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("f2"), "name1"), Alias(UnresolvedFieldReference("f0"), "name2"), Alias(UnresolvedFieldReference("f1"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -203,8 +194,7 @@ 
class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), Alias(UnresolvedFieldReference("zzz"), "name3") - ), - ignoreTimeAttributes = true) + )) } @Test @@ -215,8 +205,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("cf1"), "name1"), Alias(UnresolvedFieldReference("cf2"), "name2"), Alias(UnresolvedFieldReference("cf3"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -230,8 +219,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("cf3"), "name1"), Alias(UnresolvedFieldReference("cf1"), "name2"), Alias(UnresolvedFieldReference("cf2"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -245,8 +233,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), Alias(UnresolvedFieldReference("zzz"), "name3") - ), - ignoreTimeAttributes = true) + )) } @Test @@ -257,8 +244,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("pf1"), "name1"), Alias(UnresolvedFieldReference("pf2"), "name2"), Alias(UnresolvedFieldReference("pf3"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -272,8 +258,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("pf3"), "name1"), Alias(UnresolvedFieldReference("pf1"), "name2"), Alias(UnresolvedFieldReference("pf2"), "name3") - ), - ignoreTimeAttributes = true) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -287,8 +272,7 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), Alias(UnresolvedFieldReference("zzz"), "name3") - ), - ignoreTimeAttributes = true) + )) } @Test(expected = classOf[TableException]) @@ -297,16 +281,14 @@ class TableEnvironmentTest extends TableTestBase { atomicType, Array( Alias(UnresolvedFieldReference("name1"), "name2") - ), - ignoreTimeAttributes = true) + )) } @Test(expected = classOf[TableException]) def testGetFieldInfoGenericRowAlias(): Unit = { tEnv.getFieldInfo( genericRowType, - Array(UnresolvedFieldReference("first")), - ignoreTimeAttributes = true) + Array(UnresolvedFieldReference("first"))) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala index c481105..aa6edd3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala 
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala @@ -100,7 +100,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeTumblingGroupWindowOverCount(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table .window(Tumble over 2.rows on 'long as 'w) @@ -144,7 +144,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeTumblingGroupWindowOverTime(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala new file mode 100644 index 0000000..e9384c7 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import java.lang.{Integer => JInt, Long => JLong} +import java.util.Collections +import java.util.{List => JList} + +import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv} +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException, Types} +import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test +import org.mockito.Mockito.{mock, when} + +class StreamTableEnvironmentTest extends TableTestBase { + + @Test(expected = classOf[TableException]) + def testInvalidProctimeAttribute(): Unit = { + val util = streamTestUtil() + // cannot replace an attribute with proctime + util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e) + } + + @Test + def testProctimeAttribute(): Unit = { + val util = streamTestUtil() + // proctime attribute can only be appended to the schema + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime) + } + + @Test(expected = classOf[TableException]) + def testRowtimeAttributeReplaceFieldOfInvalidType(): Unit = { + val util = streamTestUtil() + // cannot replace a non-time attribute with rowtime + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e) + } + + @Test + def testReplacedRowtimeAttribute(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e) + } + + @Test + def testAppendedRowtimeAttribute(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttribute1(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime, 'pt.proctime) + } + + @Test + def testRowtimeAndProctimeAttribute2(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime, 'rt.rowtime) + } + + @Test + def testRowtimeAndProctimeAttribute3(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e, 'pt.proctime) + } + + @Test(expected = classOf[TableException]) + def testRowtimeAndInvalidProctimeAttribute(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'pt.proctime) + } + + @Test(expected = classOf[TableException]) + def testOnlyOneRowtimeAttribute1(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 'rt.rowtime) + } + + @Test(expected = classOf[TableException]) + def testOnlyOneProctimeAttribute1(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt1.proctime, 'pt2.proctime) + } + + @Test(expected = classOf[TableException]) + def testRowtimeAttributeUsedName(): Unit = { + val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'a.rowtime) + } + + @Test(expected = classOf[TableException]) + def testProctimeAttributeUsedName(): Unit = { + 
val util = streamTestUtil() + util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'b.proctime) + } + + @Test + def testProctimeAttributeParsed(): Unit = { + val (jTEnv, ds) = prepareSchemaExpressionParser + jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime") + } + + @Test + def testReplacingRowtimeAttributeParsed(): Unit = { + val (jTEnv, ds) = prepareSchemaExpressionParser + jTEnv.fromDataStream(ds, "a.rowtime, b, c, d, e") + } + + @Test + def testAppendingRowtimeAttributeParsed(): Unit = { + val (jTEnv, ds) = prepareSchemaExpressionParser + jTEnv.fromDataStream(ds, "a, b, c, d, e, rt.rowtime") + } + + @Test + def testRowtimeAndProctimeAttributeParsed1(): Unit = { + val (jTEnv, ds) = prepareSchemaExpressionParser + jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime, rt.rowtime") + } + + @Test + def testRowtimeAndProctimeAttributeParsed2(): Unit = { + val (jTEnv, ds) = prepareSchemaExpressionParser + jTEnv.fromDataStream(ds, "rt.rowtime, b, c, d, e, pt.proctime") + } + + private def prepareSchemaExpressionParser: + (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = { + + val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv])) + + val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG) + .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]] + val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]]) + when(ds.getType).thenReturn(sType) + + (jTEnv, ds) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala new file mode 100644 index 0000000..7673266 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase {
+
+  @Test
+  def testRowTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+    val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+        term("select", "addTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testRowTimeTableSourceGroupWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+    val t = util.tEnv.scan("rowTimeT")
+      .filter("val > 100")
+      .window(Tumble over 10.minutes on 'addTime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.end, 'val.avg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+            term("select", "name", "val", "addTime"),
+            term("where", ">(val, 100)")
+          ),
+          term("groupBy", "name"),
+          term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"),
+          term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+        ),
+        term("select", "name", "TMP_0", "TMP_1")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProcTimeTableSourceSimple(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+    val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+        term("select", "pTime", "id", "name", "val")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProcTimeTableSourceOverWindow(): Unit = {
+    val util = streamTestUtil()
+    util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+    val t = util.tEnv.scan("procTimeT")
+      .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+      .select('id, 'name, 'val.sum over 'w as 'valSum)
+      .filter('valSum > 100)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+          term("partitionBy", "id"),
+          term("orderBy", "pTime"),
+          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+          term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+        ),
+        term("select", "id", "name", "w0$o0 AS valSum"),
+        term("where", ">(w0$o0, 100)")
+      )
+    util.verifyTable(t, expected)
+  }
+}
+
+class TestRowTimeSource(timeField: String)
+  extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getRowtimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+  }
+}
+
+class TestProcTimeSource(timeField: String)
+  extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+  override def getProctimeAttribute: String = timeField
+
+  override def getReturnType: TypeInformation[Row] = {
+    new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index edf7b1d..f84ae3d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.{Ignore, Test}
+import org.junit.Test
 
 class WindowAggregateTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
@@ -85,7 +85,6 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testTumbleFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
@@ -98,24 +97,23 @@ class WindowAggregateTest extends TableTestBase {
       "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
-        "DataStreamCalc",
+        "DataStreamAggregate",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamCalc",
           streamTableNode(0),
-          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
-          term("select",
-            "COUNT(*) AS EXPR$0, " +
-            "weightedAvg(c, a) AS wAvg, " +
-            "start('w$) AS w$start, " +
-            "end('w$) AS w$end")
+          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
         ),
-        term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+        term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+        term("select",
+          "COUNT(*) AS EXPR$0, " +
+          "weightedAvg(c, a) AS wAvg, " +
+          "start('w$) AS w$start, " +
+          "end('w$) AS w$end")
       )
     streamUtil.verifySql(sql, expected)
   }
 
   @Test
-  @Ignore // TODO enable once CALCITE-1761 is fixed
   def testHoppingFunction() = {
     streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
 
@@ -127,24 +125,23 @@ class WindowAggregateTest extends TableTestBase {
       "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
     val expected =
       unaryNode(
-        "DataStreamCalc",
+        "DataStreamAggregate",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamCalc",
           streamTableNode(0),
-          term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
-          term("select",
"COUNT(*) AS EXPR$0, " + - "weightedAvg(c, a) AS wAvg, " + - "start('w$) AS w$start, " + - "end('w$) AS w$end") + term("select", "1970-01-01 00:00:00 AS $f0", "c", "a") ), - term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end") + term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0, " + + "weightedAvg(c, a) AS wAvg, " + + "start('w$) AS w$start, " + + "end('w$) AS w$end") ) streamUtil.verifySql(sql, expected) } @Test - @Ignore // TODO enable once CALCITE-1761 is fixed def testSessionFunction() = { streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge) @@ -157,18 +154,18 @@ class WindowAggregateTest extends TableTestBase { "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)" val expected = unaryNode( - "DataStreamCalc", + "DataStreamAggregate", unaryNode( - "DataStreamAggregate", + "DataStreamCalc", streamTableNode(0), - term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)), - term("select", - "COUNT(*) AS EXPR$0, " + - "weightedAvg(c, a) AS wAvg, " + - "start('w$) AS w$start, " + - "end('w$) AS w$end") + term("select", "1970-01-01 00:00:00 AS $f0", "c", "a") ), - term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end") + term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0, " + + "weightedAvg(c, a) AS wAvg, " + + "start('w$) AS w$start, " + + "end('w$) AS w$end") ) streamUtil.verifySql(sql, expected) }