[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192542#comment-16192542
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4710


> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Fabian Hueske
>
> Currently, {{DefinedRowtimeAttribute}} can only define an appended rowtime 
> field for a {{TableSource}}. It would be helpful to also support defining an 
> existing field as the rowtime field. Just as when registering a DataStream, 
> the rowtime field could then either be appended or replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192067#comment-16192067
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4710
  
Thanks @wuchong 
I will merge this PR tomorrow.




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182840#comment-16182840
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4710
  
Looks good to me.

+1




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181224#comment-16181224
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4710
  
Thanks for the review @twalthr.
I've updated the PR. 

@haohui: This PR preserves the current logic that time attributes are 
exposed as `TIMESTAMP`. I agree that support for time indicators that expose 
themselves as `Long` is desirable. However, this requires quite a few changes 
as we need to extend several functions (incl. `TUMBLE`, `HOP`, etc.) and 
validation logic in some operators (over windows, joins, etc.). So this is not 
a lightweight change and should be done as a separate issue, IMO. 




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181204#comment-16181204
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141130569
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
 ---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
   )
 util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+// ensures that projection is not pushed into table source with proctime indicators
+val util = streamTestUtil()
+
+val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+  override def projectFields(fields: Array[Int]): TableSource[Row] = {
+// ensure this method is not called!
+Assert.fail()
+null.asInstanceOf[TableSource[Row]]
+  }
+}
+util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+val t = util.tableEnv.scan("PTimeTable")
+  .select('name, 'val)
+  .where('val > 10)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+"StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+term("select", "name", "val"),
+term("where", ">(val, 10)")
+  )
+util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
--- End diff --

Yes, that should not be a problem. Projection push-down is not possible 
because the schema of the table is partially constructed inside of the Table 
API (e.g., appending the proctime attribute). 
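
A minimal sketch of the guard discussed in this review thread, for illustration only. The traits `TableSource`, `Projectable`, `HasRowtime`, and `HasProctime` and the function `canPushProjection` are hypothetical stand-ins, not Flink's actual interfaces. The only assumption carried over from the PR is that a source providing a time indicator is excluded from projection push-down.

```scala
object ProjectionPushDownSketch {
  // Hypothetical stand-ins for the interfaces named in the thread.
  trait TableSource
  trait Projectable extends TableSource
  trait HasRowtime { def rowtimeAttribute: Option[String] }
  trait HasProctime { def proctimeAttribute: Option[String] }

  // Projection push-down is skipped as soon as a time indicator is defined,
  // because part of the table schema is then built outside of the source.
  def canPushProjection(source: TableSource): Boolean = source match {
    case r: HasRowtime if r.rowtimeAttribute.isDefined => false
    case p: HasProctime if p.proctimeAttribute.isDefined => false
    case _: Projectable => true
    case _ => false
  }

  // Example sources used to exercise the guard.
  val plainSource: TableSource = new Projectable {}
  val proctimeSource: TableSource = new Projectable with HasProctime {
    val proctimeAttribute: Option[String] = Some("pTime")
  }
  val undefinedRowtimeSource: TableSource = new Projectable with HasRowtime {
    val rowtimeAttribute: Option[String] = None
  }
}
```

In this sketch a source that mixes in `HasRowtime` but returns `None` is still eligible for push-down, which mirrors the `!= null` condition in the rule under review.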




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181172#comment-16181172
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r141124496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
 ---
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
 val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
 scan.tableSource match {
+  // projection pushdown is not supported for sources that provide time indicators
+  case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
--- End diff --

Yes, will do, but it is not easy to solve IMO.




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179072#comment-16179072
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4710
  
@haohui We only cast ROWTIME / PROCTIME directly to LONG at runtime; 
the special types are still needed during the pre-flight phase and validation. 
We could not come up with a better solution that ensures that watermarks stay 
aligned with the rowtime.
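
To illustrate why dedicated types can still matter before execution, here is a small sketch under stated assumptions. The names `LogicalType`, `PlainLong`, `RowtimeIndicator`, `ProctimeIndicator`, and `validateWindowTimeField` are hypothetical and do not correspond to Flink classes. The sketch only models the idea that validation can tell a time indicator apart from an ordinary `Long` field, even if both are handled as LONG at runtime.

```scala
object TimeIndicatorSketch {
  // Hypothetical logical types that exist only during planning and validation.
  sealed trait LogicalType
  case object PlainLong extends LogicalType
  case object RowtimeIndicator extends LogicalType
  case object ProctimeIndicator extends LogicalType

  // Pre-flight check: a time-based operation accepts only time indicators,
  // so an ordinary Long field is rejected during validation.
  def validateWindowTimeField(tpe: LogicalType): Either[String, LogicalType] =
    tpe match {
      case RowtimeIndicator | ProctimeIndicator => Right(tpe)
      case other => Left(s"Expected a time attribute, but found $other.")
    }
}
```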




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179063#comment-16179063
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
 generateRowtimeAccess()
   case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
 // attribute is proctime indicator.
-// We use a null literal and generate a timestamp when we need it.
+// we use a null literal and generate a timestamp when we need it.
 generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
   case idx =>
-// regular attribute. Access attribute in input data type.
-generateInputAccess(input1, input1Term, idx)
+// get type of result field
+val outIdx = input1Mapping.indexOf(idx)
+val outType = returnType match {
+  case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
+  case ct: CompositeType[_] => ct.getTypeAt(outIdx)
+  case t: TypeInformation[_] => t
+}
+val inputAccess = generateInputAccess(input1, input1Term, idx)
+// Change output type to rowtime indicator
+if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
+  (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) {
+  // Hard cast possible because LONG, TIMESTAMP, and ROW_TIMEINDICATOR are internally
--- End diff --

`ROWTIME_INDICATOR`




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179065#comment-16179065
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778116
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
 ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-* Defines a name of the event-time attribute that represents Flink's
-* event-time. Null if no rowtime should be available.
+* Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
+* attribute that is aligned with the watermarks of the table.
--- End diff --

`that is aligned with the watermarks of the underlying DataStream`?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179070#comment-16179070
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778313
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
 ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-* Defines a name of the event-time attribute that represents Flink's
--- End diff --

Maybe also update the docs of the class?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179071#comment-16179071
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140774786
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -255,11 +255,30 @@ abstract class CodeGenerator(
 generateRowtimeAccess()
   case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
 // attribute is proctime indicator.
-// We use a null literal and generate a timestamp when we need it.
+// we use a null literal and generate a timestamp when we need it.
 generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
   case idx =>
-// regular attribute. Access attribute in input data type.
-generateInputAccess(input1, input1Term, idx)
+// get type of result field
--- End diff --

add a comment that this is needed for `TableSource`?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179066#comment-16179066
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140771682
  
--- Diff: docs/dev/table/streaming.md ---
@@ -336,27 +336,24 @@ val windowedTable = tEnv
 
 ### Event time
 
-Event time allows a table program to produce results based on the time 
that is contained in every record. This allows for consistent results even in 
case of out-of-order events or late events. It also ensures replayable results 
of the table program when reading records from persistent storage. 
+Event time allows a table program to produce results based on the time 
that is contained in every record. This allows for consistent results even in 
case of out-of-order events or late events. It also ensures replayable results 
of the table program when reading records from persistent storage.
 
 Additionally, event time allows for unified syntax for table programs in 
both batch and streaming environments. A time attribute in a streaming 
environment can be a regular field of a record in a batch environment.
 
 In order to handle out-of-order events and distinguish between on-time and 
late events in streaming, Flink needs to extract timestamps from events and 
make some kind of progress in time (so-called [watermarks]({{ site.baseurl 
}}/dev/event_time.html)).
 
 An event time attribute can be defined either during DataStream-to-Table 
conversion or by using a TableSource. 
 
-The Table API & SQL assume that in both cases timestamps and watermarks 
have been generated in the [underlying DataStream API]({{ site.baseurl 
}}/dev/event_timestamps_watermarks.html) before. Ideally, this happens within a 
`TableSource` with knowledge about the incoming data's characteristics and is 
hidden from the end user of the API.
-
-
  During DataStream-to-Table Conversion
 
-The event time attribute is defined with the `.rowtime` property during 
schema definition. 
+The event time attribute is defined with the `.rowtime` property during 
schema definition. Timestamps and watermarks must have been assigned in the 
`DataStream` that is converted.
--- End diff --

Add a link to `dev/event_timestamps_watermarks` again?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179062#comment-16179062
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140778244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
 ---
@@ -30,10 +30,16 @@ package org.apache.flink.table.sources
 trait DefinedRowtimeAttribute {
 
   /**
-* Defines a name of the event-time attribute that represents Flink's
-* event-time. Null if no rowtime should be available.
+* Defines a name of the event-time attribute that represents Flink's event-time, i.e., an
+* attribute that is aligned with the watermarks of the table.
+* An attribute with the given name must be present in the schema of the [[TableSource]].
+* The attribute must be of type [[Long]] or [[java.sql.Timestamp]].
 *
-* The field will be appended to the schema provided by the [[TableSource]].
+* The method should return null if no rowtime attribute is defined.
+*
+* @return The name of the field that represents the event-time field and which is aligned
+* with the watermarks of the table. The field must be present in the schema of the
--- End diff --

Same here.




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179069#comment-16179069
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140777209
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
 ---
@@ -53,18 +53,11 @@ object StreamTableSourceTable {
 
 val original = TableEnvironment.getFieldIndices(tableSource)
 
-// append rowtime marker
-val withRowtime = if (rowtime.isDefined) {
--- End diff --

remove unused `rowtime`




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179064#comment-16179064
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140782081
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
 ---
@@ -121,19 +161,81 @@ class TableSourceTest extends TableTestBase {
   )
 util.verifyTable(t, expected)
   }
+
+  @Test
+  def testProjectableProcTimeTableSource(): Unit = {
+// ensures that projection is not pushed into table source with proctime indicators
+val util = streamTestUtil()
+
+val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
+  override def projectFields(fields: Array[Int]): TableSource[Row] = {
+// ensure this method is not called!
+Assert.fail()
+null.asInstanceOf[TableSource[Row]]
+  }
+}
+util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+
+val t = util.tableEnv.scan("PTimeTable")
+  .select('name, 'val)
+  .where('val > 10)
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+"StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
+term("select", "name", "val"),
+term("where", ">(val, 10)")
+  )
+util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testProjectableRowTimeTableSource(): Unit = {
--- End diff --

Does filter push down work? With rowtime and proctime?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179067#comment-16179067
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140775690
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -47,34 +46,18 @@ class FlinkLogicalTableSourceScan(
   }
 
   override def deriveRowType(): RelDataType = {
-val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
 
-val fields = fieldNames.zip(fieldTypes)
-
-val withRowtime = tableSource match {
-  case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null =>
-val rowtimeAttribute = timeSource.getRowtimeAttribute
-fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
-  case _ =>
-fields
-}
+val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
 
-val withProctime = tableSource match {
-  case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null =>
-val proctimeAttribute = timeSource.getProctimeAttribute
-withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
-  case _ =>
-withRowtime
+tableSource match {
+  case s: StreamTableSource[_] =>
+StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory)
+  case b: BatchTableSource[_] =>
--- End diff --

Replace `b` with `_` to remove IDE warning.




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16179068#comment-16179068
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4710#discussion_r140776048
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
 ---
@@ -33,6 +32,9 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
   override def matches(call: RelOptRuleCall): Boolean = {
 val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
 scan.tableSource match {
+  // projection pushdown is not supported for sources that provide time indicators
+  case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
--- End diff --

Can you create a follow-up issue for this?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178107#comment-16178107
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/4710
  
LGTM overall +1.

One question: since we now cast `ROWTIME` / `PROCTIME` directly to `LONG`, 
I wonder, do we want to revisit the decision that creates dedicated types for 
`ROWTIME` / `PROCTIME`?




[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16178004#comment-16178004
 ] 

ASF GitHub Bot commented on FLINK-7446:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/4710

[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.

## What is the purpose of the change

Changes the contract of the `DefinedRowtimeAttribute` interface. The 
rowtime attribute is no longer appended to the schema of the row; instead, it 
marks an existing field in the input that is handled as the event-time 
attribute. The specified field must be of type `Long` or `Timestamp`. The 
watermarks of the `DataStream` must be aligned with the specified field.
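
As a rough illustration of the new contract, the sketch below checks that a declared rowtime attribute names an existing field of type `Long` or `Timestamp`. It is self-contained and does not use Flink classes: `RowtimeFieldCheck`, the map-based schema, and the field names `user` and `ts` are assumptions made for this example only.

```java
import java.sql.Timestamp;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a table source's schema check; this is not Flink code.
final class RowtimeFieldCheck {

    /**
     * Returns the type of the rowtime field. Throws if the field is not part of
     * the schema or if its type is neither Long nor Timestamp.
     */
    static Class<?> validate(Map<String, Class<?>> schema, String rowtimeAttribute) {
        Class<?> type = schema.get(rowtimeAttribute);
        if (type == null) {
            throw new IllegalArgumentException(
                "Rowtime attribute '" + rowtimeAttribute + "' is not a field of the source");
        }
        if (type != Long.class && type != Timestamp.class) {
            throw new IllegalArgumentException(
                "Rowtime attribute '" + rowtimeAttribute + "' must be of type Long or Timestamp");
        }
        return type;
    }

    /** Small example schema with hypothetical field names. */
    static Map<String, Class<?>> exampleSchema() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("user", String.class);
        schema.put("ts", Long.class);
        return schema;
    }
}
```

With this schema, `validate(exampleSchema(), "ts")` succeeds, while `"user"` (wrong type) and any unknown name are rejected.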

## Brief change log

- Rowtime fields are no longer appended; instead, an existing Long or 
Timestamp field is marked as the event-time field
- Added checks that projections are not pushed into `TableSources` that 
implement `DefinedRowtimeAttribute` or `DefinedProctimeAttribute`
- Added several tests for table sources with rowtime or proctime attributes
- Updated the documentation
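
The projection check from the change log can be pictured with the following simplified Java sketch. The interfaces below are minimal stand-ins that only mirror the names used in this PR; they are not the real Flink types. The null check on the proctime attribute is an assumption that mirrors the rowtime case, and `ProjectionPushdownGuard`, `PlainSource`, and `EventTimeSource` are hypothetical names.

```java
// Hypothetical, simplified stand-ins for Flink's table source interfaces.
interface TableSource {}

interface DefinedRowtimeAttribute {
    String getRowtimeAttribute(); // null means "no rowtime attribute"
}

interface DefinedProctimeAttribute {
    String getProctimeAttribute(); // null means "no proctime attribute" (assumed)
}

final class ProjectionPushdownGuard {
    /** Returns true only if the source exposes no time indicator attribute. */
    static boolean canPushProjection(TableSource source) {
        if (source instanceof DefinedRowtimeAttribute
                && ((DefinedRowtimeAttribute) source).getRowtimeAttribute() != null) {
            return false;
        }
        if (source instanceof DefinedProctimeAttribute
                && ((DefinedProctimeAttribute) source).getProctimeAttribute() != null) {
            return false;
        }
        return true;
    }
}

/** A source without time attributes: projections may be pushed down. */
final class PlainSource implements TableSource {}

/** A source that marks the hypothetical field "ts" as its rowtime attribute. */
final class EventTimeSource implements TableSource, DefinedRowtimeAttribute {
    @Override
    public String getRowtimeAttribute() {
        return "ts";
    }
}
```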

## Verifying this change

- Check the added test cases

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**

## Documentation

- Documentation is updated according to the changes of the PR.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableExistingField

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4710.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4710


commit 28f11bcf989e30d14b09b43891ee6012d77960aa
Author: Fabian Hueske 
Date:   2017-09-10T22:05:06Z

[FLINK-7446] [table] Change DefinedRowtimeAttribute to work on existing field.






[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-10 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16160671#comment-16160671
 ] 

Jark Wu commented on FLINK-7446:


Hi [~fhueske], feel free to take this; I haven't started yet.



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-09-08 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158910#comment-16158910
 ] 

Fabian Hueske commented on FLINK-7446:
--

Hi [~jark], have you started working on this issue? 
I just went ahead without checking the JIRA :-(

If you have something, I'd drop my code.



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-28 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144662#comment-16144662
 ] 

Jark Wu commented on FLINK-7446:


[~fhueske] yes, I created FLINK-7548 to discuss this.



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-28 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143517#comment-16143517
 ] 

Fabian Hueske commented on FLINK-7446:
--

I think that's a good plan [~jark]. 
However, we should design it in a way that we can support periodic and 
punctuated watermarks. 
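
To make the distinction concrete, here is a minimal Java sketch of the two styles. The interfaces and classes are hypothetical and are not Flink's API: a periodic generator is polled at a fixed interval, while a punctuated generator decides per row whether to emit a watermark. The "marker row" flag is an assumption chosen for illustration.

```java
// Hypothetical interfaces for illustration only; these are not Flink's API.
interface PeriodicWatermarks {
    void onRowtime(long rowtime); // called for every row
    long currentWatermark();      // polled by the runtime at a fixed interval
}

interface PunctuatedWatermarks {
    /** Returns a watermark if this row carries one, or null otherwise. */
    Long watermarkFor(long rowtime, boolean isMarkerRow);
}

/** Periodic style: the watermark trails the largest rowtime seen by a fixed delay. */
final class MaxMinusDelay implements PeriodicWatermarks {
    private final long delayMillis;
    private long maxSeen = Long.MIN_VALUE;

    MaxMinusDelay(long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must not be negative");
        }
        this.delayMillis = delayMillis;
    }

    @Override
    public void onRowtime(long rowtime) {
        maxSeen = Math.max(maxSeen, rowtime);
    }

    @Override
    public long currentWatermark() {
        // Saturate instead of underflowing before any row has been seen.
        return maxSeen < Long.MIN_VALUE + delayMillis ? Long.MIN_VALUE : maxSeen - delayMillis;
    }
}

/** Punctuated style: only rows flagged as markers produce a watermark. */
final class MarkerRowWatermarks implements PunctuatedWatermarks {
    @Override
    public Long watermarkFor(long rowtime, boolean isMarkerRow) {
        return isMarkerRow ? Long.valueOf(rowtime) : null;
    }
}
```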



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-22 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137794#comment-16137794
 ] 

Jark Wu commented on FLINK-7446:


In the long run, we can provide a new interface called {{DefinedWatermark}} 
with two methods: {{getRowtimeAttribute}} (which can only name an existing 
field) and {{getWatermarkGenerator}}. {{DefinedRowtimeAttribute}} would then be 
marked as deprecated. In addition, we can provide some built-in watermark 
generators, such as {{AscendingTimestamp}} and {{BoundedOutOfOrderness}}. What 
do you think?
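
A rough Java sketch of what this proposal could look like follows. Only the names {{DefinedWatermark}}, {{getRowtimeAttribute}}, {{getWatermarkGenerator}}, {{AscendingTimestamp}} and {{BoundedOutOfOrderness}} come from the proposal; every signature, the {{WatermarkGenerator}} type, the example {{ClickSource}} and its field name are assumptions, and nothing here is an existing Flink API.

```java
// Hypothetical sketch: names follow the proposal, signatures are assumed.
interface WatermarkGenerator {
    /** Observes one rowtime value (epoch millis) and returns the current watermark. */
    long onRowtime(long rowtime);
}

interface DefinedWatermark {
    /** Name of an existing field that holds the event time. */
    String getRowtimeAttribute();

    /** Strategy used to derive watermarks from that field. */
    WatermarkGenerator getWatermarkGenerator();
}

/** For sources whose rowtime never decreases. */
final class AscendingTimestamp implements WatermarkGenerator {
    private long maxSeen = Long.MIN_VALUE;

    @Override
    public long onRowtime(long rowtime) {
        maxSeen = Math.max(maxSeen, rowtime);
        // Trail the largest timestamp by 1 ms so rows with an equal timestamp are not late.
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - 1;
    }
}

/** For sources whose rows arrive at most maxDelayMillis out of order. */
final class BoundedOutOfOrderness implements WatermarkGenerator {
    private final long maxDelayMillis;
    private long maxSeen = Long.MIN_VALUE;

    BoundedOutOfOrderness(long maxDelayMillis) {
        if (maxDelayMillis < 0) {
            throw new IllegalArgumentException("maxDelayMillis must not be negative");
        }
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public long onRowtime(long rowtime) {
        maxSeen = Math.max(maxSeen, rowtime);
        // Saturate instead of underflowing for very small timestamps.
        return maxSeen < Long.MIN_VALUE + maxDelayMillis
            ? Long.MIN_VALUE
            : maxSeen - maxDelayMillis;
    }
}

/** Example source description with a hypothetical field name. */
final class ClickSource implements DefinedWatermark {
    @Override
    public String getRowtimeAttribute() {
        return "clickTime";
    }

    @Override
    public WatermarkGenerator getWatermarkGenerator() {
        return new BoundedOutOfOrderness(5_000L);
    }
}
```

Keeping the field name and the generator in one interface means a source cannot declare a rowtime field without also saying how its watermarks are produced.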



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137358#comment-16137358
 ] 

Fabian Hueske commented on FLINK-7446:
--

I agree that the watermark generation for TableSources should be reworked. 
However, I'm not sure we will achieve that before the next release. 
If we want to support defining an existing field as the event-time indicator 
and want to be sure that the implementation of the {{TableSource}} is correct, 
we would have to compare the {{StreamRecord}} timestamp with the existing 
attribute at runtime for each record.
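
Such a per-record check might look roughly like the Java sketch below. {{TimestampedRow}} is a hypothetical stand-in for a stream record carrying a timestamp together with a row; it is not Flink's {{StreamRecord}} or {{Row}}, and the comparison in epoch milliseconds is an assumption.

```java
import java.sql.Timestamp;

// Hypothetical stand-in for a stream record: a record timestamp plus the row payload.
final class TimestampedRow {
    final long recordTimestamp; // timestamp attached to the record (epoch millis)
    final Object[] fields;      // row payload

    TimestampedRow(long recordTimestamp, Object[] fields) {
        this.recordTimestamp = recordTimestamp;
        this.fields = fields;
    }
}

final class RowtimeConsistencyCheck {
    /** Returns true if the rowtime field equals the timestamp attached to the record. */
    static boolean isConsistent(TimestampedRow record, int rowtimeIndex) {
        Object value = record.fields[rowtimeIndex];
        long fieldMillis;
        if (value instanceof Long) {
            fieldMillis = (Long) value;
        } else if (value instanceof Timestamp) {
            fieldMillis = ((Timestamp) value).getTime();
        } else {
            throw new IllegalArgumentException("Rowtime field must be a Long or a Timestamp");
        }
        return fieldMillis == record.recordTimestamp;
    }
}
```

The check is cheap per record, but it runs on the per-record code path, which is the cost being weighed here.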

However, since {{TableSource}} is an interface for rather experienced users 
(more DBA than DB user), I'd be fine with omitting this safety check. 

In the long run, I agree with [~xccui]. We should have a watermark generator 
interface that works on an existing field, with built-in implementations for the 
common cases.



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-20 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134448#comment-16134448
 ] 

Jark Wu commented on FLINK-7446:


From my perspective, a TableSource should provide an interface to define 
timestamps and watermarks at the same time. {{DefinedRowtimeAttribute}} is 
not a good design. A TableSource does not come from a DataStream; it is 
converted into a DataStream, so the timestamps and watermarks should be 
defined by the TableSource, not the DataStream. 

Regarding "generate timestamps and watermarks and validate that field and 
extracted timestamp are the same": does the validation happen at runtime? If it 
does, I prefer the latter option. 



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-18 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133050#comment-16133050
 ] 

Xingcan Cui commented on FLINK-7446:


Thanks for the response, [~fhueske]. From my perspective, I prefer the latter 
option (create new interfaces/operators...). Maybe we can provide one or more 
default watermark generators. Users can set them up directly by providing some 
parameters (e.g., the watermark interval and the expected delay relative to the 
latest rowtime). Moreover, if the provided watermark generators cannot meet the 
requirements, users can implement their own. What do you think, [~jark]?



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-18 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131805#comment-16131805
 ] 

Fabian Hueske commented on FLINK-7446:
--

That's in fact a good question. In principle, we could avoid timestamp 
extraction. However, we need to generate watermarks, and both things go together 
in the DataStream API. I see two options: (1) generate timestamps and watermarks 
and validate that the field and the extracted timestamp are the same, or 
(2) create new interfaces / operators that generate watermarks based on a field. 



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131629#comment-16131629
 ] 

Xingcan Cui commented on FLINK-7446:


Now that the rowtime field is an existing one in the stream, shall we consider 
extracting the timestamps automatically instead of forcing users to assign 
them with the {{TimestampAssigner}} in advance? What if the timestamps assigned 
in the {{StreamRecord}} and in the {{Row}} do not match?



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131616#comment-16131616
 ] 

Jark Wu commented on FLINK-7446:


[~fhueske] Great! I will open a pull request in the next few days.



[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-17 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131261#comment-16131261
 ] 

Fabian Hueske commented on FLINK-7446:
--

I think the existing {{DefinedRowtimeAttribute}} interface can be used for that 
as well. If the field name returned by 
{{DefinedRowtimeAttribute.getRowtimeAttribute()}} is included in the field 
names returned by the {{TableSource}}, we can use the existing field as the 
time indicator attribute.
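
A minimal Java sketch of this idea, under the assumption that a name not found among the source's fields keeps the current behavior of appending a new rowtime field. {{RowtimeSchema}} and the field names are hypothetical, and the code does not use Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink.
final class RowtimeSchema {

    /** True if the rowtime attribute refers to a field the source already produces. */
    static boolean usesExistingField(List<String> sourceFields, String rowtimeAttribute) {
        return rowtimeAttribute != null && sourceFields.contains(rowtimeAttribute);
    }

    /**
     * Returns the field names of the resulting table: unchanged if the rowtime
     * attribute names an existing field, otherwise with the attribute appended
     * (the behavior before this change, assumed here as the fallback).
     */
    static List<String> resolveFields(List<String> sourceFields, String rowtimeAttribute) {
        List<String> result = new ArrayList<>(sourceFields);
        if (rowtimeAttribute != null && !result.contains(rowtimeAttribute)) {
            result.add(rowtimeAttribute);
        }
        return result;
    }
}
```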
