http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala new file mode 100644 index 0000000..14e8cb1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil} + +class BatchTableSourceTable[T]( + tableSource: BatchTableSource[T], + statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) + extends TableSourceTable[T]( + tableSource, + statistic) { + + TableSourceUtil.validateTableSource(tableSource) + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + TableSourceUtil.getRelDataType( + tableSource, + None, + streaming = false, + typeFactory.asInstanceOf[FlinkTypeFactory]) + } +} +
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala index 0ce2a87..c1515b1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataSetTable.scala @@ -26,5 +26,4 @@ class DataSetTable[T]( override val fieldIndexes: Array[Int], override val fieldNames: Array[String], override val statistic: FlinkStatistic = FlinkStatistic.of(TableStats(1000L))) - extends FlinkTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) { -} + extends InlineTable[T](dataSet.getType, fieldIndexes, fieldNames, statistic) http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala index b7021e2..6de962c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala @@ -26,6 +26,4 @@ class DataStreamTable[T]( override val fieldIndexes: Array[Int], override val fieldNames: Array[String], override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) - extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) { - -} + 
extends InlineTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala deleted file mode 100644 index c76532f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.plan.schema - -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.Statistic -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.TableException -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo - -abstract class FlinkTable[T]( - val typeInfo: TypeInformation[T], - val fieldIndexes: Array[Int], - val fieldNames: Array[String], - val statistic: FlinkStatistic) - extends AbstractTable { - - if (fieldIndexes.length != fieldNames.length) { - throw new TableException( - s"Number of field names and field indexes must be equal.\n" + - s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" + - s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" + - s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.") - } - - // check uniqueness of field names - if (fieldNames.length != fieldNames.toSet.size) { - val duplicateFields = fieldNames - // count occurences of field names - .groupBy(identity).mapValues(_.length) - // filter for occurences > 1 and map to field name - .filter(g => g._2 > 1).keys - - throw new TableException( - s"Field names must be unique.\n" + - s"List of duplicate fields: ${duplicateFields.mkString("[", ", ", "]")}.\n" + - s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.") - } - - val fieldTypes: Array[TypeInformation[_]] = - typeInfo match { - case cType: CompositeType[_] => - // it is ok to leave out fields - if (fieldIndexes.count(_ >= 0) > cType.getArity) { - throw new TableException( - s"Arity of type (" + cType.getFieldNames.deep + ") " + - "must not be greater than number of field names " + 
fieldNames.deep + ".") - } - fieldIndexes.map { - case TimeIndicatorTypeInfo.ROWTIME_MARKER => TimeIndicatorTypeInfo.ROWTIME_INDICATOR - case TimeIndicatorTypeInfo.PROCTIME_MARKER => TimeIndicatorTypeInfo.PROCTIME_INDICATOR - case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]} - case aType: AtomicType[_] => - if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { - throw new TableException( - "Non-composite input type may have only a single field and its index must be 0.") - } - Array(aType) - } - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes) - } - - /** - * Returns statistics of current table - * - * @return statistics of current table - */ - override def getStatistic: Statistic = statistic - -} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala new file mode 100644 index 0000000..22d6151 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.table.api.{TableException, Types} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.stats.FlinkStatistic +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo + +abstract class InlineTable[T]( + val typeInfo: TypeInformation[T], + val fieldIndexes: Array[Int], + val fieldNames: Array[String], + val statistic: FlinkStatistic) + extends AbstractTable { + + if (fieldIndexes.length != fieldNames.length) { + throw new TableException( + s"Number of field names and field indexes must be equal.\n" + + s"Number of names is ${fieldNames.length}, number of indexes is ${fieldIndexes.length}.\n" + + s"List of column names: ${fieldNames.mkString("[", ", ", "]")}.\n" + + s"List of column indexes: ${fieldIndexes.mkString("[", ", ", "]")}.") + } + + // check uniqueness of field names + if (fieldNames.length != fieldNames.toSet.size) { + val duplicateFields = fieldNames + // count occurences of field names + .groupBy(identity).mapValues(_.length) + // filter for occurences > 1 and map to field name + .filter(g => g._2 > 1).keys + + throw new TableException( + s"Field names must be unique.\n" + + s"List of duplicate fields: 
${duplicateFields.mkString("[", ", ", "]")}.\n" + + s"List of all fields: ${fieldNames.mkString("[", ", ", "]")}.") + } + + val fieldTypes: Array[TypeInformation[_]] = + typeInfo match { + case cType: CompositeType[_] => + // it is ok to leave out fields + if (fieldIndexes.count(_ >= 0) > cType.getArity) { + throw new TableException( + s"Arity of type (" + cType.getFieldNames.deep + ") " + + "must not be greater than number of field names " + fieldNames.deep + ".") + } + fieldIndexes.map { + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => + TimeIndicatorTypeInfo.PROCTIME_INDICATOR + case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER => + Types.SQL_TIMESTAMP + case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER => + Types.SQL_TIMESTAMP + case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]} + case aType: AtomicType[_] => + var cnt = 0 + val types = fieldIndexes.map { + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => + TimeIndicatorTypeInfo.ROWTIME_INDICATOR + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => + TimeIndicatorTypeInfo.PROCTIME_INDICATOR + case TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER => + Types.SQL_TIMESTAMP + case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER => + Types.SQL_TIMESTAMP + case _ => + cnt += 1 + aType.asInstanceOf[TypeInformation[_]] + } + // ensure that the atomic type is matched at most once. 
+ if (cnt > 1) { + throw new TableException( + "Non-composite input type may have only a single field and its index must be 0.") + } else { + types + } + } + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes) + } + + /** + * Returns statistics of current table + * + * @return statistics of current table + */ + override def getStatistic: Statistic = statistic + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala index e94b4f2..9e82313 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala @@ -19,139 +19,24 @@ package org.apache.flink.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.{TableEnvironment, TableException, Types} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, StreamTableSource, TableSource} -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import org.apache.flink.table.sources.{StreamTableSource, TableSourceUtil} class StreamTableSourceTable[T]( - override val tableSource: TableSource[T], - override val statistic: 
FlinkStatistic = FlinkStatistic.UNKNOWN) + tableSource: StreamTableSource[T], + statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) extends TableSourceTable[T]( tableSource, - StreamTableSourceTable.adjustFieldIndexes(tableSource), - StreamTableSourceTable.adjustFieldNames(tableSource), statistic) { - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val fieldTypes = StreamTableSourceTable.adjustFieldTypes(tableSource) - - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType( - this.fieldNames, - fieldTypes) - } - -} - -object StreamTableSourceTable { - - private def adjustFieldIndexes(tableSource: TableSource[_]): Array[Int] = { - val (_, proctime) = getTimeIndicators(tableSource) - - val original = TableEnvironment.getFieldIndices(tableSource) - - // append proctime marker - if (proctime.isDefined) { - original :+ TimeIndicatorTypeInfo.PROCTIME_MARKER - } else { - original - } - } - - private def adjustFieldNames(tableSource: TableSource[_]): Array[String] = { - val (_, proctime) = getTimeIndicators(tableSource) - - val original = TableEnvironment.getFieldNames(tableSource) - - // append proctime field - if (proctime.isDefined) { - original :+ proctime.get - } else { - original - } - } - - private def adjustFieldTypes(tableSource: TableSource[_]): Array[TypeInformation[_]] = { - val (rowtime, proctime) = StreamTableSourceTable.getTimeIndicators(tableSource) + TableSourceUtil.validateTableSource(tableSource) - val original = TableEnvironment.getFieldTypes(tableSource.getReturnType) - - // update rowtime type - val withRowtime = if (rowtime.isDefined) { - // replace field type by RowtimeIndicator type - val rowtimeIdx = TableEnvironment.getFieldNames(tableSource).indexOf(rowtime.get) - original.patch(rowtimeIdx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1) - } else { - original - } - - // append proctime type - val withProctime = if (proctime.isDefined) { - withRowtime :+ 
TimeIndicatorTypeInfo.PROCTIME_INDICATOR - } else { - withRowtime - } - - withProctime.asInstanceOf[Array[TypeInformation[_]]] - } - - private def getTimeIndicators(tableSource: TableSource[_]): (Option[String], Option[String]) = { - - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - - val rowtime: Option[String] = tableSource match { - case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute == null => - None - case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute.trim.equals("") => - throw TableException("The name of the rowtime attribute must not be empty.") - - case timeSource: DefinedRowtimeAttribute => - // validate the rowtime field exists and is of type Long or Timestamp - val rowtimeAttribute = timeSource.getRowtimeAttribute - val rowtimeIdx = fieldNames.indexOf(rowtimeAttribute) - - if (rowtimeIdx < 0) { - throw TableException( - s"Rowtime field '$rowtimeAttribute' is not present in TableSource. 
" + - s"Available fields are ${fieldNames.mkString("[", ", ", "]") }.") - } - val fieldType = fieldTypes(rowtimeIdx) - if (fieldType != Types.LONG && fieldType != Types.SQL_TIMESTAMP) { - throw TableException( - s"Rowtime field '$rowtimeAttribute' must be of type Long or Timestamp " + - s"but of type ${fieldTypes(rowtimeIdx)}.") - } - Some(rowtimeAttribute) - case _ => - None - } - - val proctime: Option[String] = tableSource match { - case timeSource : DefinedProctimeAttribute if timeSource.getProctimeAttribute == null => - None - case timeSource: DefinedProctimeAttribute - if timeSource.getProctimeAttribute.trim.equals("") => - throw TableException("The name of the rowtime attribute must not be empty.") - case timeSource: DefinedProctimeAttribute => - val proctimeAttribute = timeSource.getProctimeAttribute - Some(proctimeAttribute) - case _ => - None - } - (rowtime, proctime) - } - - def deriveRowTypeOfTableSource( - tableSource: StreamTableSource[_], - typeFactory: FlinkTypeFactory): RelDataType = { - - val fieldNames = adjustFieldNames(tableSource) - val fieldTypes = adjustFieldTypes(tableSource) - - typeFactory.buildLogicalRowType(fieldNames, fieldTypes) + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + TableSourceUtil.getRelDataType( + tableSource, + None, + streaming = true, + typeFactory.asInstanceOf[FlinkTypeFactory]) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala index 2f0ba1a..048e862 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala @@ -18,30 +18,22 @@ package org.apache.flink.table.plan.schema -import org.apache.flink.table.api.TableEnvironment +import org.apache.calcite.schema.Statistic +import org.apache.calcite.schema.impl.AbstractTable import org.apache.flink.table.plan.stats.FlinkStatistic import org.apache.flink.table.sources.TableSource /** Table which defines an external table via a [[TableSource]] */ -class TableSourceTable[T]( +abstract class TableSourceTable[T]( val tableSource: TableSource[T], - fieldIndexes: Array[Int], - fieldNames: Array[String], - override val statistic: FlinkStatistic) - extends FlinkTable[T]( - typeInfo = tableSource.getReturnType, - fieldIndexes, - fieldNames, - statistic) { + val statistic: FlinkStatistic) + extends AbstractTable { - def this( - tableSource: TableSource[T], - statistic: FlinkStatistic = FlinkStatistic.UNKNOWN) { + /** + * Returns statistics of current table + * + * @return statistics of current table + */ + override def getStatistic: Statistic = statistic - this( - tableSource, - TableEnvironment.getFieldIndices(tableSource), - TableEnvironment.getFieldNames(tableSource), - statistic) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala index a524816..d5a5f36 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala @@ -25,9 +25,11 @@ import java.util.{Collections, List} import 
org.apache.calcite.rel.{RelCollation, RelDistribution, RelReferentialConstraint} import org.apache.calcite.schema.Statistic import org.apache.calcite.util.ImmutableBitSet +import org.apache.flink.table.plan.schema.TableSourceTable +import org.apache.flink.table.plan.schema.InlineTable /** - * The class provides statistics for a [[org.apache.flink.table.plan.schema.FlinkTable]]. + * The class provides statistics for a [[InlineTable]] or [[TableSourceTable]]. * * @param tableStats The table statistics. */ http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala index cfc8ada..c443a69 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.TableException +import org.apache.flink.table.api.{TableException, TableSchema} import scala.collection.mutable @@ -38,6 +38,8 @@ import scala.collection.mutable * @param path The path to the CSV file. * @param fieldNames The names of the table fields. * @param fieldTypes The types of the table fields. + * @param selectedFields The fields which will be read and returned by the table source. + * If None, all fields are returned. * @param fieldDelim The field delimiter, "," by default. * @param rowDelim The row delimiter, "\n" by default. 
* @param quoteCharacter An optional quote character for String values, null by default. @@ -45,16 +47,17 @@ import scala.collection.mutable * @param ignoreComments An optional prefix to indicate comments, null by default. * @param lenient Flag to skip records with parse error instead to fail, false by default. */ -class CsvTableSource( +class CsvTableSource private ( private val path: String, private val fieldNames: Array[String], private val fieldTypes: Array[TypeInformation[_]], - private val fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER, - private val rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER, - private val quoteCharacter: Character = null, - private val ignoreFirstLine: Boolean = false, - private val ignoreComments: String = null, - private val lenient: Boolean = false) + private val selectedFields: Array[Int], + private val fieldDelim: String, + private val rowDelim: String, + private val quoteCharacter: Character, + private val ignoreFirstLine: Boolean, + private val ignoreComments: String, + private val lenient: Boolean) extends BatchTableSource[Row] with StreamTableSource[Row] with ProjectableTableSource[Row] { @@ -66,18 +69,59 @@ class CsvTableSource( * @param path The path to the CSV file. * @param fieldNames The names of the table fields. * @param fieldTypes The types of the table fields. + * @param fieldDelim The field delimiter, "," by default. + * @param rowDelim The row delimiter, "\n" by default. + * @param quoteCharacter An optional quote character for String values, null by default. + * @param ignoreFirstLine Flag to ignore the first line, false by default. + * @param ignoreComments An optional prefix to indicate comments, null by default. + * @param lenient Flag to skip records with parse error instead to fail, false by default. 
*/ - def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = + def this( + path: String, + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]], + fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER, + rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER, + quoteCharacter: Character = null, + ignoreFirstLine: Boolean = false, + ignoreComments: String = null, + lenient: Boolean = false) = { + + this( + path, + fieldNames, + fieldTypes, + fieldTypes.indices.toArray, // initially, all fields are returned + fieldDelim, + rowDelim, + quoteCharacter, + ignoreFirstLine, + ignoreComments, + lenient) + + } + + /** + * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a + * (logically) unlimited number of fields. + * + * @param path The path to the CSV file. + * @param fieldNames The names of the table fields. + * @param fieldTypes The types of the table fields. + */ + def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = { this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false) + } if (fieldNames.length != fieldTypes.length) { throw TableException("Number of field names and field types must be equal.") } - private val returnType = new RowTypeInfo(fieldTypes, fieldNames) + private val selectedFieldTypes = selectedFields.map(fieldTypes(_)) + private val selectedFieldNames = selectedFields.map(fieldNames(_)) - private var selectedFields: Array[Int] = fieldTypes.indices.toArray + private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames) /** * Returns the data of the table as a [[DataSet]] of [[Row]]. @@ -102,35 +146,32 @@ class CsvTableSource( streamExecEnv.createInput(createCsvInput(), returnType) } + /** Returns the schema of the produced table. 
*/ + override def getTableSchema = new TableSchema(fieldNames, fieldTypes) + /** Returns a copy of [[TableSource]] with ability to project fields */ override def projectFields(fields: Array[Int]): CsvTableSource = { - val (newFields, newFieldNames, newFieldTypes) = if (fields.nonEmpty) { - (fields, fields.map(fieldNames(_)), fields.map(fieldTypes(_))) - } else { - // reporting number of records only, we must read some columns to get row count. - // (e.g. SQL: select count(1) from csv_table) - // We choose the first column here. - (Array(0), Array(fieldNames.head), Array[TypeInformation[_]](fieldTypes.head)) - } + val selectedFields = if (fields.isEmpty) Array(0) else fields +// val selectedFiels = fields - val source = new CsvTableSource(path, - newFieldNames, - newFieldTypes, + new CsvTableSource( + path, + fieldNames, + fieldTypes, + selectedFields, fieldDelim, rowDelim, quoteCharacter, ignoreFirstLine, ignoreComments, lenient) - source.selectedFields = newFields - source } private def createCsvInput(): RowCsvInputFormat = { val inputFormat = new RowCsvInputFormat( new Path(path), - fieldTypes, + selectedFieldTypes, rowDelim, fieldDelim, selectedFields) @@ -162,6 +203,11 @@ class CsvTableSource( override def hashCode(): Int = { returnType.hashCode() } + + override def explainSource(): String = { + s"CsvTableSource(" + + s"read fields: ${getReturnType.getFieldNames.mkString(", ")})" + } } object CsvTableSource { http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala new file mode 100644 index 0000000..6a2ccc9 --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldMapping.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import java.util.{Map => JMap} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema + +/** + * The [[DefinedFieldMapping]] interface provides a mapping for the fields of the table schema + * ([[TableSource.getTableSchema]] to fields of the physical returned type + * [[TableSource.getReturnType]] of a [[TableSource]]. + * + * If a [[TableSource]] does not implement the [[DefinedFieldMapping]] interface, the fields of + * its [[TableSchema]] are mapped to the fields of its return type [[TypeInformation]] by name. + * + * If the fields cannot or should not be implicitly mapped by name, an explicit mapping can be + * provided by implementing this interface. + * If a mapping is provided, all fields must be explicitly mapped. + */ +trait DefinedFieldMapping { + + /** + * Returns the mapping for the fields of the [[TableSource]]'s [[TableSchema]] to the fields of + * its return type [[TypeInformation]]. 
+ * + * The mapping is done based on field names, e.g., a mapping "name" -> "f1" maps the schema field + * "name" to the field "f1" of the return type, for example in this case the second field of a + * [[org.apache.flink.api.java.tuple.Tuple]]. + * + * The returned mapping must map all fields (except proctime and rowtime fields) to the return + * type. It can also provide a mapping for fields which are not in the [[TableSchema]] to make + * fields in the physical [[TypeInformation]] accessible for a [[TimestampExtractor]]. + * + * @return A mapping from [[TableSchema]] fields to [[TypeInformation]] fields. + */ + def getFieldMapping: JMap[String, String] +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala deleted file mode 100644 index bead3e9..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sources - -/** - * Trait that defines custom field names and their indices in the underlying - * data type. - * - * Should be extended together with [[TableSource]] trait. - */ -trait DefinedFieldNames { - - /** Returns the names of the table fields. */ - def getFieldNames: Array[String] - - /** Returns the indices of the table fields. */ - def getFieldIndices: Array[Int] - -} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala new file mode 100644 index 0000000..9728763 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/FieldComputer.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.expressions.{Expression, ResolvedFieldReference} + +/** + * The [[FieldComputer]] interface returns an expression to compute the field of the table schema + * of a [[TableSource]] from one or more fields of the [[TableSource]]'s return type. + * + * @tparam T The result type of the provided expression. + */ +abstract class FieldComputer[T] { + + /** + * Returns the names of all fields that the expression of the field computer accesses. + * + * @return An array with the names of all accessed fields. + */ + def getArgumentFields: Array[String] + + /** + * Returns the result type of the expression. + * + * @return The result type of the expression. + */ + def getReturnType: TypeInformation[T] + + /** + * Validates that the fields that the expression references have the correct types. + * + * @param argumentFieldTypes The types of the physical input fields. + */ + @throws[ValidationException] + def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit + + /** + * Returns the [[Expression]] that computes the value of the field. + * + * @param fieldAccesses Field access expressions for the argument fields. + * @return The expression to extract the timestamp from the [[TableSource]] return type. 
+ */ + def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala index a10187b..d0c7fdc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/NestedFieldsProjectableTableSource.scala @@ -18,33 +18,59 @@ package org.apache.flink.table.sources +import org.apache.flink.api.java.DataSet +import org.apache.flink.streaming.api.datastream.DataStream + /** * Adds support for projection push-down to a [[TableSource]] with nested fields. - * A [[TableSource]] extending this interface is able - * to project the nested fields of the returned table. + * A [[TableSource]] extending this interface is able to project the fields of the returned + * [[DataSet]] if it is a [[BatchTableSource]] or [[DataStream]] if it is a [[StreamTableSource]]. * - * @tparam T The return type of the [[NestedFieldsProjectableTableSource]]. + * @tparam T The return type of the [[TableSource]]. */ trait NestedFieldsProjectableTableSource[T] { /** - * Creates a copy of the [[TableSource]] that projects its output on the specified nested fields. + * Creates a copy of the [[TableSource]] that projects its output to the given field indexes. + * The field indexes relate to the physical return type ([[TableSource.getReturnType]]) and not + * to the table schema ([[TableSource.getTableSchema]] of the [[TableSource]]. 
* - * @param fields The indexes of the fields to return. - * @param nestedFields The accessed nested fields of the fields to return. + * The table schema ([[TableSource.getTableSchema]] of the [[TableSource]] copy must not be + * modified by this method, but only the return type ([[TableSource.getReturnType]]) and the + * produced [[DataSet]] ([[BatchTableSource.getDataSet(]]) or [[DataStream]] + * ([[StreamTableSource.getDataStream]]). The return type may only be changed by + * removing or reordering first level fields. The type of the first level fields must not be + * changed. + * + * If the [[TableSource]] implements the [[DefinedFieldMapping]] interface, it might + * be necessary to adjust the mapping as well. + * + * The nestedFields parameter contains all nested fields that are accessed by the query. + * This information can be used to only read and set the accessed fields. + * Non-accessed fields may be left empty, set to null, or to a default value. * - * e.g. + * The [[projectNestedFields()]] method is called with parameters as shown in the example below: + * + * // schema * tableSchema = { * id, * student<\school<\city, tuition>, age, name>, * teacher<\age, name> * } * + * // query * select (id, student.school.city, student.age, teacher) * + * // parameters * fields = field = [0, 1, 2] * nestedFields \[\["*"], ["school.city", "age"], ["*"\]\] * + * IMPORTANT: This method must return a true copy and must not modify the original table source + * object. + * + * @param fields The indexes of the fields to return. + * @param nestedFields The paths of all nested fields which are accessed by the query. All other + * nested fields may be empty. * @return A copy of the [[TableSource]] that projects its output. 
*/ def projectNestedFields( http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala index 570bdff..abb9970 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/ProjectableTableSource.scala @@ -18,16 +18,33 @@ package org.apache.flink.table.sources +import org.apache.flink.api.java.DataSet +import org.apache.flink.streaming.api.datastream.DataStream + /** * Adds support for projection push-down to a [[TableSource]]. - * A [[TableSource]] extending this interface is able to project the fields of the return table. + * A [[TableSource]] extending this interface is able to project the fields of the returned + * [[DataSet]] if it is a [[BatchTableSource]] or [[DataStream]] if it is a [[StreamTableSource]]. * * @tparam T The return type of the [[TableSource]]. */ trait ProjectableTableSource[T] { /** - * Creates a copy of the [[TableSource]] that projects its output on the specified fields. + * Creates a copy of the [[TableSource]] that projects its output to the given field indexes. + * The field indexes relate to the physical return type ([[TableSource.getReturnType]]) and not + * to the table schema ([[TableSource.getTableSchema]] of the [[TableSource]]. 
+ * + * The table schema ([[TableSource.getTableSchema]] of the [[TableSource]] copy must not be + * modified by this method, but only the return type ([[TableSource.getReturnType]]) and the + * produced [[DataSet]] ([[BatchTableSource.getDataSet(]]) or [[DataStream]] + * ([[StreamTableSource.getDataStream]]). + * + * If the [[TableSource]] implements the [[DefinedFieldMapping]] interface, it might + * be necessary to adjust the mapping as well. + * + * IMPORTANT: This method must return a true copy and must not modify the original table source + * object. * * @param fields The indexes of the fields to return. * @return A copy of the [[TableSource]] that projects its output. http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala index d9ebc5a..b082a53 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala @@ -18,26 +18,46 @@ package org.apache.flink.table.sources +import org.apache.flink.api.scala.DataSet +import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableSchema -/** Defines an external table by providing schema information and used to produce a - * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]]. - * Schema information consists of a data type, field names, and corresponding indices of - * these names in the data type. 
+/**
+ * Defines an external table with the schema that is provided by [[TableSource#getTableSchema]]. * - * To define a TableSource one needs to implement [[TableSource#getReturnType]]. In this case - * field names and field indices are derived from the returned type. + * The data of a [[TableSource]] is produced as a [[DataSet]] in case of a [[BatchTableSource]] or + * as a [[DataStream]] in case of a [[StreamTableSource]]. + * The type of the produced [[DataSet]] or [[DataStream]] is specified by the + * [[TableSource#getReturnType]] method. - * In case if custom field names are required one need to additionally implement - * the [[DefinedFieldNames]] trait. + * By default, the fields of the [[TableSchema]] are implicitly mapped by name to the fields of the + * return type [[TypeInformation]]. An explicit mapping can be defined by implementing the + * [[DefinedFieldMapping]] interface. * * @tparam T The return type of the [[TableSource]]. */ trait TableSource[T] { - /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */ + /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. + * The fields of the return type are mapped to the table schema based on their name. + * + * @return The type of the returned [[DataSet]] or [[DataStream]]. + */ def getReturnType: TypeInformation[T] - /** Describes the table source */ + /** + * Returns the schema of the produced table. + * + * @return The [[TableSchema]] of the produced table. + */ + def getTableSchema: TableSchema + + /** + * Describes the table source + * + * @return A String explaining the [[TableSource]]. 
+ */ def explainSource(): String = "" + } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala new file mode 100644 index 0000000..48ab3de --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sources + +import java.sql.Timestamp + +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalValues +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference} +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo + +import scala.collection.JavaConverters._ + +/** Util class for [[TableSource]]. */ +object TableSourceUtil { + + /** Returns true if the [[TableSource]] has a rowtime attribute. */ + def hasRowtimeAttribute(tableSource: TableSource[_]): Boolean = + getRowtimeAttributes(tableSource).nonEmpty + + /** Returns true if the [[TableSource]] has a proctime attribute. */ + def hasProctimeAttribute(tableSource: TableSource[_]): Boolean = + getProctimeAttribute(tableSource).nonEmpty + + /** + * Validates a TableSource. + * + * - checks that all fields of the schema can be resolved + * - checks that resolved fields have the correct type + * - checks that the time attributes are correctly configured. + * + * @param tableSource The [[TableSource]] for which the time attributes are checked. 
+ */ + def validateTableSource(tableSource: TableSource[_]): Unit = { + + val schema = tableSource.getTableSchema + val tableFieldNames = schema.getColumnNames + val tableFieldTypes = schema.getTypes + + // get rowtime and proctime attributes + val rowtimeAttributes = getRowtimeAttributes(tableSource) + val proctimeAttribute = getProctimeAttribute(tableSource) + + // validate that schema fields can be resolved to a return type field of correct type + var mappedFieldCnt = 0 + tableFieldTypes.zip(tableFieldNames).foreach { + case (t: SqlTimeTypeInfo[_], name: String) + if t.getTypeClass == classOf[Timestamp] && proctimeAttribute.contains(name) => + // OK, field was mapped to proctime attribute + case (t: SqlTimeTypeInfo[_], name: String) + if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) => + // OK, field was mapped to rowtime attribute + case (t: TypeInformation[_], name) => + // check if field is registered as time indicator + if (getProctimeAttribute(tableSource).contains(name)) { + throw new ValidationException(s"Processing time field '$name' has invalid type $t. " + + s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.") + } + if (getRowtimeAttributes(tableSource).contains(name)) { + throw new ValidationException(s"Rowtime field '$name' has invalid type $t. 
" + + s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.") + } + // check that field can be resolved in input type + val (physicalName, _, tpe) = resolveInputField(name, tableSource) + // validate that mapped fields are are same type + if (tpe != t) { + throw ValidationException(s"Type $t of table field '$name' does not " + + s"match with type $tpe of the field '$physicalName' of the TableSource return type.") + } + mappedFieldCnt += 1 + } + // ensure that only one field is mapped to an atomic type + if (tableSource.getReturnType.isInstanceOf[AtomicType[_]] && mappedFieldCnt > 1) { + throw ValidationException( + s"More than one table field matched to atomic input type ${tableSource.getReturnType}.") + } + + // validate rowtime attributes + tableSource match { + case r: DefinedRowtimeAttributes => + val descriptors = r.getRowtimeAttributeDescriptors + if (descriptors.size() > 1) { + throw ValidationException("Currently, only a single rowtime attribute is supported. " + + s"Please remove all but one RowtimeAttributeDescriptor.") + } else if (descriptors.size() == 1) { + val descriptor = descriptors.get(0) + val rowtimeAttribute = descriptor.getAttributeName + val rowtimeIdx = schema.getColumnNames.indexOf(rowtimeAttribute) + // ensure that field exists + if (rowtimeIdx < 0) { + throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " + + s"'$rowtimeAttribute' but field '$rowtimeAttribute' does not exist in table.") + } + // ensure that field is of type TIMESTAMP + if (schema.getTypes(rowtimeIdx) != Types.SQL_TIMESTAMP) { + throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " + + s"'$rowtimeAttribute' but field '$rowtimeAttribute' is not of type TIMESTAMP.") + } + // look up extractor input fields in return type + val extractorInputFields = descriptor.getTimestampExtractor.getArgumentFields + val physicalTypes = resolveInputFields(extractorInputFields, tableSource).map(_._3) + // validate timestamp extractor + 
descriptor.getTimestampExtractor.validateArgumentFields(physicalTypes) + } + case _ => // nothing to validate + } + + // validate proctime attribute + tableSource match { + case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => + val proctimeAttribute = p.getProctimeAttribute + val proctimeIdx = schema.getColumnNames.indexOf(proctimeAttribute) + // ensure that field exists + if (proctimeIdx < 0) { + throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " + + s"'$proctimeAttribute' but field '$proctimeAttribute' does not exist in table.") + } + // ensure that field is of type TIMESTAMP + if (schema.getTypes(proctimeIdx) != Types.SQL_TIMESTAMP) { + throw ValidationException(s"Found a RowtimeAttributeDescriptor for field " + + s"'$proctimeAttribute' but field '$proctimeAttribute' is not of type TIMESTAMP.") + } + case _ => // nothing to validate + } + + // ensure that proctime and rowtime attribute do not overlap + if (proctimeAttribute.isDefined && rowtimeAttributes.contains(proctimeAttribute.get)) { + throw new ValidationException(s"Field '${proctimeAttribute.get}' must not be " + + s"processing time and rowtime attribute at the same time.") + } + } + + /** + * Computes the indices that map the input type of the DataStream to the schema of the table. + * + * The mapping is based on the field names and fails if a table field cannot be + * mapped to a field of the input type. + * + * @param tableSource The table source for which the table schema is mapped to the input type. + * @param isStreamTable True if the mapping is computed for a streaming table, false otherwise. + * @param selectedFields The indexes of the table schema fields for which a mapping is + * computed. If None, a mapping for all fields is computed. + * @return An index mapping from input type to table schema. 
+ */ + def computeIndexMapping( + tableSource: TableSource[_], + isStreamTable: Boolean, + selectedFields: Option[Array[Int]]): Array[Int] = { + val inputType = tableSource.getReturnType + val tableSchema = tableSource.getTableSchema + + // get names of selected fields + val tableFieldNames = if (selectedFields.isDefined) { + val names = tableSchema.getColumnNames + selectedFields.get.map(names(_)) + } else { + tableSchema.getColumnNames + } + + // get types of selected fields + val tableFieldTypes = if (selectedFields.isDefined) { + val types = tableSchema.getTypes + selectedFields.get.map(types(_)) + } else { + tableSchema.getTypes + } + + // get rowtime and proctime attributes + val rowtimeAttributes = getRowtimeAttributes(tableSource) + val proctimeAttributes = getProctimeAttribute(tableSource) + + // compute mapping of selected fields and time attributes + val mapping: Array[Int] = tableFieldTypes.zip(tableFieldNames).map { + case (t: SqlTimeTypeInfo[_], name: String) + if t.getTypeClass == classOf[Timestamp] && proctimeAttributes.contains(name) => + if (isStreamTable) { + TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER + } else { + TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER + } + case (t: SqlTimeTypeInfo[_], name: String) + if t.getTypeClass == classOf[Timestamp] && rowtimeAttributes.contains(name) => + if (isStreamTable) { + TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER + } else { + TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER + } + case (t: TypeInformation[_], name) => + // check if field is registered as time indicator + if (getProctimeAttribute(tableSource).contains(name)) { + throw new ValidationException(s"Processing time field '$name' has invalid type $t. " + + s"Processing time attributes must be of type ${Types.SQL_TIMESTAMP}.") + } + if (getRowtimeAttributes(tableSource).contains(name)) { + throw new ValidationException(s"Rowtime field '$name' has invalid type $t. 
" + + s"Rowtime attributes must be of type ${Types.SQL_TIMESTAMP}.") + } + + val (physicalName, idx, tpe) = resolveInputField(name, tableSource) + // validate that mapped fields are are same type + if (tpe != t) { + throw ValidationException(s"Type $t of table field '$name' does not " + + s"match with type $tpe of the field '$physicalName' of the TableSource return type.") + } + idx + } + + // ensure that only one field is mapped to an atomic type + if (inputType.isInstanceOf[AtomicType[_]] && mapping.count(_ >= 0) > 1) { + throw ValidationException( + s"More than one table field matched to atomic input type $inputType.") + } + + mapping + } + + /** + * Returns the Calcite schema of a [[TableSource]]. + * + * @param tableSource The [[TableSource]] for which the Calcite schema is generated. + * @param selectedFields The indicies of all selected fields. None, if all fields are selected. + * @param streaming Flag to determine whether the schema of a stream or batch table is created. + * @param typeFactory The type factory to create the schema. + * @return The Calcite schema for the selected fields of the given [[TableSource]]. 
+ */ + def getRelDataType( + tableSource: TableSource[_], + selectedFields: Option[Array[Int]], + streaming: Boolean, + typeFactory: FlinkTypeFactory): RelDataType = { + + val fieldNames = tableSource.getTableSchema.getColumnNames + var fieldTypes = tableSource.getTableSchema.getTypes + + if (streaming) { + // adjust the type of time attributes for streaming tables + val rowtimeAttributes = getRowtimeAttributes(tableSource) + val proctimeAttributes = getProctimeAttribute(tableSource) + + // patch rowtime fields with time indicator type + rowtimeAttributes.foreach { rowtimeField => + val idx = fieldNames.indexOf(rowtimeField) + fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.ROWTIME_INDICATOR), 1) + } + // patch proctime field with time indicator type + proctimeAttributes.foreach { proctimeField => + val idx = fieldNames.indexOf(proctimeField) + fieldTypes = fieldTypes.patch(idx, Seq(TimeIndicatorTypeInfo.PROCTIME_INDICATOR), 1) + } + } + + val (selectedFieldNames, selectedFieldTypes) = if (selectedFields.isDefined) { + // filter field names and types by selected fields + (selectedFields.get.map(fieldNames(_)), selectedFields.get.map(fieldTypes(_))) + } else { + (fieldNames, fieldTypes) + } + typeFactory.buildLogicalRowType(selectedFieldNames, selectedFieldTypes) + } + + /** + * Returns the [[RowtimeAttributeDescriptor]] of a [[TableSource]]. + * + * @param tableSource The [[TableSource]] for which the [[RowtimeAttributeDescriptor]] is + * returned. + * @param selectedFields The fields which are selected from the [[TableSource]]. + * If None, all fields are selected. + * @return The [[RowtimeAttributeDescriptor]] of the [[TableSource]]. 
+ */ + def getRowtimeAttributeDescriptor( + tableSource: TableSource[_], + selectedFields: Option[Array[Int]]): Option[RowtimeAttributeDescriptor] = { + + tableSource match { + case r: DefinedRowtimeAttributes => + val descriptors = r.getRowtimeAttributeDescriptors + if (descriptors.size() == 0) { + None + } else if (descriptors.size > 1) { + throw ValidationException("Table with has more than a single rowtime attribute..") + } else { + // exactly one rowtime attribute descriptor + if (selectedFields.isEmpty) { + // all fields are selected. + Some(descriptors.get(0)) + } else { + val descriptor = descriptors.get(0) + // look up index of row time attribute in schema + val fieldIdx = tableSource.getTableSchema.getColumnNames.indexOf( + descriptor.getAttributeName) + // is field among selected fields? + if (selectedFields.get.contains(fieldIdx)) { + Some(descriptor) + } else { + None + } + } + } + case _ => None + } + } + + /** + * Obtains the [[RexNode]] expression to extract the rowtime timestamp for a [[TableSource]]. + * + * @param tableSource The [[TableSource]] for which the expression is extracted. + * @param selectedFields The selected fields of the [[TableSource]]. + * If None, all fields are selected. + * @param cluster The [[RelOptCluster]] of the current optimization process. + * @param relBuilder The [[RelBuilder]] to build the [[RexNode]]. + * @param resultType The result type of the timestamp expression. + * @return The [[RexNode]] expression to extract the timestamp of the table source. + */ + def getRowtimeExtractionExpression( + tableSource: TableSource[_], + selectedFields: Option[Array[Int]], + cluster: RelOptCluster, + relBuilder: RelBuilder, + resultType: TypeInformation[_]): Option[RexNode] = { + + val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] + + /** + * Creates a RelNode with a schema that corresponds on the given fields + * Fields for which no information is available, will have default values. 
+ */ + def createSchemaRelNode(fields: Array[(String, Int, TypeInformation[_])]): RelNode = { + val maxIdx = fields.map(_._2).max + val idxMap: Map[Int, (String, TypeInformation[_])] = Map( + fields.map(f => f._2 -> (f._1, f._3)): _*) + val (physicalFields, physicalTypes) = (0 to maxIdx) + .map(i => idxMap.getOrElse(i, ("", Types.BYTE))).unzip + val physicalSchema: RelDataType = typeFactory.buildLogicalRowType( + physicalFields, + physicalTypes) + LogicalValues.create( + cluster, + physicalSchema, + ImmutableList.of().asInstanceOf[ImmutableList[ImmutableList[RexLiteral]]]) + } + + val rowtimeDesc = getRowtimeAttributeDescriptor(tableSource, selectedFields) + rowtimeDesc.map { r => + val tsExtractor = r.getTimestampExtractor + + val fieldAccesses = if (tsExtractor.getArgumentFields.nonEmpty) { + val resolvedFields = resolveInputFields(tsExtractor.getArgumentFields, tableSource) + // push an empty values node with the physical schema on the relbuilder + relBuilder.push(createSchemaRelNode(resolvedFields)) + // get extraction expression + resolvedFields.map(f => ResolvedFieldReference(f._1, f._3)) + } else { + new Array[ResolvedFieldReference](0) + } + + val expression = tsExtractor.getExpression(fieldAccesses) + // add cast to requested type and convert expression to RexNode + val rexExpression = Cast(expression, resultType).toRexNode(relBuilder) + relBuilder.clear() + rexExpression + } + } + + /** + * Returns the indexes of the physical fields that required to compute the given logical fields. + * + * @param tableSource The [[TableSource]] for which the physical indexes are computed. + * @param logicalFieldIndexes The indexes of the accessed logical fields for which the physical + * indexes are computed. + * @return The indexes of the physical fields are accessed to forward and compute the logical + * fields. 
+ */ + def getPhysicalIndexes( + tableSource: TableSource[_], + logicalFieldIndexes: Array[Int]): Array[Int] = { + + // get the mapping from logical to physical positions. + // stream / batch distinction not important here + val fieldMapping = computeIndexMapping(tableSource, isStreamTable = true, None) + + logicalFieldIndexes + // resolve logical indexes to physical indexes + .map(fieldMapping(_)) + // resolve time indicator markers to physical indexes + .flatMap { + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => + // proctime field do not access a physical field + Seq() + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER => + // rowtime field is computed. + // get names of fields which are accessed by the expression to compute the rowtime field. + val rowtimeAttributeDescriptor = getRowtimeAttributeDescriptor(tableSource, None) + val accessedFields = if (rowtimeAttributeDescriptor.isDefined) { + rowtimeAttributeDescriptor.get.getTimestampExtractor.getArgumentFields + } else { + throw TableException("Computed field mapping includes a rowtime marker but the " + + "TableSource does not provide a RowtimeAttributeDescriptor. " + + "This is a bug and should be reported.") + } + // resolve field names to physical fields + resolveInputFields(accessedFields, tableSource).map(_._2) + case idx => + Seq(idx) + } + } + + /** Returns a list with all rowtime attribute names of the [[TableSource]]. */ + private def getRowtimeAttributes(tableSource: TableSource[_]): Array[String] = { + tableSource match { + case r: DefinedRowtimeAttributes => + r.getRowtimeAttributeDescriptors.asScala.map(_.getAttributeName).toArray + case _ => + Array() + } + } + + /** Returns the proctime attribute of the [[TableSource]] if it is defined. 
*/ + private def getProctimeAttribute(tableSource: TableSource[_]): Option[String] = { + tableSource match { + case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => + Some(p.getProctimeAttribute) + case _ => + None + } + } + + /** + * Identifies for a field name of the logical schema, the corresponding physical field in the + * return type of a [[TableSource]]. + * + * @param fieldName The logical field to look up. + * @param tableSource The table source in which to look for the field. + * @return The name, index, and type information of the physical field. + */ + private def resolveInputField( + fieldName: String, + tableSource: TableSource[_]): (String, Int, TypeInformation[_]) = { + + val returnType = tableSource.getReturnType + + /** Look up a field by name in a type information */ + def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = { + returnType match { + case a: AtomicType[_] => + // no composite type, we return the full atomic type as field + (fieldName, 0, a) + case c: CompositeType[_] => + // get and check field index + val idx = c.getFieldIndex(fieldName) + if (idx < 0) { + throw ValidationException(failMsg) + } + // return field name, index, and field type + (fieldName, idx, c.getTypeAt(idx)) + case _ => throw TableException("Unexpected type information.") + } + } + + tableSource match { + case d: DefinedFieldMapping if d.getFieldMapping != null => + // resolve field name in field mapping + val resolvedFieldName = d.getFieldMapping.get(fieldName) + if (resolvedFieldName == null) { + throw ValidationException( + s"Field '$fieldName' could not be resolved by the field mapping.") + } + // look up resolved field in return type + lookupField( + resolvedFieldName, + s"Table field '$fieldName' was resolved to TableSource return type field " + + s"'$resolvedFieldName', but field '$resolvedFieldName' was not found in the return " + + s"type $returnType of the TableSource. 
" + + s"Please verify the field mapping of the TableSource.") + case _ => + // look up field in return type + lookupField( + fieldName, + s"Table field '$fieldName' was not found in the return type $returnType of the " + + s"TableSource.") + } + } + + /** + * Identifies the physical fields in the return type [[TypeInformation]] of a [[TableSource]] + * for a list of field names of the [[TableSource]]'s [[org.apache.flink.table.api.TableSchema]]. + * + * @param fieldNames The field names to look up. + * @param tableSource The table source in which to look for the field. + * @return The name, index, and type information of the physical field. + */ + private def resolveInputFields( + fieldNames: Array[String], + tableSource: TableSource[_]): Array[(String, Int, TypeInformation[_])] = { + fieldNames.map(resolveInputField(_, tableSource)) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala index babd815..bfc06f9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala @@ -18,54 +18,61 @@ package org.apache.flink.table.sources +import java.util + +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.api.Types + /** - * Defines a logical event-time attribute for a [[TableSource]]. - * The event-time attribute can be used for indicating, accessing, and working with Flink's - * event-time. 
- * - * A [[TableSource]] that implements this interface defines the name of - * the event-time attribute. The attribute must be present in the schema of the [[TableSource]] - * and must be of type [[Long]] or [[java.sql.Timestamp]]. + * Extends a [[TableSource]] to specify a processing time attribute. */ -trait DefinedRowtimeAttribute { +trait DefinedProctimeAttribute { /** - * Defines a name of the event-time attribute that represents Flink's event-time, i.e., an - * attribute that is aligned with the watermarks of the - * [[org.apache.flink.streaming.api.datastream.DataStream]] returned by - * [[StreamTableSource.getDataStream()]]. - * - * An attribute with the given name must be present in the schema of the [[TableSource]]. - * The attribute must be of type [[Long]] or [[java.sql.Timestamp]]. + * Returns the name of a processing time attribute or null if no processing time attribute is + * present. * - * The method should return null if no rowtime attribute is defined. - * - * @return The name of the field that represents the event-time field and which is aligned - * with the watermarks of the [[org.apache.flink.streaming.api.datastream.DataStream]] - * returned by [[StreamTableSource.getDataStream()]]. - * The field must be present in the schema of the [[TableSource]] and be of type [[Long]] - * or [[java.sql.Timestamp]]. + * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of + * type [[Types.SQL_TIMESTAMP]]. */ - def getRowtimeAttribute: String + def getProctimeAttribute: String } /** - * Defines a logical processing-time attribute for a [[TableSource]]. - * The processing-time attribute can be used for indicating, accessing, and working with Flink's - * processing-time. - * - * A [[TableSource]] that implements this interface defines the name of - * the processing-time attribute. The attribute will be added to the schema of the - * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]]. 
+ * Extends a [[TableSource]] to specify rowtime attributes via a + * [[RowtimeAttributeDescriptor]]. */ -trait DefinedProctimeAttribute { +trait DefinedRowtimeAttributes { /** - * Defines a name of the processing-time attribute that represents Flink's - * processing-time. Null if no rowtime should be available. + * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table. * - * The field will be appended to the schema provided by the [[TableSource]]. + * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of + * type [[Types.SQL_TIMESTAMP]]. + * + * @return A list of [[RowtimeAttributeDescriptor]]. */ - def getProctimeAttribute: String + def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] +} + +/** + * Describes a rowtime attribute of a [[TableSource]]. + * + * @param attributeName The name of the rowtime attribute. + * @param timestampExtractor The timestamp extractor to derive the values of the attribute. + * @param watermarkStrategy The watermark strategy associated with the attribute. + */ +class RowtimeAttributeDescriptor( + attributeName: String, + timestampExtractor: TimestampExtractor, + watermarkStrategy: WatermarkStrategy) { + + /** Returns the name of the rowtime attribute. */ + def getAttributeName: String = attributeName + + /** Returns the [[TimestampExtractor]] for the attribute. */ + def getTimestampExtractor: TimestampExtractor = timestampExtractor + /** Returns the [[WatermarkStrategy]] for the attribute. 
*/ + def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala new file mode 100644 index 0000000..e0f01d5 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/timestampExtractors.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{Types, ValidationException} +import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference} + +/** + * Provides an expression to extract the timestamp for a rowtime attribute. + */ +abstract class TimestampExtractor extends FieldComputer[Long] { + + /** Timestamp extractors compute the timestamp as Long. 
*/ + override def getReturnType: TypeInformation[Long] = Types.LONG.asInstanceOf[TypeInformation[Long]] +} + +/** + * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute. + * + * @param field The field to convert into a rowtime attribute. + */ +class ExistingField(field: String) extends TimestampExtractor { + + override def getArgumentFields: Array[String] = Array(field) + + @throws[ValidationException] + override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = { + + // get type of field to convert + val fieldType = physicalFieldTypes(0) + + // check that the field to convert is of type Long or Timestamp + fieldType match { + case Types.LONG => // OK + case Types.SQL_TIMESTAMP => // OK + case _: TypeInformation[_] => + throw ValidationException( + s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.") + } + } + + /** + * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a + * rowtime attribute. 
+ */ + def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = { + + val fieldAccess: Expression = fieldAccesses(0) + + fieldAccess.resultType match { + case Types.LONG => + // access LONG field + fieldAccess + case Types.SQL_TIMESTAMP => + // cast timestamp to long + Cast(fieldAccess, Types.LONG) + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala new file mode 100644 index 0000000..eec423f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/watermarkStrategies.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sources + +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.types.Row + +/** + * Provides a strategy to generate watermarks for a rowtime attribute. 
+ * + * A watermark strategy is either a [[PeriodicWatermarkAssigner]] or + * [[PunctuatedWatermarkAssigner]]. + * + */ +sealed abstract class WatermarkStrategy extends Serializable + +/** A periodic watermark assigner. */ +abstract class PeriodicWatermarkAssigner extends WatermarkStrategy { + + /** + * Updates the assigner with the next timestamp. + * + * @param timestamp The next timestamp to update the assigner. + */ + def nextTimestamp(timestamp: Long): Unit + + /** + * Returns the current watermark. + * + * @return The current watermark. + */ + def getWatermark: Watermark +} + +/** A punctuated watermark assigner. */ +abstract class PunctuatedWatermarkAssigner extends WatermarkStrategy { + + /** + * Returns the watermark for the current row or null if no watermark should be generated. + * + * @param row The current row. + * @param timestamp The value of the timestamp attribute for the row. + * @return The watermark for this row or null if no watermark should be generated. + */ + def getWatermark(row: Row, timestamp: Long): Watermark +} + +/** + * A watermark assigner for ascending rowtime attributes. + * + * Emits a watermark of the maximum observed timestamp so far minus 1. + * Rows that have a timestamp equal to the max timestamp are not late. + */ +class AscendingWatermarks extends PeriodicWatermarkAssigner { + + var maxTimestamp: Long = Long.MinValue + 1 + + override def nextTimestamp(timestamp: Long): Unit = { + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp + } + } + + override def getWatermark: Watermark = new Watermark(maxTimestamp - 1) +} + +/** + * A watermark assigner for rowtime attributes which are out-of-order by a bounded time interval. + * + * Emits watermarks which are the maximum observed timestamp minus the specified delay. + * + * @param delay The delay by which watermarks are behind the maximum observed timestamp. 
+ */ +class BoundedOutOfOrderWatermarks(val delay: Long) extends PeriodicWatermarkAssigner { + + var maxTimestamp: Long = Long.MinValue + delay + + override def nextTimestamp(timestamp: Long): Unit = { + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp + } + } + + override def getWatermark: Watermark = new Watermark(maxTimestamp - delay) +} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala index 824f3fb..ad82d52 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala @@ -47,8 +47,11 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean) object TimeIndicatorTypeInfo { - val ROWTIME_MARKER: Int = -1 - val PROCTIME_MARKER: Int = -2 + val ROWTIME_STREAM_MARKER: Int = -1 + val PROCTIME_STREAM_MARKER: Int = -2 + + val ROWTIME_BATCH_MARKER: Int = -3 + val PROCTIME_BATCH_MARKER: Int = -4 val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true) val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false) http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala index a2356a6..6df00e7 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala @@ -181,13 +181,17 @@ class ExternalCatalogTest extends TableTestBase { util.verifyTable(result, expected) } - def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = { + def sourceBatchTableNode( + sourceTablePath: Array[String], + fields: Array[String]): String = { s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " + - s"fields=[${fields.mkString(", ")}])" + s"fields=[${fields.mkString(", ")}], " + + s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])" } def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = { s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " + - s"fields=[${fields.mkString(", ")}])" + s"fields=[${fields.mkString(", ")}], " + + s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])" } }
