[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698767#comment-16698767
 ] 

ASF GitHub Bot commented on FLINK-8577:
---------------------------------------

pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r236168761
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##########
 @@ -555,18 +555,105 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
 
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Can not apply key on append stream, use fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, proctime)
 
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
 
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: DataStream[T]): Unit = {
 
 Review comment:
   Aren't we missing the user-defined primary key here? What is the value of an upsert stream without a defined primary key?
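
To make the question concrete, here is a minimal sketch in plain Scala of how an upsert stream could be folded into a table with and without a key. It does not use any Flink API: `UpsertSketch`, `materialize`, `keyed`, and `unkeyed` are hypothetical names chosen for illustration, and the sample rows are made up. It only models the semantics described in the JIRA issue, where an upsert without a key yields a single-row table.

```scala
// Hypothetical model of upsert materialization; not part of Flink's API.
object UpsertSketch {
  type Row = (String, Long, Int)

  /** Folds an upsert stream into a table, keeping only the latest row per key. */
  def materialize[K](rows: Seq[Row])(keyOf: Row => K): Map[K, Row] =
    rows.foldLeft(Map.empty[K, Row])((table, row) => table.updated(keyOf(row), row))

  // Made-up sample input, in arrival order.
  val input: Seq[Row] = Seq(("a", 1L, 1), ("b", 2L, 2), ("c", 3L, 1))

  // Key on the third field: a later row overwrites an earlier row with the same key.
  val keyed: Map[Int, Row] = materialize(input)(_._3)

  // No key: every row maps to the same unit key, so only the last row survives.
  val unkeyed: Map[Unit, Row] = materialize(input)(_ => ())
}
```

With a key, the table holds one row per distinct key value. Without one, each incoming row replaces the previous row, so the table degenerates to its most recent record.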


> Implement proctime DataStream to Table upsert conversion.
> ---------------------------------------------------------
>
>                 Key: FLINK-8577
>                 URL: https://issues.apache.org/jira/browse/FLINK-8577
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>              Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.



