pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r254983164
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##########
 @@ -555,18 +555,102 @@ abstract class StreamTableEnvironment(
           s"But is: ${execEnv.getStreamTimeCharacteristic}")
     }
 
+    // Can not apply key on append stream
+    if (extractUniqueKeys(fields).nonEmpty) {
+      throw new TableException(
+        s"Defining key on append stream has not been supported yet, use 
fromUpsertStream instead.")
+    }
+
     // adjust field indexes and field names
     val indexesWithIndicatorFields = adjustFieldIndexes(fieldIndexes, rowtime, 
proctime)
     val namesWithIndicatorFields = adjustFieldNames(fieldNames, rowtime, 
proctime)
 
-    val dataStreamTable = new DataStreamTable[T](
+    val dataStreamTable = new AppendStreamTable[T](
       dataStream,
       indexesWithIndicatorFields,
       namesWithIndicatorFields
     )
     registerTableInternal(name, dataStreamTable)
   }
 
+  private def getTypeFromUpsertStream[T](dataStream: DataStream[T]): 
TypeInformation[T] = {
+    dataStream.getType match {
+      case c: CaseClassTypeInfo[_]
+        if (c.getTypeClass.equals(classOf[Tuple2[_, _]])) => c.getTypeAt(1)
+      case t: TupleTypeInfo[_]
+        if (t.getTypeClass.equals(classOf[JTuple2[_, _]])) => t.getTypeAt(1)
+      case _ =>
+        throw new TableException("You can only upsert from a datastream with 
type of Tuple2!")
+    }
+  }
+
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
+    * catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @tparam T the type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](name: String, dataStream: 
DataStream[T]): Unit = {
+
+    val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType)
+    val dataStreamTable = new UpsertStreamTable[T](
+      dataStream,
+      fieldIndexes,
+      fieldNames
+    )
+    registerTableInternal(name, dataStreamTable)
+  }
+
+  /**
+    * Registers an upsert [[DataStream]] as a table under a given name with 
field names as specified
+    * by field expressions in the [[TableEnvironment]]'s catalog.
+    *
+    * @param name The name under which the table is registered in the catalog.
+    * @param dataStream The [[DataStream]] to register as table in the catalog.
+    * @param fields The field expressions to define the field names of the 
table.
+    * @tparam T The type of the [[DataStream]].
+    */
+  protected def registerUpsertStreamInternal[T](
+      name: String,
+      dataStream: DataStream[T],
+      fields: Array[Expression])
+  : Unit = {
+
+    val streamType: TypeInformation[T] = getTypeFromUpsertStream(dataStream)
+
+    // get field names and types for all non-replaced fields
+    val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
+
+    // validate and extract time attributes
+    val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, 
fields)
 
 Review comment:
   I think rowtime columns shouldn't be allowed for upsert streams at the 
moment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to