[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710249#comment-16710249 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-444533622

@pnowojski Hi, thanks a lot for your review and great suggestions. I have updated the PR according to your suggestions. The main changes are:
- Add `UnresolvedKeyFieldReference`
- Rename `LastRow` to `UpsertToRetraction`
- Move the generation of `UpsertToRetraction` from `RelTimeIndicatorConverter` to `EnumerableToLogicalTableScan`
- Add `RemoveDataStreamUpsertToRetractionRule` to remove the `UpsertToRetraction` relnode if it is a no-op node

It would be great if you could take a look when you are free. Thanks a lot.

Best,
Hequn

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Implement proctime DataStream to Table upsert conversion.
> ---------------------------------------------------------
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
> Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
> A simple design [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] about this subtask.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
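The issue description distinguishes an upsert table with a key (`'c.key`) from one without a key, which collapses to a single row. A minimal sketch of those semantics in plain Scala, not the Flink implementation: it assumes the `(Boolean, row)` message encoding proposed in the PR's scaladoc (`true` = insert/update, `false` = delete), and the names `UpsertMaterialization` and `materialize` are hypothetical. Keying on the third field mirrors `'c.key`; a constant key `()` models the "without key" case, where every message overwrites the same row.

```scala
object UpsertMaterialization {
  // Row type of the example stream DataStream[(String, Long, Int)] with fields 'a, 'b, 'c.
  type Row = (String, Long, Int)

  /**
    * Hypothetical helper: folds upsert messages into the current table contents,
    * keeping at most one row per key. `true` = insert or update, `false` = delete by key.
    */
  def materialize[K](messages: Seq[(Boolean, Row)])(keyOf: Row => K): Map[K, Row] =
    messages.foldLeft(Map.empty[K, Row]) {
      case (table, (true, row))  => table.updated(keyOf(row), row) // overwrite the row for this key
      case (table, (false, row)) => table - keyOf(row)             // remove the row for this key
    }
}
```

For the upserts `("a", 1L, 10)`, `("b", 2L, 20)`, `("a", 3L, 10)`, keying on the third field (`materialize(msgs)(_._3)`) yields two rows, with `("a", 3L, 10)` replacing the first one, while the keyless variant (`materialize(msgs)(_ => ())`) keeps only the last row, `("a", 3L, 10)`.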
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702649#comment-16702649 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328473

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)

Review comment:
Yes, it would be better if we knew whether it is a no-op node during the logical optimization phase. I think the cost computation is OK, since pushing the Calc down is always correct for this plan; the resulting plan is right. However, it is true that it introduces a side effect, i.e., a larger search space during the optimization. It is not easy to know whether it is a no-op node in the logical phase, because the retraction rules have not been applied yet. Do you have any suggestions?
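The `computeSelfCost` override in the diff above charges `rowCnt` rows and `rowCnt * fieldCount` CPU units. A minimal arithmetic sketch in plain Scala (not Calcite's `RelOptCostFactory`; `CostSketch`, `Cost` and `lastRowCost` are hypothetical names) shows why the planner prefers placing `LastRow` after a projecting Calc: with an assumed 1,000 input rows, a 5-field input costs 5,000 CPU units, while the same rows projected down to 2 fields cost 2,000.

```scala
object CostSketch {
  // Mirrors the three arguments of makeCost(rowCount, cpu, io) used in the diff.
  final case class Cost(rows: Double, cpu: Double, io: Double)

  // Hypothetical stand-in for FlinkLogicalLastRow.computeSelfCost:
  // rows = input row count, cpu = rows * number of input fields, io = 0.
  def lastRowCost(rowCnt: Double, fieldCount: Int): Cost =
    Cost(rowCnt, rowCnt * fieldCount, 0)
}
```

As the review thread points out, the same formula also charges the node when it ends up being a no-op, which is why the cost is only correct when a conversion actually happens.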
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702618#comment-16702618 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328473

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)

Review comment:
Yes, it would be better if we knew whether it is a no-op node during the logical optimization phase. I think the cost computation is OK, since pushing the Calc down is always correct for this plan; the resulting plan is right. However, it is true that it introduces a side effect, i.e., a larger search space. It is not easy to know whether it is a no-op node in the logical phase, because the retraction rules have not been applied yet. Do you have any suggestions?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702619#comment-16702619 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328203

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
##
@@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
I think the difference between `toTableFromRetractStream` and `toTable` is that `toTable` means `toTableFromAppendStream`. The method names are consistent:

Methods in `StreamTableEnvironment`:
- fromAppendStream
- fromUpsertStream
- fromRetractStream

Methods in `DataStreamConversions`:
- toTableFromAppendStream
- toTableFromUpsertStream
- toTableFromRetractStream

Renaming `fromDataStream` to `fromAppendStream` is consistent with renaming `toDataStream` to `toAppendStream` in [FLINK-6543](https://issues.apache.org/jira/browse/FLINK-6543).
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702617#comment-16702617 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237328431

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
You are right. Currently, there is no such no-op node in the execution plan. If you are still concerned about the no-op node in the optimization plan, I can add a rule to the RetractionRules to remove it. In this way, we can rename `LastRow` to `UpsertToRetractionConverter`. What do you think?
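The proposed name `UpsertToRetractionConverter` describes what the node does at runtime when a downstream operator needs retractions. A minimal sketch of that conversion in plain Scala, assuming the `(Boolean, row)` encoding for both streams (`true` = add/upsert, `false` = retract/delete) and a hypothetical `UpsertToRetractionSketch.convert` helper: it keeps the last row per key as state, so an update emits a retraction of the previous row followed by the new row, and a delete retracts the stored row. It further assumes that a delete for an unknown key emits nothing, since there is no earlier row to retract.

```scala
object UpsertToRetractionSketch {
  // (true, row) = add / upsert, (false, row) = retract / delete.
  type Msg[R] = (Boolean, R)

  /** Hypothetical converter: keeps the last row per key and emits retract-stream messages. */
  def convert[K, R](upserts: Seq[Msg[R]])(keyOf: R => K): Seq[Msg[R]] = {
    val (out, _) = upserts.foldLeft((Vector.empty[Msg[R]], Map.empty[K, R])) {
      case ((acc, lastRow), (isUpsert, row)) =>
        val key = keyOf(row)
        // Retract the previously emitted row for this key, if there is one.
        val retraction = lastRow.get(key).toList.map(prev => (false, prev))
        if (isUpsert) (acc ++ retraction :+ ((true, row)), lastRow.updated(key, row))
        else (acc ++ retraction, lastRow - key) // assumption: unknown-key deletes emit nothing
    }
    out
  }
}
```

For the upserts `("a", 1)` and `("a", 2)`, followed by deletes of `("a", 2)` and `("b", 9)`, keyed on the first field, the sketch emits add `("a", 1)`, retract `("a", 1)`, add `("a", 2)`, retract `("a", 2)`; the delete for key `"b"` produces no output. The per-key state is what makes this node expensive, which is why the thread wants it absent when no conversion is needed.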
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702620#comment-16702620 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237329703

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
##
@@ -36,25 +36,32 @@ class DataStreamScanRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-    dataSetTable match {
-      case _: DataStreamTable[Any] =>
-        true
-      case _ =>
-        false
-    }
+    val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]])
+    val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]])
+
+    appendTable != null || upsertTable != null

Review comment:
I think you are right.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701652#comment-16701652 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237018239

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
##
@@ -36,25 +36,32 @@ class DataStreamScanRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalNativeTableScan = call.rel(0).asInstanceOf[FlinkLogicalNativeTableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[DataStreamTable[Any]])
-    dataSetTable match {
-      case _: DataStreamTable[Any] =>
-        true
-      case _ =>
-        false
-    }
+    val appendTable = scan.getTable.unwrap(classOf[AppendStreamTable[Any]])
+    val upsertTable = scan.getTable.unwrap(classOf[UpsertStreamTable[Any]])
+
+    appendTable != null || upsertTable != null

Review comment:
Can we avoid those nulls (in multiple places)? For example, by using `isInstanceOf` or `match`? Handling null is dangerous and error-prone.
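The review asks to avoid comparing `unwrap` results with `null`. Calcite's `Wrapper#unwrap` (which `RelOptTable` extends) returns `null` when the object does not support the requested class, so common Scala idioms are to wrap the result in `Option(...)` or to pattern-match on the type directly. The sketch below is self-contained and uses hypothetical stand-in classes and a hypothetical `unwrap` helper that only mimics that null-returning contract; it does not use the real Flink or Calcite types.

```scala
object NullSafeUnwrapSketch {
  // Hypothetical stand-ins for the planner's table classes; not Flink's actual types.
  sealed trait StreamTable
  final case class AppendStreamTable(name: String) extends StreamTable
  final case class UpsertStreamTable(name: String, keys: Seq[String]) extends StreamTable

  // Java-style API: returns null when `table` is not an instance of `clazz`,
  // mimicking the contract of Calcite's Wrapper#unwrap.
  def unwrap[T](table: AnyRef, clazz: Class[T]): T =
    if (clazz.isInstance(table)) clazz.cast(table) else null.asInstanceOf[T]

  // Option(...) turns a possible null into None, so callers never compare with null.
  def isStreamTable(table: AnyRef): Boolean =
    Option(unwrap(table, classOf[AppendStreamTable])).isDefined ||
      Option(unwrap(table, classOf[UpsertStreamTable])).isDefined

  // Pattern matching avoids the nullable result altogether.
  def upsertKeys(table: AnyRef): Option[Seq[String]] = table match {
    case UpsertStreamTable(_, keys) => Some(keys)
    case _                          => None
  }
}
```

Of the two, the `match` variant also makes the extracted data (here the upsert keys) available without a second lookup, which is the shape the original `DataStreamScanRule.matches` already had before the change.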
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701654#comment-16701654 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008957

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames The upsert key names.
+  */
+class LogicalLastRow(

Review comment:
(I've responded to this above.)
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701656#comment-16701656 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237017454

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalLastRow.scala
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.logical
+
+import java.util.{List => JList}
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.{RelNode, SingleRel}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.table.plan.logical.rel.LogicalLastRow
+import org.apache.flink.table.plan.nodes.FlinkConventions
+
+class FlinkLogicalLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, child)
+  with FlinkLogicalRel {
+
+  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
+    new FlinkLogicalLastRow(cluster, traitSet, inputs.get(0), keyNames)
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = mq.getRowCount(child)
+    // take rowCnt and fieldCnt into account, so that cost will be smaller when generate LastRow
+    // after Calc.
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * child.getRowType.getFieldCount, 0)

Review comment:
As mentioned above (in the no-op/renaming discussion), this cost computation is not/will not be correct in the case of a no-op.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701655#comment-16701655 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237008338

##
File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
I have revisited our previous discussion in the design Google doc, and I still have the same concern as I had then:

> In that case this is strange. It's like having filter/projection/aggregation nodes that after pruning/removing/pushing them down are sometimes NO-OPs and sometimes not. Besides being strange, it can have some negative side effects:
> - the presence of a no-op node (when it doesn't need to be there) can block/mess with other optimisations or make them more complex
> - it will pollute the explain plan printed to the user. This is important, since this will be a very costly node and it will be very hard to explain to users when this node requires huge state and when not
> - it would make cost computations more complicated

Back then it sparked a discussion, where you were saying that it will never be a no-op and you wanted `LastRow` to always have a state and a purpose besides upsert-to-retraction conversion (filtering out empty deletes). However, that was resolved and filtering out empty deletes was dropped. This brings me back to the issue of having this as a no-op node in the plan.

This connects with your other response regarding the naming of `LastRow`:

> The LastRow node may be a no-op node in the case such as upsert source -> calc -> upsert sink. While LastRow will convert upsert stream to retract stream if a downstream node needs it to, such as upsert source -> calc -> retract sink. Whether convert to retract stream will be decided by RetractionRules.

I would still argue that it should be named `UpsertToRetractionConverter` (or something along those lines) and, if it is not needed, it should not be in the plan. Maybe this means that our `RetractionRules` are not sufficient and need some refactoring. I could see a couple of solutions to that, but probably the best would be to deduce whether we need to insert `UpsertToRetractionConverter` or not inside the rule that is supposed to create it. Further optimisations/rewrites would then have to correctly handle/preserve the semantics/trait of upserts vs. retractions.
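The review suggests keeping `UpsertToRetractionConverter` in the plan only when a consumer actually needs retractions. A toy sketch of that decision, assuming a hypothetical linear plan model (not Flink's `RelNode` classes or its `RetractionRules`) and the simplification that a Calc forwards whatever message kind its input produces: the converter is dropped under an upsert sink and kept under a retract sink, matching the `upsert source -> calc -> upsert sink` and `upsert source -> calc -> retract sink` cases quoted in the thread.

```scala
object ConverterPlacementSketch {
  // Hypothetical toy plan nodes; not Flink's RelNode classes.
  sealed trait Node
  case object UpsertSource extends Node
  final case class Calc(input: Node) extends Node
  final case class UpsertToRetraction(input: Node) extends Node
  final case class Sink(input: Node, acceptsUpserts: Boolean) extends Node

  /** Drops UpsertToRetraction nodes whose consumer accepts upsert messages directly. */
  def optimize(sink: Sink): Sink =
    Sink(rewrite(sink.input, sink.acceptsUpserts), sink.acceptsUpserts)

  // `upsertsAccepted` tells whether the consumer of `node` can handle upsert messages.
  private def rewrite(node: Node, upsertsAccepted: Boolean): Node = node match {
    case UpsertToRetraction(in) if upsertsAccepted =>
      rewrite(in, upsertsAccepted = true) // no-op: remove it from the plan
    case UpsertToRetraction(in) =>
      UpsertToRetraction(rewrite(in, upsertsAccepted = true)) // needed; its input emits upserts
    case Calc(in) =>
      Calc(rewrite(in, upsertsAccepted)) // simplification: Calc forwards the message kind
    case leaf =>
      leaf // UpsertSource (and anything else) is left unchanged
  }
}
```

Whether this decision belongs in the rule that creates the converter or in a later removal rule is exactly the design question debated above; the sketch only illustrates the condition itself.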
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701653#comment-16701653 ]
ASF GitHub Bot commented on FLINK-8577:
---
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r237004073

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
Generally speaking, `toTable`, `toTableFromUpsertStream` and `toTableFromRetractStream` sound good to me, but what's the difference between `toTableFromRetractStream` and `toTable`?
Secondly, would `toTableFromRetractStream` call `tableEnv.fromAppendStream()`? If so, there would be yet another naming inconsistency. Maybe renaming `fromDataStream` to `fromAppendStream` was a bit incorrect?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701289#comment-16701289 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236914923

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames The upsert key names.
+  */
+class LogicalLastRow(

Review comment:
The `LastRow` node may be a no-op node in a case such as `upsert source -> calc -> upsert sink`, while `LastRow` will convert the upsert stream to a retract stream if a downstream node needs it, such as in `upsert source -> calc -> retract sink`. Whether to convert to a retract stream is decided by the RetractionRules.
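When the node is not a no-op, it keeps the last row per key and turns upsert messages into retract messages. The toy model below illustrates that behaviour. It is a hypothetical sketch, not the code in this PR. It assumes the upsert encoding discussed in this thread, where `(true, row)` is an update and `(false, row)` is a delete. It also assumes Flink's retract encoding, where `(true, row)` adds a row and `(false, row)` retracts one. The per-key map is the state whose size the review is concerned about. A delete for a key that was never seen produces no output, because there is no previous row to retract.

```scala
import scala.collection.mutable

// Hypothetical model (not Flink code) of upsert-to-retraction conversion.
// Upsert input:   (true, row) = insert/update for the row's key, (false, row) = delete for it.
// Retract output: (true, row) = add row, (false, row) = retract a previously emitted row.
final class UpsertToRetraction[K, R](keyOf: R => K) {
  // Last row emitted per key; this state grows with the number of distinct keys.
  private val lastRow = mutable.Map.empty[K, R]

  def process(msg: (Boolean, R)): List[(Boolean, R)] = {
    val (isUpsert, row) = msg
    val key = keyOf(row)
    if (isUpsert) {
      // Retract the previous version of this key (if any), then add the new one.
      val retraction = lastRow.get(key).map(old => (false, old)).toList
      lastRow.update(key, row)
      retraction :+ ((true, row))
    } else {
      // Retract the stored row; a delete for an unknown key has nothing to retract.
      lastRow.remove(key).map(old => (false, old)).toList
    }
  }
}

object UpsertToRetraction {
  // Runs a whole upsert stream through a single converter instance.
  def convertAll[K, R](msgs: Seq[(Boolean, R)])(keyOf: R => K): Seq[(Boolean, R)] = {
    val converter = new UpsertToRetraction[K, R](keyOf)
    msgs.flatMap(converter.process)
  }
}
```

For example, two updates and a delete for key `bob` produce an add, then a retract/add pair, then a final retraction. A delete for the unseen key `alice` produces nothing.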
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701280#comment-16701280 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236914923

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames The upsert key names.
+  */
+class LogicalLastRow(

Review comment:
The `LastRow` node may be a no-op node in a case such as `upsert source -> calc -> upsert sink`. On the other hand, `LastRow` will convert the upsert stream to a retract stream if a downstream node needs it, such as in `upsert source -> calc -> retract sink`. Whether to convert to a retract stream is decided by the RetractionRules.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701276#comment-16701276 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236913658

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second

Review comment:
The incoming messages from the source `DataStream` are expected to be encoded as `Tuple2`. I will change the description according to your suggestion. Thank you.
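The encoding clarified here can be illustrated with a rough sketch that folds a sequence of incoming `(Boolean, record)` messages into a snapshot of the resulting table. It is a hypothetical model, not Flink code. `keyOf` stands in for the key declared with `.key`. The helper `noKey`, also hypothetical, maps every record to the same `Unit` key to mimic the "empty key" case from the scaladoc, in which the table holds at most one row.

```scala
// Hypothetical model (not Flink code) of materializing an upsert stream into a table.
// Messages use the scaladoc's encoding: (true, record) = update, (false, record) = delete.
object UpsertTable {
  def materialize[K, R](messages: Seq[(Boolean, R)])(keyOf: R => K): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) {
      case (table, (true, record))  => table.updated(keyOf(record), record) // insert or overwrite
      case (table, (false, record)) => table - keyOf(record)                // delete by key
    }

  // Stand-in for "no key declared": every record has the same (empty) key,
  // so each update replaces the single row of the table.
  def noKey[R](record: R): Unit = ()
}
```

With `_._1` as the key, updates to `alice` overwrite each other and a delete removes `bob`. With `UpsertTable.noKey`, only the last update survives.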
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701274#comment-16701274 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236912969

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
I prefer names that start with `toTable`, so the methods would be `toTable`, `toTableFromUpsertStream` and `toTableFromRetractStream`.
That way, these methods are easier to find, similar to `fromUpsertStream`, `fromAppendStream` and `fromDataSet`. What do you think?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701263#comment-16701263 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236911934

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")

Review comment:
I haven't thought much about how to support key definitions on append streams. I think it may not be a no-op key definition. We would probably have to do the following:
- Add similar logic to `UniqueKeyExtractor` to get the keys.
- Check whether the data meets the key constraint, i.e., throw an exception when the key data is not distinct.
- etc.

Given the above, I think we had better not support it for now? I will rephrase the error message according to your suggestions. Thanks a lot.
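The second item in the list above is checking the key constraint on an append stream. It could look roughly like the hypothetical sketch below, which is plain Scala and not a Flink operator. It assumes that every key seen so far is kept in memory. In a long-running stream that state is unbounded, which is part of why a key on an append stream is not a no-op definition.

```scala
import scala.collection.mutable

// Hypothetical sketch (not a Flink operator): enforcing a declared key on an
// append-only stream by failing on the first record whose key was already seen.
// Assumption: all keys seen so far are kept in memory, i.e. the state is unbounded.
final class KeyConstraintChecker[K, R](keyOf: R => K) {
  private val seenKeys = mutable.HashSet.empty[K]

  def check(record: R): R = {
    val key = keyOf(record)
    // HashSet.add returns false if the key was already present.
    if (!seenKeys.add(key))
      throw new IllegalStateException(s"Key constraint violated on append stream: duplicate key $key")
    record
  }
}

object KeyConstraintChecker {
  def checkAll[K, R](records: Seq[R])(keyOf: R => K): Seq[R] = {
    val checker = new KeyConstraintChecker[K, R](keyOf)
    records.map(checker.check)
  }
}
```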
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701252#comment-16701252 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236909228

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {

Review comment:
Oh, sorry about my typo. It should be "The primary key would not be necessary."
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701251#comment-16701251 ]
ASF GitHub Bot commented on FLINK-8577:
---
hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236909214

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
## @@ -54,9 +54,9 @@ object StreamSQLExample {
       Order(4L, "beer", 1)))
     // convert DataStream to Table
-    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

Review comment:
OK, I will not forget about it. Thank you.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700205#comment-16700205 ]
ASF GitHub Bot commented on FLINK-8577:
---
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606838

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames The upsert key names.
+  */
+class LogicalLastRow(

Review comment:
> We can't name it UpsertToRetractionsConverter since we don't always convert upsert to retractions in this node.

Can you elaborate on that?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700210#comment-16700210 ]
ASF GitHub Bot commented on FLINK-8577:
---
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236605473

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second

Review comment:
Will the messages in the table be encoded as `Tuple2`? Or are the incoming messages from the source `DataStream` expected to be encoded as `Tuple2`?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700204#comment-16700204 ]
ASF GitHub Bot commented on FLINK-8577:
---
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236606505

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case f ~ _ ~ _ => RowtimeAttribute(f)
     }
+  // key
+
+  lazy val key: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ {

Review comment:
Yes, I meant that this `(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference)` is duplicated a couple of times here. If it is possible to deduplicate it to:
```
lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference
```
I think it would be better to do so.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700208#comment-16700208 ]
ASF GitHub Bot commented on FLINK-8577:
---
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601205

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")

Review comment:
I meant that this check doesn't prevent any bugs or any other exceptions. It just blocks the user from specifying a no-op key definition, right? That's why I'm not sure whether we should block it. If we decide to keep this check/exception, I would refrain from mentioning `fromUpsertStream` in the exception. Maybe rephrase it to:
> Defining a key on an append stream does not have any effect

?
> // upsert with keyTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key) > // upsert without key -> single row tableTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code} > A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
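If the check is kept, it could look roughly like the standalone sketch below, using a message along the lines suggested in the review. This is a minimal sketch, not Flink's code. `Expr`, `FieldRef`, `KeyRef`, `extractUniqueKeys` and `validateAppendStreamFields` are hypothetical stand-ins. `IllegalArgumentException` takes the place of Flink's `TableException`.

```scala
object AppendStreamKeyCheck {
  // Hypothetical stand-ins for Flink's field expressions (not the real classes).
  sealed trait Expr
  final case class FieldRef(name: String) extends Expr
  final case class KeyRef(name: String) extends Expr // plays the role of `'c.key`

  // Collects the names of all fields that were marked as keys.
  def extractUniqueKeys(fields: Seq[Expr]): Seq[String] =
    fields.collect { case KeyRef(name) => name }

  // Rejects key definitions on an append stream instead of silently ignoring them.
  def validateAppendStreamFields(fields: Seq[Expr]): Unit = {
    val keys = extractUniqueKeys(fields)
    if (keys.nonEmpty) {
      throw new IllegalArgumentException(
        s"Defining a key on an append stream has no effect: ${keys.mkString(", ")}")
    }
  }
}
```

The alternative raised in the review is to drop the check and ignore the key. That would amount to deleting the `throw` branch.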
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700209#comment-16700209 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236597321

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
## @@ -54,9 +54,9 @@ object StreamSQLExample {
      Order(4L, "beer", 1)))

    // convert DataStream to Table
-    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

Review comment:
I would prefer to document this rename in this PR. If you want to do it together with the upsert source documentation, that's OK, but let's not forget about it.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700207#comment-16700207 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236604746

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
    }
  }

+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
Maybe `upsertStreamToTable`?
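The `Tuple2` encoding described in the scaladoc above can be illustrated without Flink. This is a minimal sketch under these assumptions:

- a bounded `Seq` stands in for the `DataStream`;
- a plain `Map` stands in for the resulting table;
- `UpsertMaterializer` and `materialize` are hypothetical names.

A `true` flag inserts or updates the row under its key, and a `false` flag deletes the key.

```scala
object UpsertMaterializer {
  // (flag, record): true = insert or update, false = delete (hypothetical alias).
  type UpsertMessage[R] = (Boolean, R)

  // Replays the upsert messages in order and keeps the latest row per key.
  def materialize[K, R](messages: Seq[UpsertMessage[R]])(key: R => K): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) {
      case (table, (true, row))  => table.updated(key(row), row)
      case (table, (false, row)) => table - key(row)
    }
}
```

For example, `materialize(Seq((true, ("alice", 1)), (true, ("alice", 3))))(_._1)` leaves one row, `"alice" -> ("alice", 3)`. In this sketch, deleting a key that was never inserted is simply a no-op.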
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16700206#comment-16700206 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236601884

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
        s"But is: ${execEnv.getStreamTimeCharacteristic}")
    }

+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
    // adjust field indexes and field names
    val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
    val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)

-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
      dataStream,
      indexesWithIndicatorFields,
      namesWithIndicatorFields
    )
    registerTableInternal(name, dataStreamTable)
  }

+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {

Review comment:
OK, I get it: upsert streams without a primary key are single-row tables. But what do you mean by:

> The primary key would be necessary.

?
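Here is a concrete version of the point above that upsert streams without a primary key are single-row tables. With an empty key, every message addresses the same row, so the table holds at most one row. This is a minimal sketch: it assumes a bounded `Seq` of `(flag, row)` pairs in place of a `DataStream`, and `SingleRowUpsert` is a hypothetical name.

```scala
object SingleRowUpsert {
  // With no key, every (flag, row) message targets the same single row:
  // true replaces the current row, false deletes it.
  def materialize[R](messages: Seq[(Boolean, R)]): Option[R] =
    messages.foldLeft(Option.empty[R]) {
      case (_, (true, row)) => Some(row)
      case (_, (false, _))  => None
    }
}
```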
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1663#comment-1663 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236537434

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
## @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) {
          referenceByName(name, p).map((_, name))
        case Alias(UnresolvedFieldReference(origName), name: String, _) =>
          referenceByName(origName, p).map((_, name))
+        case (Key(UnresolvedFieldReference(name: String))) =>

Review comment:
`UnresolvedKeyFieldReference` works fine. Thanks for your suggestions.
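A dedicated `UnresolvedKeyFieldReference` replaces the nested `Key(UnresolvedFieldReference(...))` pattern, and the sketch below shows why that is tidier. It is a minimal sketch: only the two class names come from this thread, while their shapes, `KeyFieldReferences` and `splitFields` are hypothetical. Each case becomes one flat pattern, and key fields can be collected in a single pass.

```scala
object KeyFieldReferences {
  sealed trait Expression
  // Hypothetical shapes; only the class names come from the review thread.
  final case class UnresolvedFieldReference(name: String) extends Expression
  final case class UnresolvedKeyFieldReference(name: String) extends Expression

  // Splits the user-supplied field expressions into all field names and key field names.
  def splitFields(fields: Seq[Expression]): (Seq[String], Seq[String]) = {
    val names = fields.map {
      case UnresolvedFieldReference(n)    => n
      case UnresolvedKeyFieldReference(n) => n
    }
    val keys = fields.collect { case UnresolvedKeyFieldReference(n) => n }
    (names, keys)
  }
}
```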
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699902#comment-16699902 ]

ASF GitHub Bot commented on FLINK-8577:
---

sunjincheng121 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236513562

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
## @@ -54,9 +54,9 @@ object StreamSQLExample {
      Order(4L, "beer", 1)))

    // convert DataStream to Table
-    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

Review comment:
+1 to syncing the changes to the documentation. I also agree that the documentation change can go into another PR.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699002#comment-16699002 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267734

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+ * Represent an upsert source.
+ *
+ * @param keyNames The upsert key names.
+ */
+class LogicalLastRow(

Review comment:
Yes, I also think `LastRow` is not perfect, but I can't find a better name. We can't name it `UpsertToRetractionsConverter`, since this node doesn't always convert upserts to retractions. `LastRow` does explain its purpose and what it does, i.e., it gets the last row from the upsert stream for each defined key.
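The job described in the comment above is to keep the last row per key and derive retractions from it. It can be sketched over a bounded input as follows. This is a minimal sketch under stated assumptions, not the operator in the PR:

- the input is a `Seq` of upsert messages, where `true` means upsert and `false` means delete;
- the output uses a retraction encoding, where `true` means accumulate and `false` means retract;
- `LastRowSketch` and `toRetractions` are hypothetical names.

As the comment notes, the real node does not always need to emit retractions.

```scala
object LastRowSketch {
  // Converts upsert messages (true = upsert, false = delete) into retraction messages
  // (true = accumulate, false = retract) by remembering the last row seen per key.
  def toRetractions[K, R](upserts: Seq[(Boolean, R)])(key: R => K): Seq[(Boolean, R)] = {
    val lastRow = scala.collection.mutable.Map.empty[K, R]
    val out = Seq.newBuilder[(Boolean, R)]
    for ((isUpsert, row) <- upserts) {
      val k = key(row)
      val previous = lastRow.get(k)
      // Any earlier version of this key must be retracted first.
      previous.foreach(p => out += ((false, p)))
      if (isUpsert) {
        out += ((true, row))
        lastRow(k) = row
      } else {
        // A delete retracts the stored row; deleting an unknown key emits nothing.
        lastRow -= k
      }
    }
    out.result()
  }
}
```

The state is what makes deletes work: the delete message only needs to carry the key, because the row that gets retracted comes from `lastRow`.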
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698994#comment-16698994 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266862

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
    }
  }

+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
I don't think we can use "upsert" to describe a Table. How about renaming it to `toTableFromUpsert`?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698991#comment-16698991 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266642

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
## @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: TableConfig) {
          } else {
            referenceByName(origName, t).map((_, name))
          }
+        case (Key(UnresolvedFieldReference(name: String)), idx) =>
+          if (isRefByPos) {

Review comment:
Same as below.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699001#comment-16699001 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267700

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+ * Represent an upsert source.

Review comment:
I treated `LastRow` and `Source` as a single unit. However, I think you are right: treating `LastRow` as a conversion node may be better.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16699000#comment-16699000 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267632

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala
## @@ -36,20 +36,20 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
  * the plan translation.
  */
-class DataStreamScan(
+class AppendStreamScan(

Review comment:
Yes, sorry about this.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698998#comment-16698998 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267295

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
      case f ~ _ ~ _ => RowtimeAttribute(f)
    }

+  // key
+
+  lazy val key: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ {

Review comment:
Do you mean deduplicating the code? I think we can add another `Expression` parser, e.g., `lazy val aliasOrFieldReference = aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference`, and reuse it.
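The deduplication proposed above can be sketched without the scala-parser-combinators module, using a tiny hand-rolled parser type. Everything in it is hypothetical and simplified: there is no packrat memoization, whitespace and keyword handling are naive, and the `as` alias syntax is an assumption. Only the shape of `aliasOrFieldReference`, defined once and reused by `key` and by a second rule, mirrors the comment.

```scala
object KeyParserSketch {
  // Minimal parser: consumes a prefix of the input and returns the value plus the rest.
  final case class P[+A](run: String => Option[(A, String)]) {
    def map[B](f: A => B): P[B] = P(in => run(in).map { case (a, rest) => (f(a), rest) })
    def ~[B](that: => P[B]): P[(A, B)] =
      P(in => run(in).flatMap { case (a, r1) => that.run(r1).map { case (b, r2) => ((a, b), r2) } })
    def ~>[B](that: => P[B]): P[B] = (this ~ that).map(_._2)
    def <~[B](that: => P[B]): P[A] = (this ~ that).map(_._1)
    def |[B >: A](that: => P[B]): P[B] = P(in => run(in).orElse(that.run(in)))
  }

  // Hypothetical expression tree (not Flink's classes).
  sealed trait Expr
  final case class FieldRef(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr
  final case class Key(child: Expr) extends Expr

  def lit(s: String): P[String] = P { in =>
    val t = in.dropWhile(_.isWhitespace)
    if (t.startsWith(s)) Some((s, t.drop(s.length))) else None
  }

  lazy val ident: P[String] = P { in =>
    val t = in.dropWhile(_.isWhitespace)
    val name = t.takeWhile(c => c.isLetterOrDigit || c == '_')
    if (name.nonEmpty && name.head.isLetter) Some((name, t.drop(name.length))) else None
  }

  lazy val fieldReference: P[Expr] = ident.map(FieldRef(_))
  lazy val aliasMapping: P[Expr] =
    (fieldReference ~ lit("as") ~ ident).map { case ((e, _), n) => Alias(e, n) }

  // The shared alternative from the review, defined once...
  lazy val aliasOrFieldReference: P[Expr] =
    aliasMapping | (lit("(") ~> aliasMapping <~ lit(")")) | fieldReference

  // ...and reused by the `.key` rule and by a rule where the suffix is optional.
  lazy val key: P[Expr] = (aliasOrFieldReference <~ lit(".") <~ lit("key")).map(Key(_))
  lazy val fieldExpr: P[Expr] = key | aliasOrFieldReference

  // Succeeds only if the whole input was consumed.
  def parseAll(p: P[Expr], s: String): Option[Expr] =
    p.run(s).collect { case (e, rest) if rest.trim.isEmpty => e }
}
```

With this, `parseAll(key, "(a as b).key")` yields `Some(Key(Alias(FieldRef("a"), "b")))`.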
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698997#comment-16698997 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267254

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
      case f ~ _ ~ _ => RowtimeAttribute(f)
    }

+  // key

Review comment:
OK, I will remove it.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698990#comment-16698990 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266595

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
        s"But is: ${execEnv.getStreamTimeCharacteristic}")
    }

+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")

Review comment:
Agreed, specifying a unique key on an AppendTable makes sense, for example when we only insert data into a table and never update it. Since supporting keys on AppendStream is a separate new feature, I think we can add this check now and remove it later once we support keys on AppendStream. The error message should be corrected to `Applying a key on an append stream is not supported yet! You can use fromUpsertStream instead.` What do you think?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698995#comment-16698995 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236267147

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
  override def visit(exchange: LogicalExchange): RelNode =
    throw new TableException("Logical exchange in a stream environment is not supported yet.")

-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
No. As we discussed before, `LastRow` will become a no-op for the case of `upsert source -> filter -> upsert sink`.
> - > > Key: FLINK-8577 > URL: https://issues.apache.org/jira/browse/FLINK-8577 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Api will looks like: > {code:java} > DataStream[(String, Long, Int)] input = ??? > // upsert with keyTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key) > // upsert without key -> single row tableTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code} > A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
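The upsert-to-retraction conversion that the `LastRow` operator is meant to perform can be pictured with the minimal plain-Scala sketch below. It assumes the `(flag, record)` message shape used elsewhere in this PR. The operator remembers the last row per key, and for each new message it retracts the previously emitted row before emitting the new one. `UpsertToRetractionSketch`, `Change`, `Add`, and `Retract` are hypothetical names, not Flink classes. The sketch also ignores state backends and time indicators.

```scala
import scala.collection.mutable

// Illustrative messages of a retraction stream (hypothetical, not Flink classes).
sealed trait Change[R]
final case class Add[R](row: R) extends Change[R]
final case class Retract[R](row: R) extends Change[R]

// Hypothetical converter that keeps the last row seen for each key.
final class UpsertToRetractionSketch[K, R](keyOf: R => K) {
  private val lastRow = mutable.Map.empty[K, R]

  /** `isUpsert = true` means insert/update, `false` means delete. */
  def process(isUpsert: Boolean, row: R): Seq[Change[R]] = {
    val key = keyOf(row)
    // Retract whatever was previously emitted for this key, if anything.
    val retraction: Seq[Change[R]] = lastRow.get(key).map(Retract(_)).toList
    if (isUpsert) {
      lastRow.update(key, row)
      retraction :+ Add(row)
    } else {
      lastRow.remove(key)
      retraction
    }
  }
}
```

The per-key state exists only to produce the `Retract` messages. A consumer that can apply upserts directly would not need them.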
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698992#comment-16698992 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236266694

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
## @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) {
             referenceByName(name, p).map((_, name))
           case Alias(UnresolvedFieldReference(origName), name: String, _) =>
             referenceByName(origName, p).map((_, name))
+          case (Key(UnresolvedFieldReference(name: String))) =>

Review comment:
`UnresolvedKeyFieldReference` seems a good way. I will try to figure it out.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698833#comment-16698833 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236223294

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
## @@ -54,9 +54,9 @@ object StreamSQLExample {
       Order(4L, "beer", 1)))
     // convert DataStream to Table
-    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

Review comment:
I plan to address the documentation problems in [FLINK-10957](https://issues.apache.org/jira/browse/FLINK-10957). What do you think?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698834#comment-16698834 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236223326

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {

Review comment:
A primary key would be necessary. If no primary key has been defined, the table would be a single row table.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698770#comment-16698770 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177333

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
## @@ -1101,6 +1114,10 @@ abstract class TableEnvironment(val config: TableConfig) {
             referenceByName(name, p).map((_, name))
           case Alias(UnresolvedFieldReference(origName), name: String, _) =>
             referenceByName(origName, p).map((_, name))
+          case (Key(UnresolvedFieldReference(name: String))) =>

Review comment:
Tbh, at this moment I'm not sure if adding `Key` as a nested case class was a good idea. It adds quite a lot of boilerplate and special handling in multiple places, which works against clean code.

I wonder if `UnresolvedFieldReference` shouldn't have some kind of "trait" that marks it as a primary key. By a "trait" I mean any way to determine whether it is a primary key or not. For example, either some boolean flag, or maybe introducing an `UnresolvedKeyFieldReference` that extends `UnresolvedFieldReference`? I think both of those solutions would allow us to avoid this code/handling duplication in most places and to handle `Key` specially only where it is necessary.
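The "boolean flag" option suggested above can be sketched with simplified, hypothetical expression types. `Expr`, `FieldRef`, `Alias`, and `KeyFlagSketch` are stand-ins, not Flink's classes. Because the key marker travels with the field reference, name resolution needs one case per shape, and only key extraction inspects the flag. The subclass route has a constraint worth noting. If `UnresolvedFieldReference` is a case class, as the pattern matches in the diff suggest, Scala does not allow another case class to extend it, so a subclass would have to be a plain class.

```scala
// Hypothetical, simplified stand-ins for Flink's expression classes.
sealed trait Expr
final case class FieldRef(name: String, isKey: Boolean = false) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object KeyFlagSketch {
  // Name resolution ignores the flag, so no duplicated Key(...) cases are needed.
  def outputName(e: Expr): Option[String] = e match {
    case FieldRef(name, _)        => Some(name)
    case Alias(FieldRef(_, _), a) => Some(a)
    case _                        => None
  }

  // Only key extraction looks at the flag.
  def keyNames(exprs: Seq[Expr]): Seq[String] = exprs.collect {
    case FieldRef(name, true)        => name
    case Alias(FieldRef(_, true), a) => a
  }
}
```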
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698771#comment-16698771 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196723

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case f ~ _ ~ _ => RowtimeAttribute(f)
     }
+  // key

Review comment:
nit: this comment is not helpful; it just duplicates the field name.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698766#comment-16698766 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197534

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
## @@ -535,6 +536,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case f ~ _ ~ _ => RowtimeAttribute(f)
     }
+  // key
+
+  lazy val key: PackratParser[Expression] =
+    (aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference) ~ "." ~ KEY ^^ {

Review comment:
Can we deduplicate `(aliasMapping | "(" ~> aliasMapping <~ ")" | fieldReference)`? I would expect so, but I'm not familiar with this Scala magic in `ExpressionParser`.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698763#comment-16698763 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236171634

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")

Review comment:
I'm not sure if we need/want this check. For example, for Temporal Joins a user might want to interpret an upsert stream as an `AppendStreamTable`: https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk/edit#heading=h.yqq0cgo927fp

There might also be other use cases where specifying a primary key on an `AppendTable` makes sense.
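The temporal-join use case above relies on keeping every version of a keyed row, not only the latest one. A lookup "as of" a given time needs the older versions too. The conceptual plain-Scala sketch below shows this. It is not Flink's temporal table implementation, and `TemporalLookupSketch`, `Version`, and `asOf` are hypothetical names.

```scala
// Hypothetical model of a temporal lookup over an append-only history of keyed changes.
object TemporalLookupSketch {
  final case class Version(key: String, time: Long, value: Int)

  /** Returns the value of `key` valid at `time`: the latest version not after `time`. */
  def asOf(history: Seq[Version], key: String, time: Long): Option[Int] = {
    val candidates = history.filter(v => v.key == key && v.time <= time)
    if (candidates.isEmpty) None else Some(candidates.maxBy(_.time).value)
  }
}
```

An upsert interpretation would keep only the latest `Version` per key. That is enough to answer a lookup at the current time, but not a lookup at an earlier time that falls between two versions.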
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698768#comment-16698768 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236198600

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+ * Represent an upsert source.

Review comment:
? I think this comment is incorrect. This is not the source on its own, but only a conversion class.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698759#comment-16698759 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236169075

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
## @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    val streamType: TypeInformation[T] = dataStream.getType match {

Review comment:
Lines L586:L593 seem to be duplicated with L619:L626.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698762#comment-16698762 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202082

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
## @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+ * Represent an upsert source.
+ *
+ * @param keyNames The upsert key names.
+ */
+class LogicalLastRow(

Review comment:
I still think this is a bad and misleading name for this operator. `LastRow` explains neither the purpose nor the actual thing that it's doing. If someone without prior knowledge were asked what the `LastRow` RelNode/operator does, it would be impossible to guess the correct answer.

It's even more confusing when you are binding `LastRow` with the "upsert" name in the Java doc:

> Represent an upsert source.

> The upsert key names.

Please either change the name to something like `UpsertToRetractionsConverter` that explicitly states the purpose of this class (then you can keep the `upsert` references in the Java docs), or, if you would like to keep naming it by the thing that it does, it would have to meet the following requirements:

- `LastRow` is independent of the upsert -> retraction conversion logic
- there is a reasonable chance that it will be used somewhere else outside of the upsert -> retraction conversion context

It's a bit like renaming `LogicalJoin` to `HashTable`. Kind of true, but misleading. It could make sense if:

- `HashTable` is extracted to a separate class
- the `LogicalJoin` relnode/operator/concept still exists, but just uses `HashTable`
- `HashTable` is reused somewhere else
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698758#comment-16698758 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236187717

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma
     }
   }
+  /**
+    * Converts the [[DataStream]] with upsert messages into a [[Table]] with keys.
+    *
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second
+    * field holds the record. A true [[Boolean]] flag indicates an update message, a false flag
+    * indicates a delete message.
+    *
+    * The field name and key of the new [[Table]] can be specified like this:
+    *
+    * {{{
+    *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *
+    *   val stream: DataStream[(Boolean, (String, Int))] = ...
+    *   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+    * }}}
+    *
+    * If field names are not explicitly specified, names are automatically extracted from the type
+    * of the [[DataStream]].
+    * If keys are not explicitly specified, an empty key will be used and the table will be a
+    * single row table.
+    *
+    * @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created.
+    * @param fields The field names of the new [[Table]] (optional).
+    * @return The resulting [[Table]].
+    */
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = {

Review comment:
This is a naming inconsistency: `toKeyedTable` vs `fromUpsertStream`. Rename `keyed` to `upsert`?
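The message encoding described in this Scaladoc can be modeled as a fold over the messages. Each message is a `Boolean` flag plus the record, where `true` means update and `false` means delete. `UpsertMaterializeSketch.materialize` is a hypothetical helper for illustration, not part of Flink's API. Passing a constant key function stands in for "no key specified".

```scala
// Hypothetical helper, not a Flink API: replays (flag, record) upsert messages into a keyed table.
object UpsertMaterializeSketch {
  /** A `true` flag inserts or updates the row for its key; a `false` flag deletes it. */
  def materialize[K, R](keyOf: R => K, messages: Seq[(Boolean, R)]): Map[K, R] =
    messages.foldLeft(Map.empty[K, R]) { case (table, (isUpdate, record)) =>
      val key = keyOf(record)
      if (isUpdate) table.updated(key, record) else table - key
    }
}
```

With `keyOf = _._1` the helper behaves like the `'name.key` example. With a constant key such as `_ => ()`, every message overwrites the same entry, which is the single row table case described above.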
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698765#comment-16698765 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236177806

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
## @@ -1112,13 +1129,17 @@ abstract class TableEnvironment(val config: TableConfig) {
         exprs flatMap {
           case _: TimeAttribute =>
             None
-          case UnresolvedFieldReference(_) if referenced =>
+          case UnresolvedFieldReference(_) | Key(UnresolvedFieldReference(_)) if referenced =>
             // only accept the first field for an atomic type
             throw new TableException("Only the first field can reference an atomic type.")
           case UnresolvedFieldReference(name: String) =>
             referenced = true
             // first field reference is mapped to atomic type
             Some((0, name))
+          case Key(UnresolvedFieldReference(name: String)) =>
+            referenced = true

Review comment:
ditto: another code duplication.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698773#comment-16698773 ] ASF GitHub Bot commented on FLINK-8577: --- pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r236174343 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ## @@ -1089,6 +1089,19 @@ abstract class TableEnvironment(val config: TableConfig) { } else { referenceByName(origName, t).map((_, name)) } + case (Key(UnresolvedFieldReference(name: String)), idx) => +if (isRefByPos) { Review comment: Please deduplicate this if and one if below, with non keyed versions. Either extract those if's to separate functions or convert whole match into a function sth like this: ``` foo(expr, index) { match expr { case Key(keyExpr) => foo(keyExpr, index) case UnresolvedFieldReference(...) => ... case Alias(UnresolvedFieldReference(...)) => ... } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement proctime DataStream to Table upsert conversion. > - > > Key: FLINK-8577 > URL: https://issues.apache.org/jira/browse/FLINK-8577 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Api will looks like: > {code:java} > DataStream[(String, Long, Int)] input = ??? 
> // upsert with keyTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key) > // upsert without key -> single row tableTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code} > A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
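The deduplication pnowojski suggests can be sketched as one recursive function. In it, `Key(...)` only sets a flag and then delegates to the ordinary cases, so the by-position and by-name branches exist once. This is a minimal sketch under stated assumptions. `Expr`, `ResolvedField`, `resolveField` and `indexOf` are hypothetical, simplified stand-ins. They are not Flink's real expression classes or the code that was eventually merged.

```scala
// Hypothetical, simplified stand-ins for the Table API expressions in the diff.
sealed trait Expr
case class UnresolvedFieldReference(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr
case class Key(child: Expr) extends Expr

// Resolved field: input index, output name, and whether it is a key field.
case class ResolvedField(index: Int, name: String, isKey: Boolean)

// Hypothetical helper: look up a field name in the input schema.
def indexOf(name: String, inputNames: Seq[String]): Int = {
  val i = inputNames.indexOf(name)
  require(i >= 0, s"Field '$name' not found in [${inputNames.mkString(", ")}]")
  i
}

// One function for keyed and non-keyed fields: Key(...) marks the field and
// recurses, so each by-position / by-name branch is written only once.
def resolveField(
    expr: Expr,
    idx: Int,
    inputNames: Seq[String],
    isRefByPos: Boolean,
    isKey: Boolean = false): ResolvedField = expr match {
  case Key(child) =>
    resolveField(child, idx, inputNames, isRefByPos, isKey = true)
  case UnresolvedFieldReference(name) =>
    val index = if (isRefByPos) idx else indexOf(name, inputNames)
    ResolvedField(index, name, isKey)
  case Alias(UnresolvedFieldReference(origName), name) =>
    val index = if (isRefByPos) idx else indexOf(origName, inputNames)
    ResolvedField(index, name, isKey)
  case other =>
    throw new IllegalArgumentException(s"Unsupported field expression: $other")
}
```

Because `Key` simply recurses, a keyed alias such as `Key(Alias(...))` reuses the alias branch without any extra code.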
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698769#comment-16698769 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236202531

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, SingleRel}
+
+/**
+  * Represent an upsert source.
+  *
+  * @param keyNames The upsert key names.

Review comment:
If `keyNames` are the "upsert key names", then just name it `upsertKeyNames` and drop the comment.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698772#comment-16698772 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236196242

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    if (upsertStreamTable != null) {
+      val relTypes = scan.getRowType.getFieldList.map(_.getType)
+      val timeIndicatorIndexes = relTypes.zipWithIndex
+        .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+        .map(_._2)
+      val input = if (timeIndicatorIndexes.nonEmpty) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, timeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
Do we ALWAYS convert upserts to retractions? Even for pipelines like `upsert source -> filter -> upsert sink`?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698760#comment-16698760 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236195894

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex
+      .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+      .map(_._2)
+    if (upsertStreamTable != null) {
+      val input = if (getTimeIndicatorIndexes.size > 0) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
But you could probably add this node when changing the convention. We assume that the `TableScan` rel node represents all kinds of table scans and can always produce retractions. At some stage (`FlinkLogical`?) we then freeze it to either retractions or upserts. Regardless of that, `RelTimeIndicatorConverter.scala` is not the right place to create `LogicalLastRow`. You could also add a single-pass rule that is applied only once and inserts the upsert -> retraction conversion.
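The "single pass" idea can be sketched with a toy plan tree instead of Calcite `RelNode`s. One top-down traversal wraps each upsert scan in a conversion node exactly once. It never descends into a node that is already converted, so a second run changes nothing. This is a minimal sketch under stated assumptions. `Plan`, `Scan`, `Filter`, `Join`, `UpsertToRetraction` and `insertUpsertConversion` are illustrative names only, not Flink or Calcite APIs.

```scala
// Toy logical plan (hypothetical), standing in for Calcite RelNodes.
sealed trait Plan
case class Scan(table: String, isUpsert: Boolean) extends Plan
case class Filter(condition: String, input: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan
case class UpsertToRetraction(keys: Seq[String], input: Plan) extends Plan

// Single top-down pass: each upsert scan is wrapped exactly once. The pass
// never descends into a conversion node, so it terminates, and running it
// again on its own output is a no-op.
def insertUpsertConversion(plan: Plan, keysOf: String => Seq[String]): Plan =
  plan match {
    case s: Scan =>
      if (s.isUpsert) UpsertToRetraction(keysOf(s.table), s) else s
    case Filter(cond, in) =>
      Filter(cond, insertUpsertConversion(in, keysOf))
    case Join(l, r) =>
      Join(insertUpsertConversion(l, keysOf), insertUpsertConversion(r, keysOf))
    case converted: UpsertToRetraction =>
      converted
  }
```

The pass also handles plans with one scan or many scans the same way, because it visits every leaf rather than matching a fixed multi-operand pattern.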
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698761#comment-16698761 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236163914

## File path: flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -54,9 +54,9 @@ object StreamSQLExample {
       Order(4L, "beer", 1)))
 
     // convert DataStream to Table
-    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

Review comment:
This will also require changes in the documentation.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698767#comment-16698767 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236168761

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
 
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
 
     registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {

Review comment:
Aren't we missing the user-defined primary key here? What is the value of an upsert stream without a defined primary key?
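This hunk handles keys in two ways. It collects keys from the field list, and it rejects any key on an append stream. A sketch of that handling could use a hypothetical field model in which `'c` is `Field("c")` and `'c.key` is `Key(Field("c"))`. The diff throws `TableException`; this sketch throws `IllegalArgumentException` instead. `FieldExpr`, `fieldNames` and `checkAppendStreamFields` are made-up names. Only `extractUniqueKeys` mirrors a name from the diff, and its body here is an assumption.

```scala
// Hypothetical field model: `'c` is Field("c") and `'c.key` is Key(Field("c")).
sealed trait FieldExpr
case class Field(name: String) extends FieldExpr
case class Key(field: Field) extends FieldExpr

// Names of the fields marked as keys, in declaration order.
def extractUniqueKeys(fields: Seq[FieldExpr]): Seq[String] =
  fields.collect { case Key(Field(name)) => name }

// All field names, with key markers stripped.
def fieldNames(fields: Seq[FieldExpr]): Seq[String] =
  fields.map {
    case Field(name)      => name
    case Key(Field(name)) => name
  }

// Append streams carry no update semantics, so a key is rejected there.
def checkAppendStreamFields(fields: Seq[FieldExpr]): Seq[String] = {
  if (extractUniqueKeys(fields).nonEmpty) {
    throw new IllegalArgumentException(
      "A key cannot be applied to an append stream; use an upsert conversion instead.")
  }
  fieldNames(fields)
}
```

On an upsert stream, an empty result from `extractUniqueKeys` would correspond to the "upsert without key -> single row table" case in the issue description.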
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698764#comment-16698764 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236197832

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/AppendStreamScan.scala
@@ -36,20 +36,20 @@ import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
   * It ensures that types without deterministic field order (e.g. POJOs) are not part of
   * the plan translation.
   */
-class DataStreamScan(
+class AppendStreamScan(

Review comment:
Shouldn't this belong to the previous commit?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697175#comment-16697175 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441249337

@pnowojski @sunjincheng121 @dianfu Thanks for your reviews and suggestions. I have split and updated the PR.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696631#comment-16696631 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441202156

@pnowojski Thanks for your review. Makes sense to me. I will split it into 4 commits according to your suggestions. Meanwhile, I will address the comments from @dianfu. Thank you all.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696540#comment-16696540 ]

ASF GitHub Bot commented on FLINK-8577:
---

pnowojski commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-441185723

I've started reviewing the code. As we discussed offline, could you split this PR into 4 commits:
- Add `fromAppendStream` and deprecate `fromDataStream`
- Introduce `LastRow` rules
- Introduce `LastRow` runtime functions
- Optimize upsert sources followed by `Calc`?

Having more, smaller, independent commits speeds up reviewing. It also helps later, whenever someone digs through git's commit history to understand why or how something was implemented. Splitting refactoring/renaming commits from commits that add or change features or fix bugs is especially important.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696467#comment-16696467 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235835262

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")
 
-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex
+      .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+      .map(_._2)
+    if (upsertStreamTable != null) {
+      val input = if (getTimeIndicatorIndexes.size > 0) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
Yeah, it would be nice if we could. However, it is not easy to generate `LastRow` in a rule. For example, take the following rule:
```
class GenerateLastRowRule extends RelOptRule(
  operand(classOf[TableScan], none),
  "GenerateLastRowRule") {
  ...
}
```
This rule would be applied infinitely. Moreover, we can't write the rule with two operands, because some plans contain only a single scan node.
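A toy plan can show why such a rule keeps firing. The toy uses hypothetical names and simplifies Calcite considerably. After the rewrite, the plan still contains the original upsert scan. A rule whose only operand is the scan therefore matches again and wraps the scan once more. The sketch applies the rule until it stops matching. It stops after an explicit iteration budget, because a fixpoint is never reached. This is a simplified model, not a reproduction of Calcite's planners. `Plan`, `Scan`, `Filter`, `LastRow`, `naiveRule` and `applyUntilFixpoint` are illustrative only.

```scala
// Toy plan (hypothetical), standing in for Calcite RelNodes.
sealed trait Plan
case class Scan(table: String, isUpsert: Boolean) extends Plan
case class Filter(condition: String, input: Plan) extends Plan
case class LastRow(input: Plan) extends Plan

// Naive rule whose only operand is the scan: "wherever an upsert scan occurs,
// put LastRow on top of it". Returns None when nothing matches.
def naiveRule(plan: Plan): Option[Plan] = plan match {
  case s: Scan       => if (s.isUpsert) Some(LastRow(s)) else None
  case Filter(c, in) => naiveRule(in).map(Filter(c, _))
  case LastRow(in)   => naiveRule(in).map(LastRow(_))
}

// Apply a rule until it stops matching or the budget runs out; returns the
// final plan and how many times the rule fired.
def applyUntilFixpoint(plan: Plan, rule: Plan => Option[Plan], budget: Int): (Plan, Int) = {
  var current = plan
  var fired = 0
  var next = rule(current)
  while (next.isDefined && fired < budget) {
    current = next.get
    fired += 1
    next = rule(current)
  }
  (current, fired)
}
```

On an upsert scan, the rule fires until the budget is used up, stacking one `LastRow` per firing. On an append-only scan, it never fires.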
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696452#comment-16696452 ]

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235858319

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
@@ -0,0 +1,122 @@
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Flink RelNode for ingesting upsert stream from source.
+  */
+class DataStreamLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    inputSchema: RowSchema,
+    schema: RowSchema,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+    .filter(e => keyNames.contains(e._1))
+    .map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataStreamLastRow(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputSchema,
+      schema,
+      keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+    super.explainTerms(pw)
+      .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+      .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+    s"LastRow(${
+      if (keyNames.size != 0) {
+        s"keys:(${keyNames.mkString(", ")}), "
+      } else {
+        ""
+      }
+    }select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))"
+  }
+
+  override def producesUpdates: Boolean = true
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      queryConfig: StreamQueryConfig): DataStream[CRow] = {
+
+    val inputDS =
+      getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig)
+    val outRowType = CRowTypeInfo(schema.typeInfo)
+
+    val needRetraction = DataStreamRetractionRules.isAccRetract(this)
+    val result: DataStream[CRow] = if (needRetraction) {
+      val processFunction = new LastRowProcessFunction(
+        new RowTypeInfo(schema.fieldTypeInfos.toArray, schema.fieldNames.toArray),

Review comment:
You are right.
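A `LastRow`-style operator must behave in a certain way per key to turn upserts into retractions. That behaviour can be sketched in memory. The operator remembers the last row seen for each key. When a new row arrives for a known key, it first retracts the old row and then inserts the new one. With an empty key list, every row shares the same key. That gives the "upsert without key -> single row table" behaviour described in the issue. `Change` and `LastRowConverter` are hypothetical, and a mutable `Map` stands in for Flink keyed state. This is not the body of `LastRowProcessFunction`, which the hunk above does not show.

```scala
import scala.collection.mutable

// One change message: isAccumulate = true inserts `row`, false retracts it.
// Loosely modelled on the (row, change) pair carried by Flink's CRow.
case class Change(row: Seq[Any], isAccumulate: Boolean)

// Hypothetical in-memory LastRow logic. In a real job this state would be
// keyed state; here a mutable Map stands in for it.
class LastRowConverter(keyIndexes: Seq[Int]) {
  private val lastRowByKey = mutable.Map.empty[Seq[Any], Seq[Any]]

  def process(row: Seq[Any]): Seq[Change] = {
    // Project the key fields; with no key fields every row gets the same
    // empty key, so the result behaves like a single-row table.
    val key = keyIndexes.map(i => row(i))
    lastRowByKey.put(key, row) match {
      // Known key: retract the previous row, then insert the new one.
      case Some(previous) =>
        Seq(Change(previous, isAccumulate = false), Change(row, isAccumulate = true))
      // First row for this key: plain insert.
      case None =>
        Seq(Change(row, isAccumulate = true))
    }
  }
}
```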
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696411#comment-16696411 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235852606 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/FromUpsertStreamITCase.scala ## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.sql + +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.runtime.stream.sql.FromUpsertStreamITCase.{TestPojo, TimestampWithEqualWatermark} +import org.apache.flink.table.runtime.stream.table.{RowCollector, TestUpsertSink} +import org.junit.Assert._ +import org.junit._ + +class FromUpsertStreamITCase extends StreamingWithStateTestBase { Review comment: UpsertStream sounds like Upsert a stream into a sink. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement proctime DataStream to Table upsert conversion. > - > > Key: FLINK-8577 > URL: https://issues.apache.org/jira/browse/FLINK-8577 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Api will looks like: > {code:java} > DataStream[(String, Long, Int)] input = ??? > // upsert with keyTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key) > // upsert without key -> single row tableTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code} > A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. 
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696399#comment-16696399 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235850176 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.conversion + +import _root_.java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * ProcessRunner with [[CRow]] output. + */ +class JavaTupleToCRowProcessRunner( Review comment: Makes sense.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696398#comment-16696398 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235849985 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. + */ +class DataStreamLastRow( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamLastRow( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"LastRow(${ + if (keyNames.size != 0) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: 
StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) + +val needRetraction = DataStreamRetractionRules.isAccRetract(this) +val result: DataStream[CRow] = if (needRetraction) { + val processFunction = new LastRowProcessFunction( +new RowTypeInfo(schema.fieldTypeInfos.toArray, schema.fieldNames.toArray), Review comment: `schema.typeInfo` can be cast to `RowTypeInfo`, as it is always a `RowTypeInfo`.
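For readers following the `needRetraction` branch: `LastRowProcessFunction` turns keyed upserts into retract/accumulate pairs. The sketch below is a simplified, framework-free model of that behavior, not the PR's code. A mutable `Map` stands in for Flink's keyed `ValueState`, rows are plain `Seq[Any]`, and `UpsertToRetractionSketch`/`process` are hypothetical names. Treating a `false` flag as a delete is an assumption (the PR code in this thread is cut off before that branch), and the PR skips identical rows only when state cleaning is disabled, a condition omitted here. An empty key list maps every row to the same key, which models the "upsert without key -> single row table" case from the issue description.

```scala
import scala.collection.mutable

// Hypothetical sketch of upsert-to-retraction semantics.
// A mutable Map stands in for Flink keyed ValueState; rows are plain Seq[Any].
object UpsertToRetractionSketch {

  type Row = Seq[Any]

  /** Converts (isUpsert, row) messages into (isAccumulate, row) messages.
    * Assumption: isUpsert = false means "delete the row with this key". */
  def process(keyIndexes: Seq[Int], input: Seq[(Boolean, Row)]): Seq[(Boolean, Row)] = {
    val lastRows = mutable.Map.empty[Seq[Any], Row]
    val out = mutable.ArrayBuffer.empty[(Boolean, Row)]
    for ((isUpsert, row) <- input) {
      // Project the key fields; an empty key list yields the same key for every row.
      val key = keyIndexes.map(row(_))
      val prev = lastRows.get(key)
      if (isUpsert) {
        // Skip an update that is identical to the stored row.
        if (!prev.contains(row)) {
          prev.foreach(p => out += ((false, p))) // retract the previous version
          out += ((true, row))                   // accumulate the new version
          lastRows(key) = row
        }
      } else {
        prev.foreach(p => out += ((false, p)))   // retract the deleted row, if known
        lastRows.remove(key)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      (true, Seq("Hi", 1L, 1)),
      (true, Seq("Hello", 2L, 1)),
      (true, Seq("Hello", 2L, 1)),
      (false, Seq("Hello", 2L, 1)))
    // Key on the third field, like 'c.key in the proposed API.
    process(Seq(2), input).foreach(println)
  }
}
```

With the key on the third field, the second upsert retracts the first row before emitting its replacement, the repeated update is dropped, and the delete retracts the last stored row.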
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696382#comment-16696382 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235847810 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.conversion + +import _root_.java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * ProcessRunner with [[CRow]] output. + */ +class JavaTupleToCRowProcessRunner( Review comment: Doing these checks per record would hurt performance. I think we can instead generate code according to the type to solve the problem. However, we can improve this in another PR. What do you think?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696312#comment-16696312 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235835262 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +val relTypes = scan.getRowType.getFieldList.map(_.getType) +val getTimeIndicatorIndexes = relTypes.zipWithIndex + .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) + .map(_._2) +if (upsertStreamTable != null) { + val input = if (getTimeIndicatorIndexes.size > 0) { +// materialize time indicator +val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs) +materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet) + } else { +scan + } + + LogicalLastRow.create( Review comment: Yeah, it would be nice if we could. However, a rule can only transform a node into a new node with the same row type, and the row type of LastRow differs from that of the scan, i.e., the tuple type has been flattened.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696315#comment-16696315 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235835509 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. + */ +class DataStreamLastRow( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamLastRow( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"LastRow(${ + if (keyNames.size != 0) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: 
StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) + +val needRetraction = DataStreamRetractionRules.isAccRetract(this) +val result: DataStream[CRow] = if (needRetraction) { + val processFunction = new LastRowProcessFunction( +new RowTypeInfo(schema.fieldTypeInfos.toArray, schema.fieldNames.toArray), Review comment: `LastRowProcessFunction` requires a `RowTypeInfo`, while `schema.typeInfo` returns a `TypeInformation[Row]`.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696311#comment-16696311 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235835262 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +val relTypes = scan.getRowType.getFieldList.map(_.getType) +val getTimeIndicatorIndexes = relTypes.zipWithIndex + .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) + .map(_._2) +if (upsertStreamTable != null) { + val input = if (getTimeIndicatorIndexes.size > 0) { +// materialize time indicator +val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs) +materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet) + } else { +scan + } + + LogicalLastRow.create( Review comment: Yeah, it would be nice if we could. However, a rule can only transform a node into a new node with the same row type, and the row type of LastRow differs from that of the scan, i.e., the tuple type has been flattened. I suggest to
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694621#comment-16694621 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235316296 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalLastRow.scala ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.logical.rel + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} + +/** + * Represent an Upsert source. Review comment: Upsert -> upsert
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694638#comment-16694638 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235315365 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +val relTypes = scan.getRowType.getFieldList.map(_.getType) +val getTimeIndicatorIndexes = relTypes.zipWithIndex + .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1)) + .map(_._2) +if (upsertStreamTable != null) { + val input = if (getTimeIndicatorIndexes.size > 0) { +// materialize time indicator +val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs) +materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet) + } else { +scan + } + + LogicalLastRow.create( Review comment: It's a little tricky to generate the `LogicalLastRow` when visiting the `TableScan` node. Is it possible to create a rule to do that instead?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694624#comment-16694624 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235317112 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. + */ +class DataStreamLastRow( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamLastRow( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0) Review comment: keyNames.nonEmpty
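Applied to the `DataStreamLastRow` snippet above, the suggestion replaces `keyNames.size != 0` with `keyNames.nonEmpty`. The self-contained sketch below mirrors the node's `keyIndexes` computation and `toString` formatting on plain Scala collections so the effect can be checked without Calcite. `LastRowExplainSketch` is a hypothetical name, and, as a simplification, the same field list is used for both methods, whereas the real node reads key positions from its output row type and the `select` list from its input row type.

```scala
// Hypothetical sketch; field names are passed in directly instead of
// being read from a Calcite RelDataType.
object LastRowExplainSketch {

  /** Positions of the key fields, mirroring DataStreamLastRow.keyIndexes. */
  def keyIndexes(fieldNames: Seq[String], keyNames: Seq[String]): Array[Int] =
    fieldNames.zipWithIndex
      .filter { case (name, _) => keyNames.contains(name) }
      .map(_._2)
      .toArray

  /** Mirrors DataStreamLastRow.toString, using nonEmpty instead of size != 0. */
  def describe(fieldNames: Seq[String], keyNames: Seq[String]): String = {
    val keys = if (keyNames.nonEmpty) s"keys:(${keyNames.mkString(", ")}), " else ""
    s"LastRow(${keys}select:(${fieldNames.mkString(", ")}))"
  }

  def main(args: Array[String]): Unit = {
    val fields = Seq("a", "b", "c")
    println(keyIndexes(fields, Seq("c")).mkString(","))
    println(describe(fields, Seq("c")))
    println(describe(fields, Nil))
  }
}
```

Besides reading more directly, `nonEmpty` states the intent (is there any key?) rather than comparing a size.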
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694616#comment-16694616 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235313039 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ## @@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(exchange: LogicalExchange): RelNode = throw new TableException("Logical exchange in a stream environment is not supported yet.") - override def visit(scan: TableScan): RelNode = scan + override def visit(scan: TableScan): RelNode = { +val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]]) +val relTypes = scan.getRowType.getFieldList.map(_.getType) +val getTimeIndicatorIndexes = relTypes.zipWithIndex Review comment: Rename `getTimeIndicatorIndexes` to `timeIndicatorIndexes`.
> // upsert with keyTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key) > // upsert without key -> single row tableTable > table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code} > A simple design > [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] > about this subtask. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
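The computation behind `getTimeIndicatorIndexes` can be reproduced without Calcite. The sketch below is a framework-free model, not Flink code. `FieldType` and its cases are hypothetical stand-ins for Calcite field types, and `isTimeIndicatorType` only imitates the role of `FlinkTypeFactory.isTimeIndicatorType`. It builds the same kind of index list as the `zipWithIndex` chain in the diff, using the noun-style name the review suggests.

```scala
// Hypothetical stand-ins for Calcite field types; these are not Flink or Calcite classes.
sealed trait FieldType
case object IntType extends FieldType
case object StringType extends FieldType
case object ProctimeIndicator extends FieldType
case object RowtimeIndicator extends FieldType

object TimeIndicatorSketch {
  // Imitates the role of FlinkTypeFactory.isTimeIndicatorType for the stand-in types.
  def isTimeIndicatorType(t: FieldType): Boolean = t match {
    case ProctimeIndicator | RowtimeIndicator => true
    case _                                    => false
  }

  // Positions of all time-indicator fields, named as a value rather than as a getter.
  def timeIndicatorIndexes(relTypes: Seq[FieldType]): Seq[Int] =
    relTypes.zipWithIndex
      .filter { case (fieldType, _) => isTimeIndicatorType(fieldType) }
      .map { case (_, index) => index }
}
```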
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694622#comment-16694622 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235321744

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LastRowProcessFunction.scala
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Function used to handle upsert inputs. Output a retract message if there is a new update.
+ *
+ * @param rowTypeInfo the output row type info.
+ * @param queryConfig the configuration for the query.
+ */
+class LastRowProcessFunction(
+    private val rowTypeInfo: RowTypeInfo,
+    private val queryConfig: StreamQueryConfig)
+  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+    with Logging {
+
+  private var prevRow: CRow = _
+  // stores the accumulators
+  private var state: ValueState[Row] = _
+
+  override def open(config: Configuration) {
+
+    prevRow = new CRow(new Row(rowTypeInfo.getArity), false)
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("LastRowState", rowTypeInfo)
+    state = getRuntimeContext.getState(stateDescriptor)
+
+    initCleanupTimeState("LastRowCleanupTime")
+    LOG.info("Init LastRowProcessFunction.")
+  }
+
+  override def processElement(
+      inputC: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val currentTime = ctx.timerService().currentProcessingTime()
+    // register state-cleanup timer
+    registerProcessingCleanupTimer(ctx, currentTime)
+
+    val pre = state.value()
+    val current = inputC.row
+
+    if (inputC.change) {
+      // ignore same record
+      if (!stateCleaningEnabled && pre != null && pre.equals(current)) {
+        return
+      }
+      state.update(current)
+      // retract prevRow
+      if (pre != null) {
+        prevRow.row = pre
+        out.collect(prevRow)
+      }
+      // output currentRow
+      out.collect(inputC)
+    } else {
+      state.clear()
+      if (pre != null) {
+        prevRow.row = pre
+        out.collect(prevRow)
+      } else {
+        // else input is a delete row we ingnore it, since delete on nothing means nothing.

Review comment:
ingnore -> ignore
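The branching in `processElement` above converts an upsert stream into a retraction stream. The sketch below models that logic with a plain in-memory `Map` in place of Flink keyed `ValueState`, and a hypothetical `Msg` case class in place of `CRow` (`isAccumulate = false` marks a retraction). It leaves out state-retention timers entirely, so treat it as an illustration of the message flow, not a replacement for the function.

```scala
// Framework-free model of the upsert-to-retraction logic in LastRowProcessFunction.
// Msg and the Map-based state are hypothetical stand-ins for CRow and keyed ValueState.
final case class Msg[V](isAccumulate: Boolean, row: V)

final class UpsertToRetractSketch[K, V](stateCleaningEnabled: Boolean = false) {
  // Last row seen per key (Flink would keep this in keyed ValueState instead).
  private var lastRow = Map.empty[K, V]

  def process(key: K, in: Msg[V]): List[Msg[V]] = {
    val pre = lastRow.get(key)
    // Retraction of the previous row for this key, if there is one.
    val retractPrev = pre.map(p => Msg[V](isAccumulate = false, row = p)).toList
    if (in.isAccumulate) {
      // Ignore an identical repeated row (only when state cleaning is disabled, as in the diff).
      if (!stateCleaningEnabled && pre.contains(in.row)) Nil
      else {
        lastRow = lastRow.updated(key, in.row)
        retractPrev :+ in
      }
    } else {
      lastRow = lastRow - key
      // A delete for a key without a previous row emits nothing: deleting nothing means nothing.
      retractPrev
    }
  }
}
```

Each upsert for an existing key produces a retraction of the old row followed by the new row. Retraction-aware downstream operators consume exactly this pair.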
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694625#comment-16694625 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235316850

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink RelNode for ingesting upsert stream from source.
+ */
+class DataStreamLastRow(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    inputSchema: RowSchema,
+    schema: RowSchema,
+    val keyNames: Seq[String])
+  extends SingleRel(cluster, traitSet, input)
+  with DataStreamRel{
+
+  lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex
+    .filter(e => keyNames.contains(e._1))
+    .map(_._2).toArray
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new DataStreamLastRow(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputSchema,
+      schema,
+      keyNames)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter ={
+    super.explainTerms(pw)
+      .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0)
+      .item("select", input.getRowType.getFieldNames.toArray.mkString(", "))
+  }
+
+  override def toString: String = {
+    s"LastRow(${
+      if (keyNames.size != 0) {

Review comment:
keyNames.nonEmpty
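The `keyIndexes` computation and the `nonEmpty` suggestion can be checked in isolation. In this sketch, `LastRowNodeSketch` is hypothetical and so is the `describe` output format, because the `toString` body in the diff is cut off. Only the index lookup mirrors the diff.

```scala
object LastRowNodeSketch {
  // Same idea as `keyIndexes` in DataStreamLastRow: the positions of the key
  // fields within the node's output field names.
  def keyIndexes(fieldNames: Seq[String], keyNames: Seq[String]): Array[Int] =
    fieldNames.zipWithIndex
      .filter { case (name, _) => keyNames.contains(name) }
      .map { case (_, index) => index }
      .toArray

  // Hypothetical description format. `nonEmpty` states the intent directly,
  // where the diff writes `keyNames.size != 0`.
  def describe(keyNames: Seq[String], fieldNames: Seq[String]): String = {
    val keyPart = if (keyNames.nonEmpty) "keys: (" + keyNames.mkString(", ") + "), " else ""
    "LastRow(" + keyPart + "select: (" + fieldNames.mkString(", ") + "))"
  }
}
```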
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694619#comment-16694619 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235312380

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")

-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex

Review comment:
move relTypes and getTimeIndicatorIndexes under the if statement or make them lazy
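One way to follow the "make them lazy" suggestion is sketched below. `LazyScanSketch`, the string field types, and the `evaluations` counter are all hypothetical. The counter only shows that a `lazy val` is never computed when the non-upsert branch is taken, because `&&` short-circuits before the value is read.

```scala
object LazyScanSketch {
  // Counts how often the (stand-in) index computation actually runs.
  var evaluations = 0

  // Hypothetical stand-in for the zipWithIndex/filter/map chain in the diff.
  private def computeTimeIndicatorIndexes(fieldTypes: Seq[String]): Seq[Int] = {
    evaluations += 1
    fieldTypes.zipWithIndex.collect { case (t, i) if t == "PROCTIME" || t == "ROWTIME" => i }
  }

  // Returns the indexes to materialize; `isUpsertTable` stands in for `upsertStreamTable != null`.
  def visitScan(isUpsertTable: Boolean, fieldTypes: Seq[String]): Seq[Int] = {
    // Declared lazy: evaluated only if the upsert branch actually reads it.
    lazy val timeIndicatorIndexes = computeTimeIndicatorIndexes(fieldTypes)
    if (isUpsertTable && timeIndicatorIndexes.nonEmpty) timeIndicatorIndexes
    else Seq.empty
  }
}
```

Moving the two `val`s inside the `if` block has the same effect and also scopes them to the only branch that uses them.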
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694631#comment-16694631 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235361617

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/FromUpsertStreamITCase.scala
##
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.runtime.stream.sql.FromUpsertStreamITCase.{TestPojo, TimestampWithEqualWatermark}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestUpsertSink}
+import org.junit.Assert._
+import org.junit._
+
+class FromUpsertStreamITCase extends StreamingWithStateTestBase {

Review comment:
Rename to UpsertStreamITCase?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694617#comment-16694617 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235315365

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")

-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex
+      .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+      .map(_._2)
+    if (upsertStreamTable != null) {
+      val input = if (getTimeIndicatorIndexes.size > 0) {
+        // materialize time indicator
+        val rewrittenScan = scan.copy(scan.getTraitSet, scan.getInputs)
+        materializerUtils.projectAndMaterializeFields(rewrittenScan, getTimeIndicatorIndexes.toSet)
+      } else {
+        scan
+      }
+
+      LogicalLastRow.create(

Review comment:
It's a little tricky to generate the LogicalLastRow when visiting the TableScan node. Is it possible to create a rule to do that?
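The reviewer's alternative is to express the rewrite as a planner rule rather than inside the `RelShuttle`. The toy plan below illustrates the idea, but it is not Calcite's `RelOptRule` API. `Plan`, `Scan`, `LastRow`, `Project`, and `RuleSketch` are hypothetical, and the rule is a plain Scala `PartialFunction` applied by a single bottom-up traversal.

```scala
// Hypothetical miniature logical plan; not Calcite's RelNode API.
sealed trait Plan
final case class Scan(table: String, isUpsert: Boolean) extends Plan
final case class LastRow(input: Plan, keys: Seq[String]) extends Plan
final case class Project(input: Plan, fields: Seq[String]) extends Plan

object RuleSketch {
  // A rule is a partial function that fires only on the plan shapes it matches.
  type Rule = PartialFunction[Plan, Plan]

  // Wraps an upsert scan in a LastRow node. The keys are passed in here; a real
  // rule would read them from the scanned table.
  def upsertScanRule(keys: Seq[String]): Rule = {
    case scan @ Scan(_, true) => LastRow(scan, keys)
  }

  private def applyOnce(rule: Rule, plan: Plan): Plan =
    if (rule.isDefinedAt(plan)) rule(plan) else plan

  // Bottom-up traversal. Inputs of LastRow are skipped, so a scan is never wrapped twice.
  def transform(plan: Plan, rule: Rule): Plan = plan match {
    case lastRow: LastRow       => lastRow
    case Project(input, fields) => applyOnce(rule, Project(transform(input, rule), fields))
    case scan: Scan             => applyOnce(rule, scan)
  }
}
```

The rule matches only the node shape it rewrites, so applying it a second time leaves the plan unchanged. A rule-based planner relies on this property.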
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694627#comment-16694627 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235361226

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/ScalaTupleToCRowProcessRunner.scala
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.conversion
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * ProcessRunner with [[CRow]] output.
+ */
+class ScalaTupleToCRowProcessRunner(
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[(Boolean, Any), CRow]
+  with ResultTypeQueryable[CRow]
+  with Compiler[ProcessFunction[Any, Row]]
+  with Logging {
+
+  private var function: ProcessFunction[Any, Row] = _
+  private var cRowWrapper: CRowWrappingCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating ProcessFunction.")
+    function = clazz.newInstance()

Review comment:
Add the open/close logic such as FunctionUtils.setFunctionRuntimeContext/FunctionUtils.openFunction/FunctionUtils.closeFunction
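The requested open/close handling amounts to a wrapper that forwards its lifecycle to the inner, generated function. In Flink that forwarding would go through the `FunctionUtils` helpers the reviewer names. The sketch below avoids Flink entirely: `Processor`, `RunnerSketch`, and `RecordingProcessor` are hypothetical types used only for illustration.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical minimal lifecycle, standing in for Flink's RichFunction open/close.
trait Processor[I, O] {
  def open(): Unit = ()
  def process(in: I): O
  def close(): Unit = ()
}

// Runner that creates an inner (e.g. generated) function and forwards the lifecycle
// to it, similar in spirit to calling FunctionUtils.openFunction/closeFunction on it.
final class RunnerSketch[I, O](create: () => Processor[I, O]) extends Processor[I, O] {
  private var inner: Processor[I, O] = _

  override def open(): Unit = {
    inner = create()
    inner.open() // without this, setup done in the inner open() would never run
  }

  override def process(in: I): O = inner.process(in)

  override def close(): Unit = if (inner != null) inner.close()
}

// Example inner function that records which lifecycle calls it received.
final class RecordingProcessor extends Processor[Int, Int] {
  val events: ListBuffer[String] = ListBuffer.empty[String]
  override def open(): Unit = events += "open"
  override def process(in: Int): Int = { events += s"process($in)"; in * 2 }
  override def close(): Unit = events += "close"
}
```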
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694615#comment-16694615 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235313379

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
##
@@ -165,7 +165,30 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   override def visit(exchange: LogicalExchange): RelNode =
     throw new TableException("Logical exchange in a stream environment is not supported yet.")

-  override def visit(scan: TableScan): RelNode = scan
+  override def visit(scan: TableScan): RelNode = {
+    val upsertStreamTable = scan.getTable.unwrap(classOf[UpsertStreamTable[_]])
+    val relTypes = scan.getRowType.getFieldList.map(_.getType)
+    val getTimeIndicatorIndexes = relTypes.zipWithIndex
+      .filter(e => FlinkTypeFactory.isTimeIndicatorType(e._1))
+      .map(_._2)
+    if (upsertStreamTable != null) {
+      val input = if (getTimeIndicatorIndexes.size > 0) {

Review comment:
getTimeIndicatorIndexes.nonEmpty
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694629#comment-16694629 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235346023

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
##
@@ -19,22 +19,138 @@
 package org.apache.flink.table.plan.nodes.datastream

 import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.{TableConfig, Types}
 import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.runtime.conversion.{ExternalTypeToCRowProcessRunner, JavaTupleToCRowProcessRunner, ScalaTupleToCRowProcessRunner}

 trait StreamScan extends CommonScan[CRow] with DataStreamRel {

+  protected def convertUpsertToInternalRow(
+      schema: RowSchema,
+      input: DataStream[Any],
+      fieldIdxs: Array[Int],
+      config: TableConfig,
+      rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
+
+    val internalType = schema.typeInfo
+    val cRowType = CRowTypeInfo(internalType)
+
+    val hasTimeIndicator = fieldIdxs.exists(f =>
+      f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+        f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
+
+    val dsType = input.getType
+
+    dsType match {
+      // Scala tuple
+      case t: CaseClassTypeInfo[_]
+        if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+        val inputType = t.getTypeAt[Any](1)
+        if (inputType == internalType && !hasTimeIndicator) {
+          // input is already of correct type. Only need to wrap it as CRow
+          input.asInstanceOf[DataStream[(Boolean, Row)]]
+            .map(new RichMapFunction[(Boolean, Row), CRow] {
+              @transient private var outCRow: CRow = null

Review comment:
null -> _
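The `null -> _` suggestion relies on Scala's default initializer for class fields. In Scala 2, `var x: T = _` gives the field the default value of its type: `null` for reference types such as `CRow`, `0` for `Int`, and `false` for `Boolean`. It is permitted only on fields, not on local variables. The sketch below mirrors the object-reuse pattern in the diff, using hypothetical `ReusableRow` and `WrappingMapperSketch` classes as stand-ins for `CRow` and the anonymous `RichMapFunction`.

```scala
// Hypothetical stand-in for CRow: a mutable (change flag, row) pair that is reused.
final class ReusableRow(var change: Boolean, var row: String)

// Mirrors the map function in the diff: the reusable output object is a field
// declared with the default initializer `_` (null for reference types) and is
// allocated only once, in open().
final class WrappingMapperSketch {
  @transient private var outRow: ReusableRow = _

  def open(): Unit = {
    outRow = new ReusableRow(true, null)
  }

  def map(in: (Boolean, String)): ReusableRow = {
    outRow.change = in._1
    outRow.row = in._2
    outRow // the same instance is returned for every record
  }

  def isOpen: Boolean = outRow != null
}
```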
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694628#comment-16694628 ]

ASF GitHub Bot commented on FLINK-8577:
---

dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r235351015

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
##
@@ -555,18 +555,95 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }

+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)

-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }

+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
+
+    val streamType: TypeInformation[T] =
+      dataStream.getType.asInstanceOf[TupleTypeInfo[JTuple2[JBool, T]]].getTypeAt(1)

Review comment:
We also need to consider CaseClassTypeInfo.
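Extracting `T` from an upsert stream of `(Boolean, T)` has to handle two shapes, as the reviewer points out: Java `Tuple2` type information (`TupleTypeInfo`) and Scala tuple type information (`CaseClassTypeInfo`). The sketch below models this dispatch with hypothetical case classes, not Flink's real `TypeInformation` hierarchy. It pattern-matches on both shapes and rejects anything else, where the diff casts unconditionally.

```scala
// Hypothetical stand-ins for Flink type information; not the real classes.
sealed trait TypeInfoSketch
case object BooleanInfo extends TypeInfoSketch
final case class RowInfo(arity: Int) extends TypeInfoSketch
// Stand-in for TupleTypeInfo[JTuple2[JBool, T]] (Java API).
final case class JavaTuple2Info(first: TypeInfoSketch, second: TypeInfoSketch) extends TypeInfoSketch
// Stand-in for the CaseClassTypeInfo of a Scala (Boolean, T) tuple (Scala API).
final case class ScalaTuple2Info(first: TypeInfoSketch, second: TypeInfoSketch) extends TypeInfoSketch

object UpsertTypeSketch {
  // Extracts T from an upsert stream type (Boolean, T), accepting both representations.
  def recordType(streamType: TypeInfoSketch): TypeInfoSketch = streamType match {
    case JavaTuple2Info(BooleanInfo, t)  => t
    case ScalaTuple2Info(BooleanInfo, t) => t
    case other =>
      throw new IllegalArgumentException(s"An upsert stream must have type (Boolean, T), but is: $other")
  }
}
```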
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694626#comment-16694626 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235360525 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.conversion + +import _root_.java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * ProcessRunner with [[CRow]] output. + */ +class JavaTupleToCRowProcessRunner( Review comment: It seems that we can reuse ExternalTypeToCRowProcessRunner/CRowOutputProcessRunner. We just need to check whether the input is a Java tuple, a Scala tuple, or some other type.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694630#comment-16694630 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235362084 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/FromUpsertStreamTest.scala ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.Types +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestUtil.{UpsertTableNode, term, unaryNode} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.types.Row +import org.junit.Test + +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import java.lang.{Boolean => JBool} + +class FromUpsertStreamTest extends TableTestBase { Review comment: Rename to UpsertStreamTest?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694623#comment-16694623 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235320988 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/LastRowProcessFunction.scala ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * Function used to handle upsert inputs. Output a retract message if there is a new update. + * + * @param rowTypeInfo the output row type info. + * @param queryConfig the configuration for the query. + */ +class LastRowProcessFunction( +private val rowTypeInfo: RowTypeInfo, +private val queryConfig: StreamQueryConfig) + extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) +with Logging { + + private var prevRow: CRow = _ + // stores the accumulators + private var state: ValueState[Row] = _ Review comment: state and prevRow can be transient.
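To make the documented behaviour of `LastRowProcessFunction` ("output a retract message if there is a new update") concrete, here is a minimal plain-Java model with no Flink dependency. It assumes the `(Boolean, row)` input encoding from the PR description (`true` = upsert, `false` = delete) and uses a `HashMap` as a stand-in for the per-key `ValueState`; the names `Change` and `UpsertToRetraction` are hypothetical. In the real function the state handle would presumably be obtained in `open()` rather than serialized with the function, which fits the suggestion to mark such fields transient.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of upsert-to-retraction conversion; no Flink dependency.
// Change and UpsertToRetraction are hypothetical names for illustration only.
public class UpsertToRetractionSketch {

    // accumulate == true adds the row downstream; false retracts a previously emitted row.
    record Change<R>(boolean accumulate, R row) {}

    static final class UpsertToRetraction<K, R> {
        private final Function<R, K> keyOf;
        // Stand-in for per-key ValueState: the last row emitted for each key.
        private final Map<K, R> lastRow = new HashMap<>();

        UpsertToRetraction(Function<R, K> keyOf) {
            this.keyOf = keyOf;
        }

        // isUpsert == true: insert or update the row; false: delete the row with this key.
        List<Change<R>> process(boolean isUpsert, R row) {
            List<Change<R>> out = new ArrayList<>();
            K key = keyOf.apply(row);
            R prev = lastRow.get(key);
            if (prev != null) {
                out.add(new Change<>(false, prev)); // retract the previous version first
            }
            if (isUpsert) {
                lastRow.put(key, row);
                out.add(new Change<>(true, row));   // then emit the new version
            } else {
                lastRow.remove(key);                // a delete leaves no row for this key
            }
            return out;
        }
    }

    public static void main(String[] args) {
        // Rows are (a, b, c) triples keyed on c, mirroring 'c.key in the issue description.
        UpsertToRetraction<Integer, List<Object>> conv =
            new UpsertToRetraction<Integer, List<Object>>(r -> (Integer) r.get(2));
        System.out.println(conv.process(true, List.of("x", 1L, 7)));  // first row for key 7
        System.out.println(conv.process(true, List.of("y", 2L, 7)));  // update: retract x, add y
        System.out.println(conv.process(false, List.of("y", 2L, 7))); // delete: retract y
    }
}
```

For key 7, the second upsert yields a retraction of the old row followed by the new row, and the delete yields only a retraction; deleting a key that was never seen emits nothing.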
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694618#comment-16694618 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235361188 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/JavaTupleToCRowProcessRunner.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.conversion + +import _root_.java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * ProcessRunner with [[CRow]] output. + */ +class JavaTupleToCRowProcessRunner( +name: String, +code: String, +@transient var returnType: TypeInformation[CRow]) + extends ProcessFunction[JTuple2[JBool, Any], CRow] + with ResultTypeQueryable[CRow] + with Compiler[ProcessFunction[Any, Row]] + with Logging { + + private var function: ProcessFunction[Any, Row] = _ + private var cRowWrapper: CRowWrappingCollector = _ + + override def open(parameters: Configuration): Unit = { +LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code") +val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) +LOG.debug("Instantiating ProcessFunction.") +function = clazz.newInstance() Review comment: Add the open/close logic such as FunctionUtils.setFunctionRuntimeContext/FunctionUtils.openFunction/FunctionUtils.closeFunction
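The reviewer's point is that the runner compiles and instantiates an inner `ProcessFunction` but never drives that function's own lifecycle. The forwarding pattern can be sketched in plain Java as below. `LifecycleFunction`, `Context`, `RecordingFunction` and `Runner` are hypothetical stand-ins for a Flink rich function, its `RuntimeContext`, the generated code and the runner; the comments only mark where the `FunctionUtils` helpers named in the review would be called in the Scala code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a runner that forwards lifecycle calls to an inner function.
// All types here are hypothetical; no Flink classes are used.
public class LifecycleForwardingSketch {

    record Context(String taskName) {}

    interface LifecycleFunction {
        void setContext(Context ctx);
        void open();
        String process(String in);
        void close();
    }

    // Stand-in for the compiled function: it records every lifecycle call it receives.
    static final class RecordingFunction implements LifecycleFunction {
        final List<String> calls = new ArrayList<>();
        public void setContext(Context ctx) { calls.add("setContext:" + ctx.taskName()); }
        public void open() { calls.add("open"); }
        public String process(String in) { calls.add("process"); return in.toUpperCase(); }
        public void close() { calls.add("close"); }
    }

    // Without the forwarding in open()/close(), the inner function would never
    // receive its context and never release its resources.
    static final class Runner {
        private final LifecycleFunction inner;
        Runner(LifecycleFunction inner) { this.inner = inner; }
        void open(Context ctx) {
            inner.setContext(ctx); // cf. FunctionUtils.setFunctionRuntimeContext
            inner.open();          // cf. FunctionUtils.openFunction
        }
        String process(String in) { return inner.process(in); }
        void close() { inner.close(); } // cf. FunctionUtils.closeFunction
    }

    public static void main(String[] args) {
        RecordingFunction fn = new RecordingFunction();
        Runner runner = new Runner(fn);
        runner.open(new Context("upsert-source"));
        runner.process("a");
        runner.close();
        System.out.println(fn.calls); // prints [setContext:upsert-source, open, process, close]
    }
}
```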
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694620#comment-16694620 ] ASF GitHub Bot commented on FLINK-8577: --- dianfu commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r235318373 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamLastRow.scala ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import java.util + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.functions.NullByteKeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment} +import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.{CRowKeySelector, LastRowProcessFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} + +import scala.collection.JavaConversions._ + +/** + * Flink RelNode for ingesting upsert stream from source. + */ +class DataStreamLastRow( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + inputSchema: RowSchema, + schema: RowSchema, + val keyNames: Seq[String]) + extends SingleRel(cluster, traitSet, input) + with DataStreamRel{ + + lazy val keyIndexes = getRowType.getFieldNames.zipWithIndex +.filter(e => keyNames.contains(e._1)) +.map(_._2).toArray + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { +new DataStreamLastRow( + cluster, + traitSet, + inputs.get(0), + inputSchema, + schema, + keyNames) + } + + override def explainTerms(pw: RelWriter): RelWriter ={ +super.explainTerms(pw) + .itemIf("keys", keyNames.mkString(", "), keyNames.size != 0) + .item("select", input.getRowType.getFieldNames.toArray.mkString(", ")) + } + + override def toString: String = { +s"LastRow(${ + if (keyNames.size != 0) { +s"keys:(${keyNames.mkString(", ")}), " + } else { +"" + } +}select:(${input.getRowType.getFieldNames.toArray.mkString(", ")}))" + } + + override def producesUpdates: Boolean = true + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + queryConfig: 
StreamQueryConfig): DataStream[CRow] = { + +val inputDS = + getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) +val outRowType = CRowTypeInfo(schema.typeInfo) + +val needRetraction = DataStreamRetractionRules.isAccRetract(this) +val result: DataStream[CRow] = if (needRetraction) { + val processFunction = new LastRowProcessFunction( +new RowTypeInfo(schema.fieldTypeInfos.toArray, schema.fieldNames.toArray), Review comment: why not use schema.typeInfo directly?
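For reference, the `keyIndexes` computation in `DataStreamLastRow` pairs each output field name with its position and keeps the positions whose names are keys. It can be rendered in plain Java as follows (`KeyIndexesSketch` is a hypothetical name). As in the Scala original, the indexes come out in field order, not in the order the keys were declared. An empty key list produces an empty array, which is the no-key (single-row table) case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

// Java rendering of the keyIndexes logic: positions of the key fields
// within the node's output field names, in field order.
public class KeyIndexesSketch {

    static int[] keyIndexes(List<String> fieldNames, List<String> keyNames) {
        return IntStream.range(0, fieldNames.size())
            .filter(i -> keyNames.contains(fieldNames.get(i)))
            .toArray();
    }

    public static void main(String[] args) {
        // 'a, 'b, 'c.key from the issue description: only c is a key field.
        System.out.println(Arrays.toString(keyIndexes(List.of("a", "b", "c"), List.of("c")))); // prints [2]
    }
}
```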
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694472#comment-16694472 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-440596555 @sunjincheng121 Thanks for your attention. I have rebased onto master.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16694319#comment-16694319 ] ASF GitHub Bot commented on FLINK-8577: --- sunjincheng121 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-440561427 @hequn8128 Thanks for the PR. I am glad to review the changes after you rebase the code! Best, Jincheng
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688840#comment-16688840 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-439240243 Thanks, @pnowojski! Is there anything I can help with for the release?
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687964#comment-16687964 ] ASF GitHub Bot commented on FLINK-8577: --- pnowojski commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-439025683 Sorry, @hequn8128, that you have had to wait so long. I haven't forgotten about this PR, but I was busy with the 1.7 release. I hope to get back to it in the next couple of days.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683663#comment-16683663 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-437853763 @pnowojski Hi, it would be great if you could take a look when you are free. Thank you.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16654736#comment-16654736 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-430900222 Updated the PR with the following changes: 1. materialize time indicators; 2. refactor rules.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651377#comment-16651377 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#issuecomment-430163860 As discussed with Fabian in [FLINK-8578](https://issues.apache.org/jira/browse/FLINK-8578), we have to materialize the proc-time field and convert row-time attributes into regular TIMESTAMP attributes. I will update the PR ASAP.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635730#comment-16635730 ] ASF GitHub Bot commented on FLINK-8577: --- hequn8128 opened a new pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787

## What is the purpose of the change

Currently, only append streams can be ingested as stream tables. This pull request implements proctime DataStream to Table upsert conversion. The API looks like:

```
DataStream[(Boolean, (String, Long, Int))] input = ???
// upsert with key
Table table = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.fromUpsertStream(input, 'a, 'b, 'c)
```

A simple design doc can be found [here](https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing).

## Brief change log

- Add `fromUpsertStream()` and `fromAppendStream()` to the Java and Scala `StreamTableEnvironment` and deprecate `fromDataStream`.
- Add key support in the source definition, including parsing the `key` keyword.
- Add `DataStreamLastRowRule` and `DataStreamLastRowAfterCalcRule`. Both rules generate `DataStreamLastRow` to handle upsert streams. The difference between the two rules is that `DataStreamLastRowAfterCalcRule` takes the Calc into consideration and generates the LastRow DataStreamRel node after the Calc, which can decrease the state size in LastRow.
- Add `LastRowProcessFunction` to handle upsert messages and generate retractions if there is an update.

## Verifying this change

This change added tests and can be verified as follows:

- Add Java API test in `JavaSqlITCase`
- Add IT test cases in `FromUpsertStreamITCase`
- Add SQL plan tests in `FromUpsertStreamTest`
- Add key extraction tests in `UpdatingPlanCheckerTest`
- Add validation test in `StreamTableEnvironmentValidationTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not documented yet; documentation will be added in a later PR)
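The two API variants differ only in how rows are grouped into table rows. Below is a minimal plain-Java model of the table that an upsert stream describes. It assumes the `(Boolean, row)` encoding above, with `true` meaning insert/update and `false` meaning delete. `UpsertTableSketch` is a hypothetical name, and the model ignores retraction output and state cleanup. With no key fields, every message maps to the same empty key, so the table holds at most one row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the table an upsert stream describes; no Flink dependency.
// A message is (flag, row): true = insert/update, false = delete.
public class UpsertTableSketch {
    private final int[] keyIndexes;
    // Current table contents, one entry per distinct key (insertion-ordered).
    private final Map<List<Object>, List<Object>> rows = new LinkedHashMap<>();

    UpsertTableSketch(int... keyIndexes) {
        this.keyIndexes = keyIndexes;
    }

    // Projects the key fields; with no key fields every row maps to the same empty key.
    private List<Object> keyOf(List<Object> row) {
        List<Object> key = new ArrayList<>();
        for (int i : keyIndexes) {
            key.add(row.get(i));
        }
        return key;
    }

    void apply(boolean isUpsert, List<Object> row) {
        if (isUpsert) {
            rows.put(keyOf(row), row);   // insert, or replace the row with the same key
        } else {
            rows.remove(keyOf(row));     // delete the row with this key, if any
        }
    }

    List<List<Object>> contents() {
        return new ArrayList<>(rows.values());
    }

    public static void main(String[] args) {
        UpsertTableSketch keyed = new UpsertTableSketch(2); // like 'a, 'b, 'c.key
        UpsertTableSketch single = new UpsertTableSketch(); // like 'a, 'b, 'c (no key)
        for (UpsertTableSketch t : List.of(keyed, single)) {
            t.apply(true, List.of("x", 1L, 7));
            t.apply(true, List.of("y", 2L, 8));
            t.apply(true, List.of("z", 3L, 7));
        }
        System.out.println(keyed.contents());
        System.out.println(single.contents());
    }
}
```

With key `c`, the table ends up with two rows (`z` replaces `x` under key 7). Without a key, only the last row `z` remains.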
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579945#comment-16579945 ]

Hequn Cheng commented on FLINK-8577:

Here is a simple design [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing] for this subtask. Any suggestions are welcome!
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359247#comment-16359247 ]

Hequn Cheng commented on FLINK-8577:

Hi [~fhueske], [~twalthr],

I agree, I also prefer option 1. It's the main reason I closed [FLINK-8579|https://issues.apache.org/jira/browse/FLINK-8579]. +1 for the method names.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357003#comment-16357003 ]

Timo Walther commented on FLINK-8577:

Maybe we should also rename the methods so that they are easier to find: {{tEnv.fromUpsertStream}}, {{tEnv.fromAppendStream}}, {{tEnv.fromDataSet()}}.
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357000#comment-16357000 ]

Timo Walther commented on FLINK-8577:

Yes, I also prefer option 1. Always require a flag. This also keeps the number of possible methods small and thus the API concise.

{code}
// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)

// always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
{code}
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356714#comment-16356714 ]

Fabian Hueske commented on FLINK-8577:

I was thinking about how to handle deletion flags. I see three options:
# Always require deletion flags. Append-only DataStreams would need a map that adds the flag.
# Have special methods for upserts with and without deletion flags.
# Have a mandatory parameter that determines whether the input has flags or not.

So the choices are:

DataStream[(String, Long, Int)] input = ???
DataStream[(Boolean, (String, Long, Int))] flaggedInput = ???

// WITH DELETE FLAGS
// always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStreamWithDeletes(flaggedInput, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(flaggedInput, deletes = true, 'a, 'b, 'c.key)

// WITHOUT DELETE FLAGS
// always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
// special method
table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// mandatory parameter
table = tEnv.upsertFromStream(input, deletes = false, 'a, 'b, 'c.key)

I think I prefer option 1, because it keeps the API slim and is consistent with the format of the UpsertTableSink. What do you think, [~hequn8128], [~twalthr]?
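Under option 1, an append-only stream has to be mapped into the flagged `(Boolean, record)` shape before conversion. A minimal sketch of such a map in plain Scala, assuming the convention that `true` marks an insert/upsert and `false` a delete (the `Tuple2<Boolean, T>` format that Flink's `UpsertStreamTableSink` consumes). `InsertFlagger` is only the hypothetical name used in the comment; here it is modelled as a function over a `Seq` rather than a Flink `MapFunction`, and `flagAll` / `isAppendOnly` are illustrative helpers.

```scala
// Hypothetical stand-in for the InsertFlagger map from the discussion.
// Flag convention: true = insert/upsert, false = delete.
object InsertFlaggerSketch {

  // Marks a single record from an append-only stream as an insert.
  def insertFlagger[T](record: T): (Boolean, T) = (true, record)

  // Applies the flagger to every record; a Seq stands in for a DataStream.
  def flagAll[T](input: Seq[T]): Seq[(Boolean, T)] = input.map(r => insertFlagger(r))

  // An append-only input never produces deletes, so every flag is true.
  def isAppendOnly[T](flagged: Seq[(Boolean, T)]): Boolean = flagged.forall(_._1)
}
```

Once flagging is applied up front, append-only and updating inputs share one shape, so a single conversion method covers both; that is the API-size argument made for option 1.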
[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.
[ https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356684#comment-16356684 ]

Fabian Hueske commented on FLINK-8577:

[~twalthr], what do you think about the API?