[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-12-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710249#comment-16710249
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-444533622
 
 
   @pnowojski Hi, thanks a lot for your review and great suggestions. I have 
updated the PR according to your suggestions. The main changes are:
   - Add `UnresolvedKeyFieldReference`
   - Rename `LastRow` to `UpsertToRetraction`
   - Generate `UpsertToRetraction` in `EnumerableToLogicalTableScan` instead of 
`RelTimeIndicatorConverter`
   - Add `RemoveDataStreamUpsertToRetractionRule` to remove the 
`UpsertToRetraction` relnode if it is a no-op node.
   It would be great if you could take a look when you are free. Thanks a lot.
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702649#comment-16702649
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328473
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)
 
 Review comment:
   Yes, it would be better if we knew in the logical optimization phase that 
it is a no-op node. I think the computation is OK, since it is always correct to 
push the calc down, so the resulting plan is right. However, it is true that it 
has a side effect: a larger search space during optimization. 
   
   It is not easy to know whether it is a no-op node in the logical phase, since 
the retraction rules have not yet been applied. Do you have any suggestions?
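
To make the cost argument concrete, here is a minimal sketch of the arithmetic behind `makeCost(rowCnt, rowCnt * fieldCount, 0)` from the hunk above. The `Cost` case class and the row and field counts are hypothetical stand-ins, not Calcite types or measured numbers; the sketch only shows why the planner would prefer the node after a Calc that narrows its input.

```scala
object LastRowCostSketch {
  // Toy stand-in for a planner cost triple (rows, cpu, io); not a Calcite class.
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Same formula as computeSelfCost in the quoted hunk:
  // rows = input row count, cpu = input row count * input field count, io = 0.
  def lastRowCost(inputRowCount: Double, inputFieldCount: Int): Cost =
    Cost(inputRowCount, inputRowCount * inputFieldCount, 0.0)

  def main(args: Array[String]): Unit = {
    // Hypothetical input: the scan emits 1000 rows with 10 fields,
    // and a Calc keeps 3 of those fields and half of the rows.
    val directlyOnScan = lastRowCost(1000.0, 10)
    val afterCalc = lastRowCost(500.0, 3)
    println(s"LastRow directly on scan: $directlyOnScan")
    println(s"LastRow after Calc:       $afterCalc")
  }
}
```

With these assumed numbers the node placed after the Calc is cheaper in both the row and the CPU component, which is the ordering the code comment in the hunk aims for.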




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702618#comment-16702618
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328473
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)
 
 Review comment:
   Yes, it would be better if we knew in the logical optimization phase that 
it is a no-op node. I think the computation is OK, since it is always correct to 
push the calc down, so the resulting plan is right. However, it is true that it 
has a side effect: a larger search space. 
   
   It is not easy to know whether it is a no-op node in the logical phase, since 
the retraction rules have not yet been applied. Do you have any suggestions?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702619#comment-16702619
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328203
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
 
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {
 
 Review comment:
   I think the difference between `toTableFromRetractStream` and `toTable` is 
that `toTable` means `toTableFromAppendStream`. The method names are consistent.
   Methods in `StreamTableEnvironment`:
   - fromAppendStream
   - fromUpsertStream
   - fromRetractStream
   
   Methods in `DataStreamConversions`:
   - toTableFromAppendStream
   - toTableFromUpsertStream
   - toTableFromRetractStream
   
   Renaming `fromDataStream` to `fromAppendStream` is consistent with renaming 
`toDataStream` to `toAppendStream` by 
[FLINK-6543](https://issues.apache.org/jira/browse/FLINK-6543)
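
For readers unfamiliar with the upsert encoding described in the scaladoc of the hunk above, here is a minimal sketch in plain Scala collections, without any Flink types. It assumes the `(Boolean, record)` encoding from the scaladoc (true = upsert, false = delete); the `materialize` helper and the sample records are hypothetical and only illustrate how a key, or the lack of one, shapes the resulting table.

```scala
object UpsertMaterializationSketch {
  // Folds (flag, record) messages into a keyed table:
  // a true flag inserts or overwrites the row for the key, a false flag deletes it.
  def materialize[K, R](messages: Seq[(Boolean, R)])(keyOf: R => K): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) {
      case (table, (true, record))  => table.updated(keyOf(record), record)
      case (table, (false, record)) => table - keyOf(record)
    }

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      (true, ("alice", 10)),
      (true, ("bob", 5)),
      (true, ("alice", 12)),
      (false, ("bob", 5)))

    // Keyed on the name field: one row per name, the latest upsert wins.
    println(materialize(messages)(_._1))

    // Empty key: every record maps to the same key, so the table holds a single row.
    println(materialize(messages.take(3))(_ => ()))
  }
}
```

With the name as key, the last `alice` record survives and `bob` is deleted; with the empty key, each upsert overwrites the one row, which matches the "single row table" wording in the scaladoc.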




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702617#comment-16702617
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328431
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(
 
 Review comment:
   You are right. 
   Currently, there is no such no-op node in the execution plan. If you are still 
concerned about the no-op node in the optimization plan, I can add a rule to the 
RetractionRules to remove it. In that case, we can rename `LastRow` to 
`UpsertToRetractionConverter`. What do you think?
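
A rough sketch of what such a removal rule amounts to, written against a toy plan ADT rather than Calcite rel nodes. All node classes here are hypothetical, and the `needsRetraction` flag is an assumed stand-in for whatever the retraction rules would decide; this is not the PR's implementation.

```scala
object RemoveNoOpConverterSketch {
  // Toy plan nodes; names echo the discussion but are not Flink or Calcite classes.
  sealed trait Node
  final case class UpsertSource(name: String) extends Node
  final case class Calc(input: Node) extends Node
  final case class UpsertToRetraction(input: Node, needsRetraction: Boolean) extends Node
  final case class Sink(input: Node, kind: String) extends Node

  // Rewrites the plan recursively and drops converter nodes that would do nothing.
  def removeNoOps(node: Node): Node = node match {
    case UpsertToRetraction(in, false) => removeNoOps(in)
    case UpsertToRetraction(in, true)  => UpsertToRetraction(removeNoOps(in), needsRetraction = true)
    case Calc(in)                      => Calc(removeNoOps(in))
    case Sink(in, kind)                => Sink(removeNoOps(in), kind)
    case source: UpsertSource          => source
  }

  def main(args: Array[String]): Unit = {
    val upsertPlan = Sink(Calc(UpsertToRetraction(UpsertSource("t"), needsRetraction = false)), "upsert")
    val retractPlan = Sink(Calc(UpsertToRetraction(UpsertSource("t"), needsRetraction = true)), "retract")
    println(removeNoOps(upsertPlan))  // converter removed
    println(removeNoOps(retractPlan)) // converter kept
  }
}
```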




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702620#comment-16702620
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237329703
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
 ##
 @@ -36,25 +36,32 @@ class DataStreamScanRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-    dataSetTable match {
-      case _: DataStreamTable[Any] =>
-        true
-      case _ =>
-        false
-    }
+    val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]])
+    val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]])
+
+    appendTable != null || upsertTable != null
 
 Review comment:
   I think you are right.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701652#comment-16701652
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237018239
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
 ##
 @@ -36,25 +36,32 @@ class DataStreamScanRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-    dataSetTable match {
-      case _: DataStreamTable[Any] =>
-        true
-      case _ =>
-        false
-    }
+    val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]])
+    val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]])
+
+    appendTable != null || upsertTable != null
 
 Review comment:
   Can we avoid those nulls (in multiple places)? For example, by using 
`isInstanceOf` or a match? Handling null is dangerous and error-prone.
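
One way to follow this suggestion, sketched with toy classes: wrap the nullable result in `Option(...)` so that the null never escapes the call site. `ToyTable`, `AppendStreamTable` and `UpsertStreamTable` below are hypothetical stand-ins that only mimic the null-returning `unwrap` the diff relies on; they are not the Flink or Calcite classes.

```scala
object UnwrapWithoutNullSketch {
  // Toy marker classes standing in for the two table kinds in the diff.
  class AppendStreamTable
  class UpsertStreamTable

  // Toy wrapper whose unwrap returns null when the type does not match,
  // which is the behaviour the `!= null` checks in the diff guard against.
  final class ToyTable(underlying: AnyRef) {
    def unwrap[T](clazz: Class[T]): T =
      if (clazz.isInstance(underlying)) clazz.cast(underlying) else null.asInstanceOf[T]
  }

  // Option(x) is None when x is null, so no explicit null comparison is needed.
  def matches(table: ToyTable): Boolean =
    Option(table.unwrap(classOf[AppendStreamTable])).isDefined ||
      Option(table.unwrap(classOf[UpsertStreamTable])).isDefined

  def main(args: Array[String]): Unit = {
    println(matches(new ToyTable(new UpsertStreamTable)))
    println(matches(new ToyTable("not a stream table")))
  }
}
```

The same `Option(...)` value can also be pattern matched (`case Some(t) => ...`) where the unwrapped table is needed afterwards.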




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701654#comment-16701654
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008957
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   (I've responded to this above)




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701656#comment-16701656
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237017454
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)
 
 Review comment:
   As mentioned above (in the no-op/renaming discussion), this cost computation 
is not (or will not be) correct when the node is a no-op.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701655#comment-16701655
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008338
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(
 
 Review comment:
   I have revisited our previous discussion in the design/google doc and I 
still have the same concern as I had then:
   > In that case this is strange. It's like having 
filter/projection/aggregation nodes that after pruning/removing/pushing them 
down are sometimes NO-OPs and sometimes not. Besides being strange it can have 
some negative side effects:
   > - presence of a no-op node (when it doesn't need to be there) can 
block/mess with other optimisations or make them more complex
   > - it will pollute printed explain plan to the user. This is important, 
since this will be very costly node and it will be very hard to explain users 
when this node requires huge state and when not
   > - it would make cost computations more complicated
   
   Back then it sparked a discussion, where you were saying that it will never 
be a no-op and that you wanted `LastRow` to always have state and a purpose 
besides upsert to retraction conversion (filtering out empty deletes). However, 
that was resolved and filtering out empty deletes was dropped. Thus it brings me 
back to the issue of having this as a no-op node in the plan.
   
   This connects with your other response regarding the naming of `LastRow`:
   > The LastRow node may be a no-op node in the case such as upsert source -> 
calc -> upsert sink. While LastRow will convert upsert stream to retract stream 
if a downstream node needs it to, such as upsert source -> calc -> retract 
sink. Whether convert to retract stream will be decided by RetractionRules.
   
   I would still argue that it should be named `UpsertToRetractionConverter` 
(or something along those lines) and that, if it is not needed, it should not 
be in the plan. Maybe this means that our `RetractionRules` are not sufficient 
and need some refactoring.
   
   I can see a couple of solutions to that, but probably the best would be to 
decide whether we need to insert `UpsertToRetractionConverter` inside the rule 
that is supposed to create it. Further optimisations/rewrites would have to 
correctly handle and preserve the upsert vs. retraction semantics/traits.
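
A minimal sketch of that last idea, in plain Scala rather than Calcite: the rule that creates the source decides up front whether a converter node is needed, so no no-op node ever reaches the plan. All names here (`PlanNode`, `UpsertSource`, `UpsertToRetraction`, `UpdateMode`, `planSource`, `plan`, `explain`) are hypothetical and only illustrate the decision; they are not classes from the PR or from Calcite.

```scala
object ConverterInsertionSketch {
  // Toy logical plan; illustrative types only, not Calcite RelNodes.
  sealed trait PlanNode
  final case class UpsertSource(keys: Seq[String]) extends PlanNode
  final case class UpsertToRetraction(input: PlanNode) extends PlanNode
  final case class Calc(input: PlanNode) extends PlanNode
  final case class Sink(input: PlanNode, acceptsUpserts: Boolean) extends PlanNode

  // What the downstream consumer requires (assumed to be known to the rule).
  sealed trait UpdateMode
  case object Upserts extends UpdateMode
  case object Retractions extends UpdateMode

  // The rule that creates the source inserts the converter only when needed.
  def planSource(source: UpsertSource, required: UpdateMode): PlanNode =
    required match {
      case Retractions => UpsertToRetraction(source)
      case Upserts     => source // nothing to convert, so no extra node
    }

  // Builds "source -> calc -> sink" for an upsert sink or a retract sink.
  def plan(keys: Seq[String], upsertSink: Boolean): PlanNode = {
    val mode = if (upsertSink) Upserts else Retractions
    Sink(Calc(planSource(UpsertSource(keys), mode)), upsertSink)
  }

  // Renders the plan from the sink back to the source.
  def explain(node: PlanNode): String = node match {
    case UpsertSource(keys) =>
      val ks = keys.mkString(",")
      s"UpsertSource(keys=$ks)"
    case UpsertToRetraction(in) => "UpsertToRetraction <- " + explain(in)
    case Calc(in)               => "Calc <- " + explain(in)
    case Sink(in, upserts)      => s"Sink(upsert=$upserts) <- " + explain(in)
  }
}
```

With this shape the explain output for an upsert sink contains no conversion node at all, while the retract-sink plan shows it explicitly, which is the property asked for above.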


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701653#comment-16701653
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237004073
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   Generally speaking, `toTable`, `toTableFromUpsertStream` and 
`toTableFromRetractStream` sound good to me, but what is the difference between 
`toTableFromRetractStream` and `toTable`? Secondly, would 
`toTableFromRetractStream` call `tableEnv.fromAppendStream()`? If so, there 
would be yet another naming inconsistency. 
   
   Maybe renaming `fromDataStream` to `fromAppendStream` was not quite right? 




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701289#comment-16701289
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236914923
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   The `LastRow` node may be a no-op node in a case such as `upsert source -> 
calc -> upsert sink`, whereas it will convert the upsert stream to a retract 
stream if a downstream node needs it to, as in `upsert source -> calc -> 
retract sink`. Whether to convert to a retract stream is decided by the 
RetractionRules.  
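
To make the cost of the conversion concrete, here is a rough sketch of what turning an upsert stream into a retract stream involves: the last row per key has to be kept as state so that it can be retracted when a newer version arrives. It is plain Scala over an in-memory sequence, not the operator from the PR, and the names (`UpsertToRetractionSketch`, `Change`, `Add`, `Retract`, `convert`) are hypothetical. A delete for an unknown key produces no output here; that is a simplification of this sketch only.

```scala
object UpsertToRetractionSketch {
  // Upsert-encoded input: (true, row) upserts the row for its key,
  // (false, row) deletes the row with that key.
  type Row = (String, Int) // (key, value)

  sealed trait Change
  final case class Add(row: Row) extends Change
  final case class Retract(row: Row) extends Change

  // Converts upserts to retractions by remembering the last row per key.
  def convert(upserts: Seq[(Boolean, Row)]): Seq[Change] = {
    val lastRow = scala.collection.mutable.Map.empty[String, Row]
    val out = Seq.newBuilder[Change]
    upserts.foreach { case (isUpsert, row) =>
      val key = row._1
      // Any previously emitted version of the row is retracted first.
      lastRow.get(key).foreach(old => out += Retract(old))
      if (isUpsert) {
        lastRow(key) = row
        out += Add(row)
      } else {
        lastRow.remove(key)
      }
    }
    out.result()
  }
}
```

The `lastRow` map grows with the number of distinct keys, which is why the node is expensive whenever it is not a no-op.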




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701280#comment-16701280
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236914923
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   The `LastRow` node may be a no-op node in a case such as `upsert source -> 
calc -> upsert sink`, and it will convert the upsert stream to a retract 
stream if a downstream node needs it to, as in `upsert source -> calc -> 
retract sink`. Whether to convert to a retract stream is decided by the 
RetractionRules.  




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701276#comment-16701276
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236913658
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
 
 Review comment:
   The incoming messages from the source `DataStream` are expected to be 
encoded as `Tuple2`. I will change the description according to your 
suggestion. Thank you.
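
As an illustration of that encoding, the sketch below applies `(Boolean, record)` messages to an in-memory table: a `true` flag upserts the record for its key and a `false` flag deletes it. It is plain Scala and not the Table API; `UpsertMaterializationSketch`, `materialize` and `keyOf` are hypothetical names used only for this example.

```scala
object UpsertMaterializationSketch {
  type Record = (String, Int) // (name, amount)

  // Applies (flag, record) messages to a table keyed by `keyOf`.
  // flag == true: insert or update the row for the key; flag == false: delete it.
  def materialize[K](messages: Seq[(Boolean, Record)])(keyOf: Record => K): Map[K, Record] =
    messages.foldLeft(Map.empty[K, Record]) {
      case (table, (true, record))  => table.updated(keyOf(record), record)
      case (table, (false, record)) => table - keyOf(record)
    }
}
```

Keying by name (`materialize(msgs)(_._1)`) keeps one row per name. Using a constant key (`materialize(msgs)(_ => ())`) models the empty-key case from the scaladoc, where every upsert overwrites the same single row.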




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701274#comment-16701274
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236912969
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   I prefer names that start with `toTable`, so the methods would be `toTable`, 
`toTableFromUpsertStream` and `toTableFromRetractStream`. That makes these 
methods easier to find, similar to `fromUpsertStream`, `fromAppendStream` and 
`fromDataSet`. What do you think?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701263#comment-16701263
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236911934
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I haven't thought much about how to support key definitions on an append 
stream. I don't think it would be a no-op key definition; we would probably 
have to do the following:
   - Add logic similar to `UniqueKeyExtractor` to get the keys.
   - Probably check whether the data meets the key constraint, i.e., throw an 
exception when the key data is not distinct. 
   - etc.
   Given the above, would it be better not to support it for now? 
   I will rephrase the error message according to your suggestions. Thanks a 
lot.
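
The key-constraint check from the list above could look roughly like the following. This is only a sketch over an in-memory sequence, with hypothetical names (`AppendKeyCheckSketch`, `KeyViolationException`, `enforceUniqueKey`); a real operator would have to keep the set of seen keys in managed state.

```scala
object AppendKeyCheckSketch {
  final class KeyViolationException(msg: String) extends RuntimeException(msg)

  // Passes rows through unchanged, but fails as soon as a key value repeats.
  def enforceUniqueKey[R, K](rows: Seq[R])(keyOf: R => K): Seq[R] = {
    val seen = scala.collection.mutable.HashSet.empty[K]
    rows.map { row =>
      val key = keyOf(row)
      // HashSet.add returns false if the key was already present.
      if (!seen.add(key)) {
        throw new KeyViolationException(s"Duplicate key in append stream: $key")
      }
      row
    }
  }
}
```

The `seen` set grows with every distinct key, which shows why a key on an append stream would not be a no-op.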




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701264#comment-16701264
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236911934
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I haven't thought much about how to support key definitions on an append 
stream. I don't think it would be a no-op key definition; we would probably 
have to do the following:
   - Add logic similar to `UniqueKeyExtractor` to get the keys.
   - Probably check whether the data meets the key constraint, i.e., throw an 
exception when the key data is not distinct. 
   - etc.
   
   Given the above, would it be better not to support it for now? 
   I will rephrase the error message according to your suggestions. Thanks a 
lot.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701252#comment-16701252
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236909228
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
 
 Review comment:
   Oh, sorry about my typo. It should be "The primary key would not be 
necessary."




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701251#comment-16701251
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236909214
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   ok, I will not forget about it. Thank you.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700205#comment-16700205
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606838
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   >  We can't name it UpsertToRetractionsConverter since we don't always 
convert upsert to retractions in this node. 
   
   Can you elaborate on that?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700210#comment-16700210
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236605473
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
 
 Review comment:
   Will the messages in the table be encoded as `Tuple2`, or are the incoming 
messages from the source `DataStream` expected to be encoded as `Tuple2`?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700204#comment-16700204
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606505
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
+
+  lazy val key: PackratParser[Expression] =
+(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY 
^^ {
 
 Review comment:
   Yes, I meant that this `(aliasMapping | "(" ~> aliasMapping <~ ")" | 
fieldReference)` is duplicated a couple of times here. If it is possible to 
deduplicate it to:
   ```
   lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ ")" | 
fieldReference
   ```
   I think it would be better to do so.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700208#comment-16700208
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601205
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I meant that this check doesn't prevent any bugs or any other 
exceptions. It just blocks the user from specifying a no-op key definition, 
right? That's why I'm not sure if we should block it.
   
   If we decide to keep this check/exception, I would refrain from naming 
`fromUpsertStream` in the exception. Maybe rephrase it to:
   > Defining a key on an append stream does not have any effect
   
   ?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700209#comment-16700209
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236597321
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   I would prefer to document this rename in this PR, but if you want to do it 
all together with the upsert sources documentation, then OK. Let's just not 
forget about it.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700207#comment-16700207
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236604746
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   Maybe `upsertStreamToTable`?
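
   To make the upsert encoding in the Scaladoc above concrete, here is a 
minimal, Flink-free sketch of how a sequence of `(Boolean, record)` messages 
could be folded into table contents. The names `UpsertMaterialization`, 
`materialize` and `keyOf` are hypothetical and are not part of the PR; the 
sketch only assumes the semantics stated in the Scaladoc (a true flag is an 
update, a false flag is a delete, an empty key gives a single row table).

```scala
object UpsertMaterialization {
  // An upsert message as described in the Scaladoc:
  // flag == true is an update (insert or overwrite), flag == false is a delete.
  type UpsertMessage = (Boolean, (String, Int))

  /** Applies the messages in order and returns the rows left in the table.
    * `keyOf` is a hypothetical stand-in for the key declared via `.key`. */
  def materialize(
      messages: Seq[UpsertMessage],
      keyOf: ((String, Int)) => Any): Map[Any, (String, Int)] =
    messages.foldLeft(Map.empty[Any, (String, Int)]) {
      case (table, (true, row))  => table.updated(keyOf(row), row) // insert or overwrite
      case (table, (false, row)) => table - keyOf(row)             // delete by key
    }

  def main(args: Array[String]): Unit = {
    val messages: Seq[UpsertMessage] = Seq(
      (true, ("beer", 1)),
      (true, ("diaper", 4)),
      (true, ("beer", 3)),   // overwrites the earlier "beer" row
      (false, ("diaper", 4)) // deletes the "diaper" row
    )

    // Keyed on the first field, analogous to 'name.key:
    // "beer" keeps only its latest row and "diaper" is removed.
    println(materialize(messages, row => row._1))

    // Empty key: every row maps to the same key, so at most one row survives.
    println(materialize(messages.filter(_._1), _ => ()))
  }
}
```

   With an empty key, all messages collapse onto one entry, which is why the 
Scaladoc calls the result a single row table.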




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700206#comment-16700206
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601884
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
 
 Review comment:
   Ok, I get it - upsert streams without a primary key are single row tables. 
But what do you mean by:
   > The primary key would be necessary.
   
   ? 




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1663#comment-1663
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236537434
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 referenceByName(name, p).map((_, name))
   case Alias(UnresolvedFieldReference(origName), name: String, _) =>
 referenceByName(origName, p).map((_, name))
+  case (Key(UnresolvedFieldReference(name: String))) =>
 
 Review comment:
   UnresolvedKeyFieldReference works fine. Thanks for your suggestions.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699902#comment-16699902
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

sunjincheng121 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236513562
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   +1 to syncing the changes to the documentation. I also agree that the 
documentation change can be in another PR.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699002#comment-16699002
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267734
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   Yeah, I also think `LastRow` is not perfect. However, I can't find a better 
one. We can't name it `UpsertToRetractionsConverter` since we don't always 
convert upsert to retractions in this node. `LastRow` does explain the purpose 
and the thing that it is doing, i.e. getting the last row from the upsert 
stream by the defined key.
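
   As an illustration of "get the last row by the defined key", here is a 
minimal, Flink-free sketch of the case where the node does convert upserts to 
retractions. The names `UpsertToRetractionSketch` and `toRetractions` are 
hypothetical, and the output encoding (true = add, false = retract) is an 
assumption of this sketch, not the PR's implementation.

```scala
import scala.collection.mutable

object UpsertToRetractionSketch {
  type Row = (String, Int)

  /** Converts upsert messages, keyed on the first field, into retraction messages.
    * Input:  true = upsert, false = delete.
    * Output: true = add,    false = retract (assumed encoding). */
  def toRetractions(upserts: Seq[(Boolean, Row)]): Seq[(Boolean, Row)] = {
    val lastRows = mutable.Map.empty[String, Row] // last row seen per key
    val out = Seq.newBuilder[(Boolean, Row)]
    upserts.foreach { case (isUpsert, row) =>
      val key = row._1
      // Retract the previously emitted row for this key, if there is one.
      lastRows.get(key).foreach(old => out += ((false, old)))
      if (isUpsert) {
        lastRows(key) = row
        out += ((true, row))
      } else {
        lastRows.remove(key)
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val upserts = Seq(
      (true, ("beer", 1)),
      (true, ("beer", 3)), // update: retract ("beer", 1), then add ("beer", 3)
      (false, ("beer", 3)) // delete: retract ("beer", 3)
    )
    toRetractions(upserts).foreach(println)
  }
}
```

   The per-key state (`lastRows`) is exactly the "last row" the name refers 
to; when downstream operators can consume upserts directly, no such 
conversion is needed, which matches the remark that the node does not always 
produce retractions.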




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698994#comment-16698994
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266862
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   I think we can't use upsert to describe a Table. How about renaming it to 
`toTableFromUpsert`?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698991#comment-16698991
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266642
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 } else {
   referenceByName(origName, t).map((_, name))
 }
+  case (Key(UnresolvedFieldReference(name: String)), idx) =>
+if (isRefByPos) {
 
 Review comment:
   Same as below.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699001#comment-16699001
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267700
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
 
 Review comment:
   I treated `LastRow` and `Source` as a single unit. However, I think you are 
right: treating `LastRow` as a conversion node may be better.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699000#comment-16699000
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267632
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala
 ##
 @@ -36,20 +36,20 @@ import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
   * It ensures that types without deterministic field order (e.g. POJOs) are 
not part of
   * the plan translation.
   */
-class DataStreamScan(
+class AppendStreamScan(
 
 Review comment:
   Yes, sorry about this.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698998#comment-16698998
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267295
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
+
+  lazy val key: PackratParser[Expression] =
+(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY 
^^ {
 
 Review comment:
   Do you mean to deduplicate the code? I think we can add another `Expression`, 
e.g., `lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ 
")" | fieldReference`, and reuse it.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698997#comment-16698997
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267254
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
 
 Review comment:
   Ok, I will remove it.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698990#comment-16698990
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266595
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   Agreed, specifying a unique key on an AppendTable makes sense, for example 
when we only insert data into a table and never update it. 
   Since supporting keys on AppendStream is a separate new feature, I think we 
can add this check now and remove it later once keys on AppendStream are 
supported. The error message should be corrected to 
`Apply key on append stream has not been supported yet! You can use fromUpsertStream instead.`
   What do you think?
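
For illustration, a minimal sketch of what the check with the reworded message could look like. The expression classes, `TableException`, and both helper methods below are simplified stand-ins written for this example and are assumptions, not the classes or signatures from the pull request.

```scala
// Simplified stand-ins for the Table API expression classes (hypothetical).
sealed trait Expression
case class UnresolvedFieldReference(name: String) extends Expression
case class Key(child: Expression) extends Expression

// Stand-in for the Table API exception type (hypothetical).
class TableException(msg: String) extends RuntimeException(msg)

object AppendStreamKeyCheck {
  // Collects the names of all fields that were marked as key fields.
  def extractUniqueKeys(fields: Seq[Expression]): Seq[String] =
    fields.collect { case Key(UnresolvedFieldReference(name)) => name }

  // Rejects key definitions on an append stream, using the reworded message.
  def validateAppendStreamFields(fields: Seq[Expression]): Unit =
    if (extractUniqueKeys(fields).nonEmpty) {
      throw new TableException(
        "Apply key on append stream has not been supported yet! " +
          "You can use fromUpsertStream instead.")
    }
}
```

Keeping the check in one small method would make it easy to drop once keys on append streams are supported.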




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698995#comment-16698995
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267147
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(
 
 Review comment:
   No. As we discussed before, `LastRow` will become a no-op for the case of 
`upsert source -> filter -> upsert sink`.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698992#comment-16698992
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266694
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) {
 referenceByName(name, p).map((_, name))
   case Alias(UnresolvedFieldReference(origName), name: String, _) =>
 referenceByName(origName, p).map((_, name))
+  case (Key(UnresolvedFieldReference(name: String))) =>
 
 Review comment:
   `UnresolvedKeyFieldReference` seems like a good approach. I will try to figure it out.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698833#comment-16698833
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236223294
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   I plan to address the documentation problems in 
[FLINK-10957](https://issues.apache.org/jira/browse/FLINK-10957). What do you 
think?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698834#comment-16698834
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236223326
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
 
 Review comment:
   The primary key would be necessary. It would be a single row table if the 
primary key has not been defined.
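
To make the single-row case concrete, here is a small sketch of upsert semantics under stated assumptions: rows are modelled as plain `Vector[Any]` values and the key as a list of field positions. `UpsertTableSketch` and `materialize` are hypothetical names for this example only and are not part of the pull request.

```scala
object UpsertTableSketch {
  type Row = Vector[Any]

  // Materializes an upsert stream: each incoming row replaces the previous
  // row that has the same key. `keyIndexes` holds the positions of the key
  // fields; an empty Seq means that no key was defined.
  def materialize(rows: Seq[Row], keyIndexes: Seq[Int]): Map[Seq[Any], Row] =
    rows.foldLeft(Map.empty[Seq[Any], Row]) { (table, row) =>
      val key = keyIndexes.map(i => row(i))
      table.updated(key, row)
    }
}
```

With an empty key every row maps to the same (empty) key, so each row overwrites the previous one and the table never holds more than one row.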




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698770#comment-16698770
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177333
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) {
 referenceByName(name, p).map((_, name))
   case Alias(UnresolvedFieldReference(origName), name: String, _) =>
 referenceByName(origName, p).map((_, name))
+  case (Key(UnresolvedFieldReference(name: String))) =>
 
 Review comment:
   Tbh, at this moment I'm not sure if adding `Key` as a nested case class was 
a good idea. It adds quite a lot of boilerplate and special handling in 
multiple places, which goes against clean code. 
   
   I wonder if `UnresolvedFieldReference` shouldn't have some kind of "trait" 
that marks it as a primary key. By a "trait" I mean any way to determine 
whether that's a primary key or not, for example either some boolean flag or 
maybe introducing an `UnresolvedKeyFieldReference` that extends 
`UnresolvedFieldReference`? I think both of those solutions would allow us to 
avoid this code/handling duplication in most places and to special-case `Key` 
only where it is necessary. 
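
A rough sketch of the subclass variant, to show where the special handling would remain. The classes below are simplified, hypothetical stand-ins (the real `UnresolvedFieldReference` has more members), and `FieldReferenceSketch` with its two methods exists only for this example.

```scala
// Hypothetical, simplified expression classes; not the actual definitions.
sealed trait Expression
class UnresolvedFieldReference(val name: String) extends Expression
// A key-aware subclass instead of a wrapping `Key` node.
class UnresolvedKeyFieldReference(name: String)
  extends UnresolvedFieldReference(name)

object FieldReferenceSketch {
  // Generic code matches on the base class, so key fields need no extra case.
  def fieldNames(fields: Seq[Expression]): Seq[String] =
    fields.collect { case f: UnresolvedFieldReference => f.name }

  // Only key-aware code has to distinguish the subclass.
  def keyNames(fields: Seq[Expression]): Seq[String] =
    fields.collect { case k: UnresolvedKeyFieldReference => k.name }
}
```

The boolean-flag variant would look similar, with `keyNames` filtering on the flag instead of matching on the subclass.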




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698771#comment-16698771
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196723
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
 
 Review comment:
   nit: this comment is not helpful; it just duplicates the field name.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698766#comment-16698766
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197534
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
 ##
 @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   case f ~ _ ~ _ => RowtimeAttribute(f)
 }
 
+  // key
+
+  lazy val key: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ {
 
 Review comment:
   Can we deduplicate 
`(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference)`? I would expect 
so, but I'm not familiar with the Scala magic in `ExpressionParser`.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698763#comment-16698763
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236171634
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
 
 Review comment:
   I'm not sure if we need/want this check. For example, for Temporal Joins, a 
user might want to interpret an upsert stream as an `AppendStreamTable`:
   
   
https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit#heading=h.yqq0cgo927fp
   
   Also, there might be other use cases where specifying a primary key on an 
`AppendTable` makes sense.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698768#comment-16698768
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236198600
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
 
 Review comment:
   I think this comment is incorrect. This is not the source on its own, but 
only a conversion class. 




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698759#comment-16698759
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236169075
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+val streamType: TypeInformation[T] = dataStream.getType match {
 
 Review comment:
   Lines L586:L593 seem to be duplicated by L619:L626.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698762#comment-16698762
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202082
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
+  */
+class LogicalLastRow(
 
 Review comment:
   I still think this is a bad and misleading name for this operator. `LastRow` 
explains neither the purpose nor what the operator actually does. Someone 
without prior knowledge, asked what the `LastRow` RelNode/operator does, could 
not guess the correct answer. It is even more confusing because the Javadoc 
ties `LastRow` to the "upsert" name:
   
   > Represent an upsert source.
   
   > The upsert key names.
   
   Please either change the name to something like 
`UpsertToRetractionsConverter`, which explicitly states the purpose of this 
class (then you can keep the `upsert` references in the Javadoc), or, if you 
would like to keep naming it after what it does, it would have to meet the 
following requirements:
   - `LastRow` is independent of the upsert -> retraction conversion logic
   - there is a reasonable chance that it will be used somewhere else, outside 
of the upsert -> retraction conversion context
   
   It's a bit like renaming `LogicalJoin` to `HashTable`: kind of true, but 
misleading. It could make sense if:
   - `HashTable` is extracted to a separate class
   - the `LogicalJoin` relnode/operator/concept still exists, but just uses 
`HashTable`
   - `HashTable` is reused somewhere else
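
   For readers unfamiliar with the operator being named here, the following is 
a minimal sketch of what an upsert -> retraction conversion does conceptually. 
It assumes the usual semantics (remember the last row per key and retract it 
when that key is updated or deleted). All type and function names are 
hypothetical; this is a toy model, not the PR's implementation.

```scala
// Toy model, not Flink code: every name below is hypothetical.
object UpsertToRetractionSketch {
  // Input message: isUpsert = false means "delete the row for this key".
  final case class Upsert[K, V](isUpsert: Boolean, key: K, value: V)

  // Output message: isAdd = false retracts a previously emitted row.
  final case class Retraction[V](isAdd: Boolean, value: V)

  // Remembers the last row per key so the old row can be retracted
  // before the new one is emitted.
  def convert[K, V](input: Seq[Upsert[K, V]]): Seq[Retraction[V]] = {
    val lastRow = scala.collection.mutable.Map.empty[K, V]
    input.flatMap { msg =>
      // Retract the row currently stored for this key, if there is one.
      val retractOld = lastRow.get(msg.key).map(Retraction(false, _)).toList
      if (msg.isUpsert) {
        lastRow(msg.key) = msg.value
        retractOld :+ Retraction(true, msg.value)
      } else {
        lastRow.remove(msg.key)
        retractOld
      }
    }
  }
}
```

   The per-key state is what makes the node more than a pass-through, which is 
arguably why a name describing the conversion reads more clearly than one 
describing the state it keeps.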




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698758#comment-16698758
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236187717
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   There is a naming inconsistency here: `toKeyedTable` vs `fromUpsertStream`. 
Rename `keyed` to `upsert`?
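
   The message encoding described in the Scaladoc above (a `Boolean` flag plus 
the record, keyed or with an empty key) can be illustrated with a small sketch. 
It assumes only what the Scaladoc states; `materialize` and `keyOf` are 
hypothetical names, and the code is a toy model rather than Flink code.

```scala
// Toy model of the semantics described in the Scaladoc; not Flink code.
object UpsertTableSketch {
  // Folds (flag, record) messages into the current table contents.
  // `keyOf` is a hypothetical key extractor standing in for a declaration
  // such as 'name.key.
  def materialize[R, K](messages: Seq[(Boolean, R)])(keyOf: R => K): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) {
      case (table, (true, record))  => table + (keyOf(record) -> record) // update
      case (table, (false, record)) => table - keyOf(record)             // delete
    }
}
```

   With `keyOf = _._1` the table holds one row per name. With a constant key 
such as `_ => ()` every message targets the same row, which corresponds to the 
"single row table" case mentioned for an empty key.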




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698765#comment-16698765
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177806
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1112,13 +1129,17 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 exprs flatMap {
   case _: TimeAttribute =>
 None
-  case UnresolvedFieldReference(_) if referenced =>
+  case UnresolvedFieldReference(_) | Key(UnresolvedFieldReference(_)) 
if referenced =>
 // only accept the first field for an atomic type
 throw new TableException("Only the first field can reference an 
atomic type.")
   case UnresolvedFieldReference(name: String) =>
 referenced = true
 // first field reference is mapped to atomic type
 Some((0, name))
+  case Key(UnresolvedFieldReference(name: String)) =>
+referenced = true
 
 Review comment:
   ditto: another code duplication. 




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698773#comment-16698773
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236174343
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 } else {
   referenceByName(origName, t).map((_, name))
 }
+  case (Key(UnresolvedFieldReference(name: String)), idx) =>
+if (isRefByPos) {
 
 Review comment:
   Please deduplicate this `if` and the one below with their non-keyed 
versions. Either extract those `if`s into separate functions or convert the 
whole match into a function, something like this:
   ```
   foo(expr, index) {
     match expr {
       case Key(keyExpr) => foo(keyExpr, index)
       case UnresolvedFieldReference(...) => ...
       case Alias(UnresolvedFieldReference(...)) => ...
     }
   }
   ```
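
   A compilable variant of the pseudocode above might look like the following. 
It is only a sketch: the expression classes are simplified, hypothetical 
stand-ins for the real ones, and the by-position/by-name resolution is reduced 
to the bare minimum needed to show the recursion.

```scala
// Hypothetical, simplified expression types; the real Flink classes differ.
object FieldRefSketch {
  sealed trait Expr
  final case class UnresolvedFieldReference(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr
  final case class Key(child: Expr) extends Expr

  // Resolves an expression to (input index, output name). The `Key` case only
  // strips its wrapper and recurses, so no other branch is written twice.
  def resolve(expr: Expr, index: Int, inputNames: Seq[String]): Option[(Int, String)] =
    expr match {
      case Key(keyExpr) => resolve(keyExpr, index, inputNames)
      case UnresolvedFieldReference(name) => Some((index, name))
      case Alias(UnresolvedFieldReference(origName), name) =>
        // Look the original field up by name in the input type.
        val pos = inputNames.indexOf(origName)
        if (pos >= 0) Some((pos, name)) else None
      case _ => None
    }
}
```

   Because the keyed and non-keyed forms share the same branches, a later 
change to the resolution logic only has to be made in one place.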




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698769#comment-16698769
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202531
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames   The upsert key names.
 
 Review comment:
   If `keyNames` are "upsert key names", then just name the parameter 
`upsertKeyNames` and drop the comment.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698772#comment-16698772
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196242
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+if (upsertStreamTable != null) {
+  val relTypes = scan.getRowType.getFieldList.map(_.getType)
+  val timeIndicatorIndexes = relTypes.zipWithIndex
+.filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+.map(_._2)
+  val input = if (timeIndicatorIndexes.nonEmpty) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
timeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   Do we ALWAYS convert upserts to retractions? Even for pipelines `upsert 
source -> filter -> upsert sink` ?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698760#comment-16698760
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236195894
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   But you could probably add this node when changing the convention. We assume 
that a `TableScan` rel node represents all kinds of table scans and can always 
produce retractions, while at some stage (`FlinkLogical`?) we freeze it to 
either retractions or upserts.
   
   Regardless of that, `RelTimeIndicatorConverter.scala` is not the right place 
to create `LogicalLastRow`. You could also add a single-pass rule, applied only 
once, that inserts the upsert -> retraction conversion.
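
   The "applied only once" idea can be sketched with a toy plan tree. The node 
classes below are hypothetical and deliberately not Calcite or Flink classes; 
the point is only that a single traversal which never revisits its own output 
terminates, and that already converted subtrees are left alone.

```scala
// Toy plan nodes; these are not Calcite or Flink classes.
object SinglePassSketch {
  sealed trait Plan
  final case class Scan(table: String, isUpsert: Boolean) extends Plan
  final case class Filter(input: Plan, predicate: String) extends Plan
  final case class UpsertToRetraction(input: Plan) extends Plan

  // One top-down traversal. A freshly wrapped scan is returned without being
  // visited again, so the rewrite cannot re-match its own output.
  def insertConversion(plan: Plan): Plan = plan match {
    case s @ Scan(_, true)        => UpsertToRetraction(s)
    case s: Scan                  => s
    case u: UpsertToRetraction    => u // already converted, leave untouched
    case Filter(input, predicate) => Filter(insertConversion(input), predicate)
  }
}
```

   Running `insertConversion` a second time on its own result returns the same 
plan, which is the property a repeatedly firing planner rule would need some 
other mechanism to guarantee.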




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698761#comment-16698761
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236163914
 
 

 ##
 File path: 
flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
 ##
 @@ -54,9 +54,9 @@ object StreamSQLExample {
   Order(4L, "beer", 1)))
 
 // convert DataStream to Table
-var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
 
 Review comment:
   This will also require changes in the documentation.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698767#comment-16698767
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236168761
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
 
 Review comment:
   Aren't we missing the user-defined primary key here? What's the value of an 
upsert stream without a defined primary key?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698764#comment-16698764
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197832
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala
 ##
 @@ -36,20 +36,20 @@ import 
org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
   * It ensures that types without deterministic field order (e.g. POJOs) are 
not part of
   * the plan translation.
   */
-class DataStreamScan(
+class AppendStreamScan(
 
 Review comment:
   Shouldn't this belong to the previous commit?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697175#comment-16697175
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441249337
 
 
   @pnowojski @sunjincheng121 @dianfu Thanks for your review and suggestions. I 
have split and updated the pr. 




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696631#comment-16696631
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441202156
 
 
   @pnowojski Thanks for your review. Makes sense to me. I will split it into 4 
commits according to your suggestions. Meanwhile, I will address the comments 
from @dianfu. Thank you all.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696540#comment-16696540
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441185723
 
 
   I've started reviewing the code and, as we discussed offline, could you split 
this PR into 4 commits?
   - Add `fromAppendStream` and Deprecate `fromDataStream`
   - Introduce `LastRow` Rules
   - Introduce `LastRow` runtime functions
   - Optimize upsert sources followed by `Calc`
   
   Having more, smaller, independent commits speeds up reviewing and helps in 
the future whenever someone digs through git's commit history to understand 
why or how something was implemented. Splitting refactoring/renaming commits 
from feature-adding/changing or bug-fixing commits is especially important.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696467#comment-16696467
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235835262
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   Yeah, it would be nice if we could. However, it is not easy to generate 
LastRow in a rule. For example, with the following rule:
   ```
   class GenerateLastRowRule extends RelOptRule(
 operand(classOf[TableScan], none),
 "GenerateLastRowRule") {
 ...
   }
   ```
   The rule will apply infinitely, because the `TableScan` it matches is still 
in the plan after the rewrite. Moreover, we can't write the rule with two 
operands, because some plans may contain only one Scan node.
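
The non-termination described above can be illustrated with a small toy model. This is only a sketch: `Node`, `Scan`, `LastRow`, and `InfiniteRuleDemo` are hypothetical stand-ins, not Calcite or Flink classes, and the pass count is bounded so the example terminates.

```scala
// Toy plan model; illustrative stand-ins, not Calcite classes.
sealed trait Node
final case class Scan(table: String) extends Node
final case class LastRow(input: Node) extends Node

object InfiniteRuleDemo {
  // Hypothetical rule: match any Scan and wrap it in a LastRow.
  // The rewritten plan still contains the Scan, so the rule matches again.
  def applyRule(node: Node): Node = node match {
    case s: Scan        => LastRow(s)
    case LastRow(input) => LastRow(applyRule(input))
  }

  // Count the LastRow nodes stacked on top of the scan.
  def depth(node: Node): Int = node match {
    case _: Scan        => 0
    case LastRow(input) => 1 + depth(input)
  }

  // Apply the rule for a bounded number of passes.
  def rewrite(plan: Node, passes: Int): Node =
    (1 to passes).foldLeft(plan)((p, _) => applyRule(p))
}
```

Every pass adds one more `LastRow`, so a planner that keeps firing the rule while it matches would never reach a fixpoint.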




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696452#comment-16696452
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235858319
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"LastRow(${
+  if (keyNames.size != 0) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+val result: DataStream[CRow] = if (needRetraction) {
+  val processFunction = new LastRowProcessFunction(
+new RowTypeInfo(schema.fieldTypeInfos.toArray, 
schema.fieldNames.toArray),
 
 Review comment:
   You are right.



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696411#comment-16696411
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235852606
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/FromUpsertStreamITCase.scala
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.table.runtime.stream.sql.FromUpsertStreamITCase.{TestPojo, 
TimestampWithEqualWatermark}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, 
TestUpsertSink}
+import org.junit.Assert._
+import org.junit._
+
+class FromUpsertStreamITCase extends StreamingWithStateTestBase {
 
 Review comment:
   `UpsertStream` sounds like upserting a stream into a sink. What do you think?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696399#comment-16696399
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235850176
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import _root_.java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class JavaTupleToCRowProcessRunner(
 
 Review comment:
   Makes sense.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696398#comment-16696398
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235849985
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"LastRow(${
+  if (keyNames.size != 0) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+val result: DataStream[CRow] = if (needRetraction) {
+  val processFunction = new LastRowProcessFunction(
+new RowTypeInfo(schema.fieldTypeInfos.toArray, 
schema.fieldNames.toArray),
 
 Review comment:
   `schema.typeInfo` can be cast to `RowTypeInfo`, as it is always a `RowTypeInfo`.
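
One way to make such a downcast explicit is a pattern match that fails loudly if the assumption is ever violated. The sketch below uses hypothetical stand-in types (`TypeInfo`, `RowTypeInfoLike`, `AtomicTypeInfoLike`) instead of Flink's `TypeInformation[Row]` and `RowTypeInfo`, so that it runs without Flink on the classpath.

```scala
// Stand-ins for Flink's TypeInformation[Row] and RowTypeInfo; not the real classes.
trait TypeInfo { def arity: Int }

final case class RowTypeInfoLike(fieldNames: Seq[String]) extends TypeInfo {
  def arity: Int = fieldNames.size
}

final case class AtomicTypeInfoLike(name: String) extends TypeInfo {
  def arity: Int = 1
}

object CastDemo {
  // Narrow the static type; throw a descriptive error instead of a bare
  // ClassCastException when the value is not a row type.
  def asRowType(info: TypeInfo): RowTypeInfoLike = info match {
    case row: RowTypeInfoLike => row
    case other =>
      throw new IllegalArgumentException(s"Expected a row type but got: $other")
  }
}
```

With the real classes, the equivalent would presumably be a match on `schema.typeInfo` with a `RowTypeInfo` case, which avoids rebuilding the type from field types and names.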



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696382#comment-16696382
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235847810
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import _root_.java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class JavaTupleToCRowProcessRunner(
 
 Review comment:
   Doing the checks would hurt performance. I think we can solve the problem by 
generating code according to the type. However, we can improve this in 
another PR. What do you think?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696312#comment-16696312
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235835262
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   Yeah, it would be nice if we could. A rule can only transform a node into a 
new node with the same row type. However, the row type of LastRow is different from 
that of the scan, i.e., the tuple type has been flattened. 
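
The row-type constraint mentioned above can be sketched with a toy check. Everything here is an assumption made for illustration: `FieldType`, `TupleType`, `RowTypeDemo`, and the shape of the scan's row type are hypothetical, and `transformTo` only mimics a planner that rejects a rewrite whose output row type differs from the original.

```scala
// Illustrative stand-ins; not Calcite's RelDataType or RelOptRuleCall.
sealed trait FieldType
case object BoolType extends FieldType
case object StringType extends FieldType
case object LongType extends FieldType
case object IntType extends FieldType
final case class TupleType(fields: Seq[FieldType]) extends FieldType

object RowTypeDemo {
  type RowType = Seq[FieldType]

  // Flatten one level of nested tuple fields into top-level fields.
  def flatten(rowType: RowType): RowType = rowType.flatMap {
    case TupleType(fields) => fields
    case other             => Seq(other)
  }

  // Accept a rewrite only if the row type is unchanged.
  def transformTo(before: RowType, after: RowType): Either[String, RowType] =
    if (before == after) Right(after)
    else Left(s"row type mismatch: $before vs $after")
}
```

Under these assumptions, a scan typed as a flag plus a nested tuple cannot be replaced by a node exposing the flattened fields, which is why the conversion is done outside a rule.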




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696315#comment-16696315
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235835509
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"LastRow(${
+  if (keyNames.size != 0) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, 
queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+val result: DataStream[CRow] = if (needRetraction) {
+  val processFunction = new LastRowProcessFunction(
+new RowTypeInfo(schema.fieldTypeInfos.toArray, 
schema.fieldNames.toArray),
 
 Review comment:
   `LastRowProcessFunction` requires a `RowTypeInfo`, while 
`schema.typeInfo` returns a `TypeInformation[Row]`.



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696311#comment-16696311
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235835262
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   Yeah, it would be nice if we could. A rule can only transform a node into a 
new node with the same row type. However, the row type of LastRow differs from 
that of the scan, i.e., the tuple type has been flattened. I suggest to 




> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694621#comment-16694621
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235316296
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an Upsert source.
 
 Review comment:
   Upsert -> upsert




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694638#comment-16694638
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235315365
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   It's a little tricky to generate the LogicalLastRow when visiting the TableScan 
node. Is it possible to create a rule to do that?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694624#comment-16694624
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235317112
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
 
 Review comment:
   keyNames.nonEmpty
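A short Flink-free sketch of the suggestion: `nonEmpty` is a standard Scala collection method that states the intent more directly than `size != 0`. The names `KeyTerms` and `keysTerm` are hypothetical; they only mimic the conditional "keys" term from the diff.

```scala
object KeyTerms {
  // Render the "keys" term only when at least one key name exists.
  def keysTerm(keyNames: Seq[String]): Option[String] =
    if (keyNames.nonEmpty) Some(keyNames.mkString(", ")) else None
}
```

With `Seq("a", "b")` this yields `Some("a, b")`, and with an empty sequence it yields `None`.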




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694616#comment-16694616
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235313039
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
 
 Review comment:
   rename getTimeIndicatorIndexes to timeIndicatorIndexes
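For illustration, the expression under discussion can be reproduced without Calcite or Flink. The `FieldType` hierarchy and `isTimeIndicatorType` below are hypothetical stand-ins for the real field types and for `FlinkTypeFactory.isTimeIndicatorType`; only the `zipWithIndex`/`filter`/`map` shape is taken from the diff.

```scala
// Hypothetical stand-ins for field types; not the Calcite API.
sealed trait FieldType
case object TimeIndicator extends FieldType
case object Regular extends FieldType

object TimeIndicatorSketch {
  // Stand-in for FlinkTypeFactory.isTimeIndicatorType.
  def isTimeIndicatorType(t: FieldType): Boolean = t == TimeIndicator

  // Noun-style name: the result is a collection of indexes, not a getter.
  def timeIndicatorIndexes(relTypes: Seq[FieldType]): Seq[Int] =
    relTypes.zipWithIndex
      .filter(e => isTimeIndicatorType(e._1))
      .map(_._2)
}
```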




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694622#comment-16694622
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235321744
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LastRowProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to handle upsert inputs. Output a retract message if there 
is a new update.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class LastRowProcessFunction(
+private val rowTypeInfo: RowTypeInfo,
+private val queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+with Logging {
+
+  private var prevRow: CRow = _
+  // stores the accumulators
+  private var state: ValueState[Row] = _
+
+  override def open(config: Configuration) {
+
+prevRow = new CRow(new Row(rowTypeInfo.getArity), false)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("LastRowState", rowTypeInfo)
+state = getRuntimeContext.getState(stateDescriptor)
+
+initCleanupTimeState("LastRowCleanupTime")
+LOG.info("Init LastRowProcessFunction.")
+  }
+
+  override def processElement(
+  inputC: CRow,
+  ctx: ProcessFunction[CRow, CRow]#Context,
+  out: Collector[CRow]): Unit = {
+
+val currentTime = ctx.timerService().currentProcessingTime()
+// register state-cleanup timer
+registerProcessingCleanupTimer(ctx, currentTime)
+
+val pre = state.value()
+val current = inputC.row
+
+if (inputC.change) {
+  // ignore same record
+  if (!stateCleaningEnabled && pre != null && pre.equals(current)) {
+return
+  }
+  state.update(current)
+  // retract prevRow
+  if (pre != null) {
+prevRow.row = pre
+out.collect(prevRow)
+  }
+  // output currentRow
+  out.collect(inputC)
+} else {
+  state.clear()
+  if (pre != null) {
+prevRow.row = pre
+out.collect(prevRow)
+  } else {
+// else input is a delete row we ingnore it, since delete on nothing 
means nothing.
 
 Review comment:
   ingnore -> ignore
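The branching in `processElement` above can be modeled as a pure function, which may make the upsert-to-retraction behavior easier to follow. This is a sketch under stated assumptions, not the PR's implementation: `Change` and `UpsertToRetractionSketch` are hypothetical names, the state covers a single key, and state cleanup is assumed to be disabled (so identical records are dropped).

```scala
// Hypothetical change message: change = true is an upsert, false a delete/retraction.
final case class Change[A](row: A, change: Boolean)

object UpsertToRetractionSketch {
  // Returns the new state and the messages emitted for one input.
  def step[A](state: Option[A], input: Change[A]): (Option[A], List[Change[A]]) =
    if (input.change) {
      state match {
        // Identical record: keep the state, emit nothing.
        case Some(prev) if prev == input.row => (state, Nil)
        // Update: retract the previous row, then emit the new one.
        case Some(prev) => (Some(input.row), List(Change(prev, false), input))
        // First row for this key: emit it as is.
        case None => (Some(input.row), List(input))
      }
    } else {
      state match {
        // Delete: clear the state and retract the stored row.
        case Some(prev) => (None, List(Change(prev, false)))
        // Delete on nothing: ignore.
        case None => (None, Nil)
      }
    }

  // Folds a sequence of inputs through step and collects all emitted messages.
  def run[A](inputs: Seq[Change[A]]): List[Change[A]] = {
    val (_, out) = inputs.foldLeft((Option.empty[A], List.empty[Change[A]])) {
      case ((state, acc), in) =>
        val (next, emitted) = step(state, in)
        (next, acc ++ emitted)
    }
    out
  }
}
```

Note that a delete retracts the stored row rather than the incoming one, matching `prevRow.row = pre` in the diff.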



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694625#comment-16694625
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235316850
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"LastRow(${
+  if (keyNames.size != 0) {
 
 Review comment:
   keyNames.nonEmpty




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694619#comment-16694619
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235312380
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
 
 Review comment:
   move relTypes and getTimeIndicatorIndexes under the if statement or make 
them lazy
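The "make them lazy" option can be sketched in plain Scala. A `lazy val` is evaluated on first access only, so the computation is skipped entirely when the branch that uses it is not taken. `LazySketch`, its counter, and the string-based field types are hypothetical and serve only to make the deferred evaluation observable.

```scala
object LazySketch {
  // Counts how often the stand-in computation actually runs.
  var computations = 0

  def visit(isUpsertTable: Boolean, fieldTypes: Seq[String]): Seq[Int] = {
    // Evaluated on first access only; never evaluated if not accessed.
    lazy val timeIndicatorIndexes: Seq[Int] = {
      computations += 1
      fieldTypes.zipWithIndex.collect { case ("proctime", i) => i }
    }
    if (isUpsertTable) timeIndicatorIndexes else Seq.empty
  }
}
```

Moving the definitions inside the `if` branch has the same effect without the small synchronization overhead that `lazy val` can carry.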




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694631#comment-16694631
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235361617
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/FromUpsertStreamITCase.scala
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.table.runtime.stream.sql.FromUpsertStreamITCase.{TestPojo, 
TimestampWithEqualWatermark}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, 
TestUpsertSink}
+import org.junit.Assert._
+import org.junit._
+
+class FromUpsertStreamITCase extends StreamingWithStateTestBase {
 
 Review comment:
   Rename to UpsertStreamITCase?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694617#comment-16694617
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235315365
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not 
supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
+// materialize time indicator
+val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+materializerUtils.projectAndMaterializeFields(rewrittenScan, 
getTimeIndicatorIndexes.toSet)
+  } else {
+scan
+  }
+
+  LogicalLastRow.create(
 
 Review comment:
   It's a little tricky to generate the LogicalLastRow when visiting the TableScan 
node. Is it possible to create a rule to do that?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694627#comment-16694627
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235361226
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/ScalaTupleToCRowProcessRunner.scala
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class ScalaTupleToCRowProcessRunner(
+name: String,
+code: String,
+@transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[(Boolean, Any), CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Any, Row]]
+  with Logging {
+
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+LOG.debug("Instantiating ProcessFunction.")
+function = clazz.newInstance()
 
 Review comment:
   Please add the open/close logic for the compiled function, e.g. 
FunctionUtils.setFunctionRuntimeContext, FunctionUtils.openFunction and 
FunctionUtils.closeFunction.
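
   The forwarding requested here can be sketched without Flink on the 
classpath. Everything below (Lifecycle, RecordingFunction, Runner) is a 
hypothetical stand-in and not Flink API; in the PR the forwarding would 
presumably go through the FunctionUtils helpers named in the comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a rich function's lifecycle hooks (not a Flink type).
trait Lifecycle {
  def open(): Unit = ()
  def close(): Unit = ()
}

// Hypothetical inner function that records which lifecycle calls it received.
class RecordingFunction extends Lifecycle {
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]
  override def open(): Unit = { events += "open" }
  override def close(): Unit = { events += "close" }
  def process(value: Int): Int = { events += s"process($value)"; value * 2 }
}

// Plays the role of the *ToCRowProcessRunner wrappers: it instantiates the
// inner function and forwards its own open/close calls to it.
class Runner(create: () => RecordingFunction) extends Lifecycle {
  private var function: RecordingFunction = _
  def inner: RecordingFunction = function

  override def open(): Unit = {
    function = create()
    function.open() // without this call the inner function is never opened
  }

  def process(value: Int): Int = function.process(value)

  override def close(): Unit = if (function != null) function.close()
}
```

   The point of the sketch is only the ordering: the wrapper opens the inner 
function right after instantiating it and closes it in its own close().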




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694615#comment-16694615
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235313379
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##
 @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
 throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+val relTypes = scan.getRowType.getFieldList.map(_.getType)
+val getTimeIndicatorIndexes = relTypes.zipWithIndex
+  .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+  .map(_._2)
+if (upsertStreamTable != null) {
+  val input = if (getTimeIndicatorIndexes.size > 0) {
 
 Review comment:
   Use getTimeIndicatorIndexes.nonEmpty instead of size > 0.
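
   A minimal, Flink-free sketch of the suggested check. The string-based 
isTimeIndicator below is a hypothetical stand-in for 
FlinkTypeFactory.isTimeIndicatorType, and the object and method names are 
made up for illustration.

```scala
object TimeIndicatorIndexes {
  // Hypothetical stand-in: field "types" are plain strings here and time
  // indicators are recognised by name.
  def isTimeIndicator(fieldType: String): Boolean =
    fieldType == "PROCTIME" || fieldType == "ROWTIME"

  // Same shape as the code under review: indexes of all time indicator fields.
  def indicatorIndexes(fieldTypes: Seq[String]): Seq[Int] =
    fieldTypes.zipWithIndex.collect { case (t, i) if isTimeIndicator(t) => i }

  // nonEmpty states the intent directly and reads better than size > 0.
  def needsMaterialization(fieldTypes: Seq[String]): Boolean =
    indicatorIndexes(fieldTypes).nonEmpty
}
```

   For example, TimeIndicatorIndexes.needsMaterialization(Seq("STRING", 
"PROCTIME")) is true, while a schema without indicator fields yields false.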




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694629#comment-16694629
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235346023
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
 ##
 @@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
+  protected def convertUpsertToInternalRow(
+  schema: RowSchema,
+  input: DataStream[Any],
+  fieldIdxs: Array[Int],
+  config: TableConfig,
+  rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+val internalType = schema.typeInfo
+val cRowType = CRowTypeInfo(internalType)
+
+val hasTimeIndicator = fieldIdxs.exists(f =>
+  f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+val dsType = input.getType
+
+dsType match {
+// Scala tuple
+  case t: CaseClassTypeInfo[_]
+if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+val inputType = t.getTypeAt[Any](1)
+if (inputType == internalType && !hasTimeIndicator) {
+  // input is already of correct type. Only need to wrap it as CRow
+  input.asInstanceOf[DataStream[(Boolean, Row)]]
+.map(new RichMapFunction[(Boolean, Row), CRow] {
+  @transient private var outCRow: CRow = null
 
 Review comment:
   Use `_` instead of `null` as the initializer.
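
   For readers less familiar with the idiom: on a field declaration, `= _` 
asks the compiler for the type's default value. The Holder class below is a 
made-up illustration, written against Scala 2 as used by this module.

```scala
// Hypothetical holder class; only the initializers matter.
class Holder {
  // Reference type: `_` yields null, same effect as `= null` but idiomatic.
  var outRow: String = _
  // Value types: `_` yields 0 / false, where `= null` would not compile.
  var count: Int = _
  var flag: Boolean = _
}
```

   Note that `= _` is only allowed on fields (member vars), not on local 
variables.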




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694628#comment-16694628
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235351015
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -555,18 +555,95 @@ abstract class StreamTableEnvironment(
   s"But is: ${execEnv.getStreamTimeCharacteristic}")
 }
 
+// Can not apply key on append stream
+if (extractUniqueKeys(fields).nonEmpty) {
+  throw new TableException(
+s"Can not apply key on append stream, use fromUpsertStream instead.")
+}
+
 // adjust field indexes and field names
 val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
 val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-val dataStreamTable = new DataStreamTable[T](
+val dataStreamTable = new AppendStreamTable[T](
   dataStream,
   indexesWithIndicatorFields,
   namesWithIndicatorFields
 )
 registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+* Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+* catalog.
+*
+* @param name The name under which the table is registered in the catalog.
+* @param dataStream The [[DataStream]] to register as table in the catalog.
+* @tparam T the type of the [[DataStream]].
+*/
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+val streamType: TypeInformation[T] =
+  dataStream.getType.asInstanceOf[TupleTypeInfo[JTuple2[JBool, T]]].getTypeAt(1)
 
 Review comment:
   We also need to consider CaseClassTypeInfo here.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694626#comment-16694626
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235360525
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import _root_.java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class JavaTupleToCRowProcessRunner(
 
 Review comment:
   It seems that we can reuse 
ExternalTypeToCRowProcessRunner/CRowOutputProcessRunner here. We would just need 
to check whether the input is a Java tuple, a Scala tuple, or another type.
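
   A rough sketch of such a check, written without Flink dependencies. 
JTuple2Like is a hypothetical stand-in for 
org.apache.flink.api.java.tuple.Tuple2, and UpsertInput.unwrap is a made-up 
helper, not code from the PR.

```scala
// Hypothetical stand-in for Flink's Java Tuple2 so the sketch runs standalone.
final class JTuple2Like[A, B](val f0: A, val f1: B)

object UpsertInput {
  // Returns (changeFlag, payload) for the two supported wrappers and None for
  // any other input type, so a single runner could handle all three cases.
  def unwrap(record: Any): Option[(Boolean, Any)] = record match {
    // Scala tuple whose first field is the Boolean change flag
    case (flag: Boolean, payload) => Some((flag, payload))
    // Java-style tuple whose first field is a boxed Boolean
    case t: JTuple2Like[_, _] =>
      t.f0 match {
        case flag: java.lang.Boolean => Some((flag.booleanValue, t.f1))
        case _                       => None
      }
    // Anything else is not an upsert wrapper
    case _ => None
  }
}
```

   With a helper like this, the tuple-specific runner classes would differ 
only in how they unwrap the record, which is the duplication the comment 
points at.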




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694630#comment-16694630
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235362084
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala
 ##
 @@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import java.lang.{Boolean => JBool}
+
+class FromUpsertStreamTest extends TableTestBase {
 
 Review comment:
   Rename to UpsertStreamTest?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694623#comment-16694623
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235320988
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LastRowProcessFunction.scala
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Function used to handle upsert inputs. Output a retract message if there is a new update.
+  *
+  * @param rowTypeInfo the output row type info.
+  * @param queryConfig the configuration for the query.
+  */
+class LastRowProcessFunction(
+private val rowTypeInfo: RowTypeInfo,
+private val queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+with Logging {
+
+  private var prevRow: CRow = _
+  // stores the accumulators
+  private var state: ValueState[Row] = _
 
 Review comment:
   state and prevRow can be transient. 
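
   To illustrate the effect, assuming both fields are (re)created in open() 
after the function has been shipped: a @transient field is skipped by Java 
serialization and comes back as null. LastRowLike and TransientDemo are 
hypothetical names used only for this sketch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a function object that gets serialized and shipped.
class LastRowLike(val name: String) extends Serializable {
  // Runtime-only handle: not written by Java serialization, rebuilt in open().
  @transient private var state: StringBuilder = _

  def open(): Unit = { state = new StringBuilder }
  def isOpen: Boolean = state != null
}

object TransientDemo {
  // Serializes and deserializes a value with plain Java serialization.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

   After a round trip the constructor argument survives, while the transient 
field is null again until open() is called on the copy.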




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694618#comment-16694618
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235361188
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala
 ##
 @@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import _root_.java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessRunner with [[CRow]] output.
+  */
+class JavaTupleToCRowProcessRunner(
+name: String,
+code: String,
+@transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[JTuple2[JBool, Any], CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Any, Row]]
+  with Logging {
+
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+LOG.debug("Instantiating ProcessFunction.")
+function = clazz.newInstance()
 
 Review comment:
   Please add the open/close logic for the compiled function, e.g. 
FunctionUtils.setFunctionRuntimeContext, FunctionUtils.openFunction and 
FunctionUtils.closeFunction.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694620#comment-16694620
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235318373
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   inputSchema: RowSchema,
+   schema: RowSchema,
+   val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+.filter(e => keyNames.contains(e._1))
+.map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+new DataStreamLastRow(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  inputSchema,
+  schema,
+  keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+super.explainTerms(pw)
+  .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+  .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+s"LastRow(${
+  if (keyNames.size != 0) {
+s"keys:(${keyNames.mkString(", ")}), "
+  } else {
+""
+  }
+}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+val inputDS =
+  getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+val outRowType = CRowTypeInfo(schema.typeInfo)
+
+val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+val result: DataStream[CRow] = if (needRetraction) {
+  val processFunction = new LastRowProcessFunction(
+new RowTypeInfo(schema.fieldTypeInfos.toArray, schema.fieldNames.toArray),
 
 Review comment:
   Why not use `schema.typeInfo` directly?



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694472#comment-16694472
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-440596555
 
 
   @sunjincheng121 Thanks for your attention. I have rebased onto master.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694319#comment-16694319
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

sunjincheng121 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-440561427
 
 
   @hequn8128 Thanks for the PR. 
   I am glad to review the changes after you rebase the code!
   Best,
   Jincheng




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688840#comment-16688840
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-439240243
 
 
   Thanks, @pnowojski! Is there anything I can help with for the release?




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687964#comment-16687964
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-439025683
 
 
   Sorry, @hequn8128, that you have had to wait this long. I have not forgotten 
about this PR, but I was busy with the 1.7 release. I hope to get back to it 
in the next couple of days.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683663#comment-16683663
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-437853763
 
 
   @pnowojski Hi, it would be great if you could take a look when you are free. 
Thank you.
   




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654736#comment-16654736
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-430900222
 
 
   Updated the PR with the following changes: 1. materialize time indicators; 
2. refactor rules.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651377#comment-16651377
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-430163860
 
 
   As discussed with Fabian in 
[FLINK-8578](https://issues.apache.org/jira/browse/FLINK-8578), we have to 
materialize the proc-time field and convert row-time attributes into regular 
TIMESTAMP attributes. I will update the PR ASAP.




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635730#comment-16635730
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 opened a new pull request #6787: [FLINK-8577][table] Implement 
proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787
 
 
   
   ## What is the purpose of the change
   
   Currently, only append streams can be ingested as stream tables. This pull 
request implements proctime DataStream to Table upsert conversion.
   
   The API looks like:
   
   ```
   DataStream[(Boolean, (String, Long, Int))] input = ???
   // upsert with key
   Table table = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key)
   // upsert without key -> single row table
   Table table = tEnv.fromUpsertStream(input, 'a, 'b, 'c)
   ```
   
   A simple design doc can be found 
[here](https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing).
   
   
   ## Brief change log
   
 - Add `fromUpsertStream()` and `fromAppendStream()` to the Java and Scala 
`StreamTableEnvironment` and deprecate `fromDataStream`.
 - Add key support in the source definition, including parsing the `key` keyword.
 - Add `DataStreamLastRowRule` and `DataStreamLastRowAfterCalcRule`. Both 
rules generate `DataStreamLastRow` to handle upsert streams. The difference 
between the two rules is that `DataStreamLastRowAfterCalcRule` takes the calc 
into consideration and generates the LastRow DataStreamRel node after the calc, 
which can decrease the state size in LastRow.
 - Add `LastRowProcessFunction` to handle upsert messages and generate 
retractions if there is an update.
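   
   The upsert-to-retraction step described in the last item can be illustrated 
with a minimal plain-Java sketch. This is not the PR's `LastRowProcessFunction`: 
the class name, the string encoding of messages, and the `HashMap` standing in 
for Flink keyed state are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (hypothetical) of turning keyed upsert messages into retraction messages. */
final class UpsertToRetractionSketch {

    // Last row seen per key; stands in for Flink keyed state in this sketch.
    private final Map<String, String> lastRowByKey = new HashMap<>();

    /**
     * Handles one input message.
     * upsert == true inserts or updates the row for the key,
     * upsert == false deletes the row for the key (the row argument is ignored).
     */
    List<String> process(boolean upsert, String key, String row) {
        List<String> out = new ArrayList<>();
        String previous = upsert ? lastRowByKey.put(key, row) : lastRowByKey.remove(key);
        if (previous != null) {
            // Downstream operators already saw the old row, so retract it first.
            out.add("RETRACT " + previous);
        }
        if (upsert) {
            out.add("ACCUMULATE " + row);
        }
        return out;
    }

    public static void main(String[] args) {
        UpsertToRetractionSketch fn = new UpsertToRetractionSketch();
        System.out.println(fn.process(true, "k1", "k1,1"));  // first insert for k1
        System.out.println(fn.process(true, "k1", "k1,2"));  // update of k1
        System.out.println(fn.process(false, "k1", null));   // delete of k1
    }
}
```

   The sketch keeps exactly one row per key, which is why applying the calc 
before this step (as `DataStreamLastRowAfterCalcRule` does) can shrink the 
state: fewer or narrower rows reach the map.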
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Add Java API test in `JavaSqlITCase`
 - Add IT test cases in `FromUpsertStreamITCase`
 - Add SQL plan tests in `FromUpsertStreamTest`
 - Add key extraction test in `UpdatingPlanCheckerTest`
 - Add validation test in `StreamTableEnvironmentValidationTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not documented; documentation will 
be added in a later PR)
   




[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-08-14 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579945#comment-16579945
 ] 

Hequn Cheng commented on FLINK-8577:


A simple design 
[doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
 about this subtask. Any suggestions are welcome!



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-09 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359247#comment-16359247
 ] 

Hequn Cheng commented on FLINK-8577:


Hi, [~fhueske] [~twalthr]

Agreed, I also prefer option 1. It's the main reason I closed 
[FLINK-8579|https://issues.apache.org/jira/browse/FLINK-8579]. 

+1 for the method name.



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357003#comment-16357003
 ] 

Timo Walther commented on FLINK-8577:
-

Maybe we should also rename the methods to make them easier to find: 
{{tEnv.fromUpsertStream}}, {{tEnv.fromAppendStream}}, {{tEnv.fromDataSet()}}.



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357000#comment-16357000
 ] 

Timo Walther commented on FLINK-8577:
-

Yes, I also prefer option 1. Always require a flag. This also keeps the number 
of possible methods small and thus the API concise.

{code}
// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// always require flags, so add them manually
 table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
{code}
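
The `InsertFlagger` in the snippet above is not defined anywhere in this thread. A minimal sketch of what such a mapper might look like follows, written against `java.util.function.Function` and `Map.Entry` so that it runs without Flink on the classpath. In an actual Flink job it would presumably implement Flink's `MapFunction` and emit a `Tuple2<Boolean, T>` instead; that mapping, like the class body itself, is an assumption.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical InsertFlagger: marks every record of an append-only input as an upsert. */
final class InsertFlagger<T> implements Function<T, Map.Entry<Boolean, T>> {

    @Override
    public Map.Entry<Boolean, T> apply(T record) {
        // true = upsert; an append-only input never produces deletes (false).
        return new SimpleImmutableEntry<>(Boolean.TRUE, record);
    }

    public static void main(String[] args) {
        // A java.util.stream.Stream stands in for the DataStream in this sketch.
        List<String> input = Arrays.asList("a", "b");
        List<Map.Entry<Boolean, String>> flagged =
            input.stream().map(new InsertFlagger<String>()).collect(Collectors.toList());
        System.out.println(flagged);
    }
}
```

With option 1, this one-line map is the only extra work an append-only source has to do, which is the trade-off being accepted for a single conversion method.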





[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-08 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356714#comment-16356714
 ] 

Fabian Hueske commented on FLINK-8577:
--

I was thinking about how to handle deletion flags. I see three options:
 # Always require deletion flags. DataStreams that are append-only would need 
to add a map to add the flag.
 # Have special methods for upsert with and without deletion flags.
 # Have a mandatory parameter that determines whether the input has flags or 
not.

So the choices are:
DataStream[(String, Long, Int)] input = ???
DataStream[(Boolean, (String, Long, Int))] flaggedInput = ???

// WITH DELETE FLAGS

// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStreamWithDeletes(flaggedInput, 'a, 'b, 'c.key)
// mandatory parameter. 
table = tEnv.upsertFromStream(flaggedInput, deletes = true, 'a, 'b, 'c.key)

// WITHOUT DELETE FLAGS

// always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(input, deletes = false, 'a, 'b, 'c.key)
I think I prefer option 1 because it keeps the API slim and is 
consistent with the format of the UpsertTableSink.

What do you think [~hequn8128], [~twalthr]?
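
To make the meaning of the deletion flag concrete, here is a small plain-Java sketch of how a flagged upsert stream could be materialized into table rows. It is only an illustration under stated assumptions: `UpsertTableSketch`, its methods, and the positional key indexes are hypothetical and not part of the proposed API. A `true` flag upserts the row for its key, a `false` flag deletes it, and an empty key list makes every record share one key, so the table holds at most one row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch (hypothetical): materialize a flagged upsert stream into rows keyed by field positions. */
final class UpsertTableSketch {

    private final int[] keyIndexes;  // positions of the key fields; empty = upsert without key
    private final Map<List<Object>, Object[]> rows = new LinkedHashMap<>();

    UpsertTableSketch(int... keyIndexes) {
        this.keyIndexes = keyIndexes;
    }

    /** flag == true upserts the row; flag == false deletes the row with the same key. */
    void apply(boolean flag, Object... row) {
        List<Object> key = new ArrayList<>();
        for (int i : keyIndexes) {
            key.add(row[i]);
        }
        if (flag) {
            rows.put(key, row);
        } else {
            rows.remove(key);
        }
    }

    int size() {
        return rows.size();
    }

    public static void main(String[] args) {
        UpsertTableSketch keyed = new UpsertTableSketch(2);  // key on the third field, like 'c.key
        keyed.apply(true, "x", 1L, 7);
        keyed.apply(true, "y", 2L, 7);   // same key 7: replaces the first row
        keyed.apply(true, "z", 3L, 8);
        keyed.apply(false, "z", 3L, 8);  // delete flag removes key 8
        System.out.println(keyed.size());
    }
}
```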



[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-02-08 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356684#comment-16356684
 ] 

Fabian Hueske commented on FLINK-8577:
--

[~twalthr], what do you think about the API?


