[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192542#comment-16192542 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4710

> Support to define an existing field as the rowtime field for TableSource
> ------------------------------------------------------------------------
>
>                 Key: FLINK-7446
>                 URL: https://issues.apache.org/jira/browse/FLINK-7446
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Fabian Hueske
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field
> for a {{TableSource}}. But it would be helpful if we could also support defining
> an existing field as the rowtime field. Just like when registering a DataStream,
> the rowtime field can either be appended or replace an existing field.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192067#comment-16192067 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4710

Thanks @wuchong, I will merge this PR tomorrow.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182840#comment-16182840 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4710

Looks good to me. +1
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181224#comment-16181224 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4710

Thanks for the review @twalthr. I've updated the PR.

@haohui: This PR preserves the current logic that time attributes are exposed as `TIMESTAMP`. I agree that support for time indicators that expose themselves as `Long` is desirable. However, this requires quite a few changes as we need to extend several functions (incl. `TUMBLE`, `HOP`, etc.) and validation logic in some operators (over windows, joins, etc.). So this is not a lightweight change and should be done as a separate issue, IMO.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181204#comment-16181204 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141130569

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala ---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
     )
     util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+    // ensures that projection is not pushed into table source with proctime indicators
+    val util = streamTestUtil()
+
+    val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+      override def projectFields(fields: Array[Int]): TableSource[Row] = {
+        // ensure this method is not called!
+        Assert.fail()
+        null.asInstanceOf[TableSource[Row]]
+      }
+    }
+    util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+    val t = util.tableEnv.scan("PTimeTable")
+      .select('name, 'val)
+      .where('val > 10)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+        term("select", "name", "val"),
+        term("where", ">(val, 10)")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
--- End diff --

Yes, that should not be a problem. Projection push-down is not possible because the schema of the table is partially constructed inside of the Table API (e.g., appending the proctime attribute).
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181172#comment-16181172 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141124496

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala ---
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
     scan.tableSource match {
+      // projection pushdown is not supported for sources that provide time indicators
+      case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
--- End diff --

Yes, will do, but it is not easy to solve IMO.
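The `matches` override quoted above refuses projection push-down whenever the source declares a rowtime attribute, and the pull request's change log applies the same check to sources with a proctime attribute. fhueske's reply on the test diff gives the reason: part of the table schema is constructed inside the Table API. The following is a minimal, self-contained Scala sketch of that decision. `TableSourceLike`, `Projectable`, `DefinedRowtime`, `DefinedProctime` and `canPushProjection` are hypothetical stand-ins, not Flink's traits; `null` is kept only to mirror the `getRowtimeAttribute != null` check in the diff.

```scala
object ProjectionPushDownSketch {

  // Hypothetical marker traits standing in for TableSource, ProjectableTableSource,
  // DefinedRowtimeAttribute and DefinedProctimeAttribute. They are NOT Flink's traits.
  trait TableSourceLike
  trait Projectable
  trait DefinedRowtime  { def rowtimeAttribute: String }  // null means "no rowtime attribute"
  trait DefinedProctime { def proctimeAttribute: String } // null means "no proctime attribute"

  /** Mirrors the rule's decision: never push projections into sources with time indicators. */
  def canPushProjection(source: TableSourceLike): Boolean = source match {
    case r: DefinedRowtime if r.rowtimeAttribute != null   => false
    case p: DefinedProctime if p.proctimeAttribute != null => false
    case _: Projectable                                    => true
    case _                                                 => false
  }
}
```

A source that mixes in the rowtime trait but returns `null` falls through to the `Projectable` case, so push-down stays enabled for it, which matches the guard condition in the diff.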
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179072#comment-16179072 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4710

@haohui We only cast ROWTIME / PROCTIME directly to LONG at runtime; the special types are needed during the pre-flight phase and validation. We could not come up with a better solution that ensures that watermarks stay aligned with the rowtime.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179063#comment-16179063 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774888

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
         generateRowtimeAccess()
       case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
         // attribute is proctime indicator.
-        // We use a null literal and generate a timestamp when we need it.
+        // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case idx =>
-        // regular attribute. Access attribute in input data type.
-        generateInputAccess(input1, input1Term, idx)
+        // get type of result field
+        val outIdx = input1Mapping.indexOf(idx)
+        val outType = returnType match {
+          case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
+          case ct: CompositeType[_] => ct.getTypeAt(outIdx)
+          case t: TypeInformation[_] => t
+        }
+        val inputAccess = generateInputAccess(input1, input1Term, idx)
+        // Change output type to rowtime indicator
+        if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
+          (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) {
+          // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally
--- End diff --

`ROWTIME_INDICATOR`
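The generated code above re-types a field as the rowtime indicator only when the result field is a rowtime indicator and the input field is `LONG` or `SQL_TIMESTAMP`. The Scala sketch below models that check, plus one possible long-based representation of both input types. `RowtimeCastSketch` and its type tags are hypothetical, and `toEpochMillis` simply uses `Timestamp.getTime`, deliberately ignoring any time-zone handling the real code generator may apply.

```scala
import java.sql.Timestamp

// Hypothetical model of the code-generation decision quoted above.
// These tags are NOT Flink's TypeInformation classes.
object RowtimeCastSketch {

  sealed trait TypeTag
  case object LongTag             extends TypeTag
  case object SqlTimestampTag     extends TypeTag
  case object StringTag           extends TypeTag
  case object RowtimeIndicatorTag extends TypeTag

  /** True if an input field may be re-typed as the rowtime indicator without conversion code. */
  def canHardCastToRowtime(outType: TypeTag, inType: TypeTag): Boolean =
    outType == RowtimeIndicatorTag && (inType == LongTag || inType == SqlTimestampTag)

  /** Assumed long-based representation: epoch milliseconds (time zones ignored). */
  def toEpochMillis(value: Any): Long = value match {
    case l: Long      => l           // LONG field: already a long value
    case t: Timestamp => t.getTime   // TIMESTAMP field: milliseconds since the epoch
    case other =>
      throw new IllegalArgumentException(s"Expected Long or Timestamp, got: $other")
  }
}
```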
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179065#comment-16179065 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778116

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-    * Defines a name of the event-time attribute that represents Flink's
-    * event-time. Null if no rowtime should be available.
+    * Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
+    * attribute that is aligned with the watermarks of the table.
--- End diff --

`that is aligned with the watermarks of the underlying DataStream`?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179070#comment-16179070 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778313

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-    * Defines a name of the event-time attribute that represents Flink's
--- End diff --

Maybe also update the docs of the class?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179071#comment-16179071 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774786

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
         generateRowtimeAccess()
       case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
         // attribute is proctime indicator.
-        // We use a null literal and generate a timestamp when we need it.
+        // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
       case idx =>
-        // regular attribute. Access attribute in input data type.
-        generateInputAccess(input1, input1Term, idx)
+        // get type of result field
--- End diff --

add a comment that this is needed for `TableSource`?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179066#comment-16179066 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140771682

--- Diff: docs/dev/table/streaming.md ---
@@ -336,27 +336,24 @@ val windowedTable = tEnv
 ### Event time
-Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
+Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in case of out-of-order events or late events. It also ensures replayable results of the table program when reading records from persistent storage.
 Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
 In order to handle out-of-order events and distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and make some kind of progress in time (so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
 An event time attribute can be defined either during DataStream-to-Table conversion or by using a TableSource.
-The Table API & SQL assume that in both cases timestamps and watermarks have been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a `TableSource` with knowledge about the incoming data's characteristics and is hidden from the end user of the API.
-
-
 During DataStream-to-Table Conversion
-The event time attribute is defined with the `.rowtime` property during schema definition.
+The event time attribute is defined with the `.rowtime` property during schema definition. Timestamps and watermarks must have been assigned in the `DataStream` that is converted.
--- End diff --

Add a link to `dev/event_timestamps_watermarks` again?
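The documentation change above relies on watermarks to separate on-time from late events. As a hedged illustration, here is a self-contained Scala sketch of one common strategy, a bounded out-of-orderness watermark: the watermark is the highest timestamp seen so far minus a fixed bound, and an event counts as late if its timestamp is not greater than the current watermark. `WatermarkSketch` and `WatermarkTracker` are hypothetical and not Flink API; in a real job, timestamps and watermarks are assigned in the DataStream API, as the event-time pages referenced in the diff describe.

```scala
object WatermarkSketch {

  /** Hypothetical bounded out-of-orderness watermark tracker (not Flink API).
    * Watermark = highest timestamp seen so far minus a fixed bound. */
  final class WatermarkTracker(maxOutOfOrdernessMs: Long) {
    require(maxOutOfOrdernessMs >= 0, "the out-of-orderness bound must be non-negative")

    // Initialized so that the first watermark is Long.MinValue, i.e. practically nothing is late yet.
    private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

    def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs

    /** Records an event timestamp and returns true if the event is late,
      * i.e. its timestamp is not greater than the watermark before this event. */
    def observe(timestamp: Long): Boolean = {
      val late = timestamp <= currentWatermark
      if (timestamp > maxTimestamp) maxTimestamp = timestamp
      late
    }
  }
}
```

With a bound of 5 ms, an event at 10 moves the watermark to 5; a later event at 7 is out of order but still on time, while an event at 4 is late.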
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179062#comment-16179062 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778244

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-    * Defines a name of the event-time attribute that represents Flink's
-    * event-time. Null if no rowtime should be available.
+    * Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
+    * attribute that is aligned with the watermarks of the table.
+    * An attribute with the given name must be present in the schema of the [[TableSource]].
+    * The attribute must be of type [[Long]] or [[java.sql.Timestamp]].
     *
-    * The field will be appended to the schema provided by the [[TableSource]].
+    * The method should return null if no rowtime attribute is defined.
+    *
+    * @return The name of the field that represents the event-time field and which is aligned
+    *         with the watermarks of the table. The field must be present in the schema of the
--- End diff --

Same here.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179069#comment-16179069 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140777209

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala ---
@@ -53,18 +53,11 @@ object StreamTableSourceTable {
 
     val original = TableEnvironment.getFieldIndices(tableSource)
 
-    // append rowtime marker
-    val withRowtime = if (rowtime.isDefined) {
--- End diff --

remove unused `rowtime`
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179064#comment-16179064 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140782081

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala ---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
     )
     util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+    // ensures that projection is not pushed into table source with proctime indicators
+    val util = streamTestUtil()
+
+    val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+      override def projectFields(fields: Array[Int]): TableSource[Row] = {
+        // ensure this method is not called!
+        Assert.fail()
+        null.asInstanceOf[TableSource[Row]]
+      }
+    }
+    util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+    val t = util.tableEnv.scan("PTimeTable")
+      .select('name, 'val)
+      .where('val > 10)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+        term("select", "name", "val"),
+        term("where", ">(val, 10)")
+      )
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
--- End diff --

Does filter push down work? With rowtime and proctime?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179067#comment-16179067 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140775690

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala ---
@@ -47,34 +46,18 @@ class FlinkLogicalTableSourceScan(
   }
 
   override def deriveRowType(): RelDataType = {
-    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-    val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-    val fields = fieldNames.zip(fieldTypes)
-
-    val withRowtime = tableSource match {
-      case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
-        val rowtimeAttribute = timeSource.getRowtimeAttribute
-        fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
-      case _ =>
-        fields
-    }
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val withProctime = tableSource match {
-      case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
-        val proctimeAttribute = timeSource.getProctimeAttribute
-        withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
-      case _ =>
-        withRowtime
+    tableSource match {
+      case s: StreamTableSource[_] =>
+        StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory)
+      case b: BatchTableSource[_] =>
--- End diff --

Replace `b` with `_` to remove IDE warning.
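The removed `deriveRowType` code above appended both time attributes to the source's fields. Under the new contract the rowtime attribute marks an existing field, while, per fhueske's comment on the test diff, the proctime attribute is still appended. The Scala sketch below contrasts the two behaviors on plain lists of (name, type) pairs. `RowTypeDerivationSketch`, `deriveOld`, `deriveNew` and the type tags are hypothetical; the real code delegates to `StreamTableSourceTable.deriveRowTypeOfTableSource`, whose body is not shown in this thread.

```scala
object RowTypeDerivationSketch {

  // Hypothetical type tags; not Flink's TypeInformation classes.
  sealed trait FieldType
  case object LongType          extends FieldType
  case object StringType        extends FieldType
  case object RowtimeIndicator  extends FieldType
  case object ProctimeIndicator extends FieldType

  type Schema = List[(String, FieldType)]

  private def field(name: String, tpe: FieldType): (String, FieldType) = name -> tpe

  /** Old behavior: rowtime and proctime attributes are both appended to the source's fields. */
  def deriveOld(fields: Schema, rowtime: Option[String], proctime: Option[String]): Schema =
    fields ++
      rowtime.map(field(_, RowtimeIndicator)).toList ++
      proctime.map(field(_, ProctimeIndicator)).toList

  /** New behavior: the rowtime attribute re-types an existing field in place;
    * the proctime attribute is still appended. */
  def deriveNew(fields: Schema, rowtime: Option[String], proctime: Option[String]): Schema = {
    val withRowtime: Schema = fields.map {
      case (name, _) if rowtime.contains(name) => field(name, RowtimeIndicator)
      case unchanged                          => unchanged
    }
    withRowtime ++ proctime.map(field(_, ProctimeIndicator)).toList
  }
}
```

The field count is what changes: the old derivation adds a column per time attribute, while the new one adds a column only for proctime.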
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179068#comment-16179068 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140776048

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala ---
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
     scan.tableSource match {
+      // projection pushdown is not supported for sources that provide time indicators
+      case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
--- End diff --

Can you create a follow-up issue for this?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178107#comment-16178107 ]

ASF GitHub Bot commented on FLINK-7446:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/4710

LGTM overall, +1.

One question: since we now cast `ROWTIME` / `PROCTIME` directly to `LONG`, I wonder whether we want to revisit the decision to create dedicated types for `ROWTIME` / `PROCTIME`?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178004#comment-16178004 ]

ASF GitHub Bot commented on FLINK-7446:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4710

[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.

## What is the purpose of the change

Changes the contract of the `DefinedRowtimeAttribute` interface. The rowtime attribute is no longer appended to the schema of the row but marks an existing field in the input that will be handled as the event time attribute. The specified field must be of type `Long` or `Timestamp`. The watermarks of the `DataStream` must be aligned with the specified field.

## Brief change log

- Rowtime fields are no longer appended; instead, an existing `Long` or `Timestamp` field is marked as the event time field
- Added checks that projections are not pushed into `TableSources` that implement `DefinedRowtimeAttribute` or `DefinedProctimeAttribute`
- Added several tests for table sources with rowtime or proctime attributes
- Updated the documentation

## Verifying this change

- Check the added test cases

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

- Documentation is updated according to the changes of the PR.
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableExistingField

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4710.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4710

commit 28f11bcf989e30d14b09b43891ee6012d77960aa
Author: Fabian Hueske
Date: 2017-09-10T22:05:06Z

[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.
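A minimal, stand-alone sketch of the new contract described in this PR, assuming a simplified schema model: `SimpleSchema` and `RowtimeValidator` are hypothetical names, not Flink classes, and field types are represented as plain Java classes. It only checks that the rowtime attribute names an existing field whose type is `Long` or `java.sql.Timestamp`; it does not deal with watermarks.

```java
import java.sql.Timestamp;
import java.util.List;

// Hypothetical, simplified schema model: parallel lists of field names and
// field types. This is not Flink's TableSchema.
final class SimpleSchema {
    final List<String> names;
    final List<Class<?>> types;

    SimpleSchema(List<String> names, List<Class<?>> types) {
        if (names.size() != types.size()) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        this.names = names;
        this.types = types;
    }
}

final class RowtimeValidator {
    // Returns the index of the rowtime field. Throws if the attribute does not
    // name an existing field or if that field is neither Long nor Timestamp.
    static int validateRowtimeField(SimpleSchema schema, String rowtimeAttribute) {
        int idx = schema.names.indexOf(rowtimeAttribute);
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Rowtime attribute '" + rowtimeAttribute + "' is not a field of the schema");
        }
        Class<?> type = schema.types.get(idx);
        if (type != Long.class && type != Timestamp.class) {
            throw new IllegalArgumentException(
                "Rowtime attribute '" + rowtimeAttribute
                    + "' must be of type Long or Timestamp, but is " + type.getSimpleName());
        }
        return idx;
    }
}
```

Running such a check once, when the source is registered, keeps it off the per-record code path.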
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160671#comment-16160671 ] Jark Wu commented on FLINK-7446: Hi [~fhueske], feel free to take this; I haven't started yet.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158910#comment-16158910 ] Fabian Hueske commented on FLINK-7446: -- Hi [~jark], did you start working on this issue? I just went ahead without checking the JIRA :-( If you have something, I'd drop my code.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144662#comment-16144662 ] Jark Wu commented on FLINK-7446: [~fhueske] yes, I created FLINK-7548 to discuss this.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143517#comment-16143517 ] Fabian Hueske commented on FLINK-7446: -- I think that's a good plan [~jark]. However, we should design it in a way that supports both periodic and punctuated watermarks.
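To illustrate what punctuated support could mean in a field-based design, here is a sketch assuming a hypothetical `PunctuatedRowtimeWatermarks` interface (not an existing Flink API) that is consulted for every row, in the style of the DataStream API's `AssignerWithPunctuatedWatermarks`. The boolean marker field and its index are assumptions made only for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical punctuated generator: inspects every row and may return a
// watermark for it. Not an existing Flink interface.
interface PunctuatedRowtimeWatermarks {
    // Returns the watermark to emit after this row, or null to emit none.
    Long checkAndGetWatermark(Object[] row, long rowtime);
}

// Example: emit a watermark equal to the row's rowtime whenever a boolean
// "end of batch" marker field is set (the marker field is an assumption).
final class MarkerFieldWatermarks implements PunctuatedRowtimeWatermarks {
    private final int markerIdx;

    MarkerFieldWatermarks(int markerIdx) {
        this.markerIdx = markerIdx;
    }

    @Override
    public Long checkAndGetWatermark(Object[] row, long rowtime) {
        return Boolean.TRUE.equals(row[markerIdx]) ? rowtime : null;
    }
}

final class PunctuatedDriver {
    // Feeds rows through the generator and collects the emitted watermarks,
    // dropping any that would not advance the current watermark.
    static List<Long> run(List<Object[]> rows, int rowtimeIdx, PunctuatedRowtimeWatermarks gen) {
        List<Long> emitted = new ArrayList<>();
        long current = Long.MIN_VALUE;
        for (Object[] row : rows) {
            long rowtime = (Long) row[rowtimeIdx];
            Long wm = gen.checkAndGetWatermark(row, rowtime);
            if (wm != null && wm > current) {
                current = wm;
                emitted.add(wm);
            }
        }
        return emitted;
    }
}
```

A periodic variant would instead be polled by the runtime at a fixed interval rather than consulted per row, so a design covering both has to accommodate both call patterns.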
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137794#comment-16137794 ] Jark Wu commented on FLINK-7446: In the long run, we can provide a new interface called {{DefinedWatermark}}, which has two methods: {{getRowtimeAttribute}} (which can only return an existing field) and {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} would then be marked as deprecated. In addition, we can provide some built-in watermark generators, such as {{AscendingTimestamp}} and {{BoundedOutOfOrderness}}. What do you think?
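A sketch of how the proposed {{DefinedWatermark}} interface and two built-in generators might look, assuming periodic (polled) watermarks computed from the value of the rowtime field. The names follow the comment above, but all of this is hypothetical and not an existing Flink API. The `- 1` in `AscendingTimestamp` is a design choice so that later rows carrying the same rowtime are not treated as late.

```java
// Hypothetical periodic generator working on the rowtime field's value.
interface RowtimeWatermarkGenerator {
    // Called for every record with the value of the rowtime field.
    void onRowtime(long rowtime);

    // Polled periodically by the runtime; returns the current watermark.
    long getWatermark();
}

// Hypothetical TableSource mix-in as proposed above.
interface DefinedWatermark {
    // Name of an existing field that carries the event time.
    String getRowtimeAttribute();

    RowtimeWatermarkGenerator getWatermarkGenerator();
}

// Watermark trails the largest rowtime seen so far by a fixed bound.
final class BoundedOutOfOrderness implements RowtimeWatermarkGenerator {
    private final long maxDelayMs;
    private long maxRowtime;

    BoundedOutOfOrderness(long maxDelayMs) {
        if (maxDelayMs < 0) {
            throw new IllegalArgumentException("maxDelayMs must be >= 0");
        }
        this.maxDelayMs = maxDelayMs;
        // Chosen so the initial watermark is Long.MIN_VALUE without underflow.
        this.maxRowtime = Long.MIN_VALUE + maxDelayMs;
    }

    @Override
    public void onRowtime(long rowtime) {
        maxRowtime = Math.max(maxRowtime, rowtime);
    }

    @Override
    public long getWatermark() {
        return maxRowtime - maxDelayMs;
    }
}

// For rowtimes that never decrease: the watermark follows the latest rowtime.
final class AscendingTimestamp implements RowtimeWatermarkGenerator {
    private long lastRowtime = Long.MIN_VALUE;

    @Override
    public void onRowtime(long rowtime) {
        // A decreasing value does not move the watermark back; a real
        // implementation would need a policy for it (log, drop, or fail).
        lastRowtime = Math.max(lastRowtime, rowtime);
    }

    @Override
    public long getWatermark() {
        return lastRowtime == Long.MIN_VALUE ? Long.MIN_VALUE : lastRowtime - 1;
    }
}
```

With a bound of 500 ms, rowtimes 1000, 3000, 2000 yield a watermark of 2500: the maximum seen (3000) minus the bound.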
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137358#comment-16137358 ] Fabian Hueske commented on FLINK-7446: -- I agree that the watermark generation for TableSources should be reworked. However, I'm not sure whether we will achieve that before the next release. If we want to support defining an event-time indicator on an existing field and want to be sure that the implementation of the {{TableSource}} is correct, we would have to compare the {{StreamRecord}} timestamp with the existing attribute at runtime for each record. However, since {{TableSource}} is an interface for rather experienced users (more DBA than DB user), I'd be fine with omitting this safety check. In the long run I agree with [~xccui]. We should have a watermark generator interface that works on an existing field, with built-in implementations for the common cases.
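The per-record safety check described above could look roughly like this. It is a sketch with a hypothetical class name (`RowtimeConsistencyCheck`), assuming the rowtime field holds either a `Long` or a `java.sql.Timestamp` and that both represent epoch milliseconds; it ignores any time-zone handling the Table API may apply to `Timestamp` values.

```java
import java.sql.Timestamp;

// Hypothetical per-record check: the timestamp carried by the stream record
// must equal the value of the row's rowtime field.
final class RowtimeConsistencyCheck {
    // Converts a Long or java.sql.Timestamp field value to epoch milliseconds.
    static long rowtimeMillis(Object fieldValue) {
        if (fieldValue instanceof Long) {
            return (Long) fieldValue;
        }
        if (fieldValue instanceof Timestamp) {
            return ((Timestamp) fieldValue).getTime();
        }
        throw new IllegalArgumentException("Unsupported rowtime value: "
            + (fieldValue == null ? "null" : fieldValue.getClass().getName()));
    }

    // Throws if the record timestamp and the rowtime field disagree.
    static void check(long recordTimestamp, Object[] row, int rowtimeIdx) {
        long fieldMillis = rowtimeMillis(row[rowtimeIdx]);
        if (fieldMillis != recordTimestamp) {
            throw new IllegalStateException("Record timestamp " + recordTimestamp
                + " does not match rowtime field value " + fieldMillis);
        }
    }
}
```

The cost is one field access and one comparison per record, which is the overhead the comment proposes to skip for experienced `TableSource` implementers.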
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134448#comment-16134448 ] Jark Wu commented on FLINK-7446: From my perspective, TableSource should provide an interface to define the timestamp and watermark at the same time. {{DefinedRowtimeAttribute}} is not a good design. A TableSource does not come from a DataStream; it is converted into a DataStream, so the timestamp and watermark should be defined by the TableSource, not by the DataStream. Is "generate timestamps and watermarks and validate that field and extracted timestamp are the same" a validation that happens at runtime? If it is, I prefer the latter option.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133050#comment-16133050 ] Xingcan Cui commented on FLINK-7446: Thanks for the response, [~fhueske]. From my perspective, I prefer the latter option (create new interfaces/operators...). Maybe we can provide one or more default watermark generators that users can configure directly with a few parameters (e.g., the watermark interval and the expected delay relative to the latest rowtime). If the provided watermark generators cannot meet their requirements, users can implement their own. What do you think, [~jark]?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131805#comment-16131805 ] Fabian Hueske commented on FLINK-7446: -- That's in fact a good question. In principle we could avoid timestamp extraction. However, we need to generate watermarks, and both things go together in the DataStream API. I see two options: (1) generate timestamps and watermarks and validate that the field and the extracted timestamp are the same, or (2) create new interfaces / operators that generate watermarks based on a field.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131629#comment-16131629 ] Xingcan Cui commented on FLINK-7446: Now that the rowtime field is an existing field in the stream, shall we consider extracting the timestamps automatically instead of forcing users to assign them with the {{TimestampAssigner}} in advance? What if the timestamps assigned in the {{StreamRecord}} and in the {{Row}} do not match?
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131616#comment-16131616 ] Jark Wu commented on FLINK-7446: [~fhueske] Great! I will open a pull request in the next few days.
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131261#comment-16131261 ] Fabian Hueske commented on FLINK-7446: -- I think the existing {{DefinedRowtimeAttribute}} interface can be used for that as well. If the field name returned by {{DefinedRowtimeAttribute.getRowtimeAttribute()}} is included in the field names returned by the {{TableSource}}, we can use the existing field as the time indicator attribute.
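A sketch of the rule proposed in this comment, assuming the source's field names are available as a plain list: if the rowtime attribute names an existing field, that field is marked as the time indicator; otherwise the previous append behavior is kept. `RowtimeResolution` is a hypothetical name, not Flink code. (The merged PR later dropped the append path entirely, so under its contract an unknown name would be an error instead.)

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical result of resolving a rowtime attribute name against the
// field names of a TableSource.
final class RowtimeResolution {
    final List<String> fieldNames; // field names of the resulting schema
    final int rowtimeIndex;        // position of the rowtime attribute
    final boolean appended;        // true if a new field had to be appended

    private RowtimeResolution(List<String> fieldNames, int rowtimeIndex, boolean appended) {
        this.fieldNames = Collections.unmodifiableList(fieldNames);
        this.rowtimeIndex = rowtimeIndex;
        this.appended = appended;
    }

    static RowtimeResolution resolve(List<String> sourceFields, String rowtimeAttribute) {
        int idx = sourceFields.indexOf(rowtimeAttribute);
        if (idx >= 0) {
            // Existing field: mark it as the time indicator; schema unchanged.
            return new RowtimeResolution(new ArrayList<>(sourceFields), idx, false);
        }
        // Unknown name: fall back to appending a new rowtime field.
        List<String> extended = new ArrayList<>(sourceFields);
        extended.add(rowtimeAttribute);
        return new RowtimeResolution(extended, extended.size() - 1, true);
    }
}
```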