JingsongLi commented on a change in pull request #9909: [FLINK-14381][table] Partition field names should be got from CatalogTable instead of source/sink
URL: https://github.com/apache/flink/pull/9909#discussion_r338367203
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala
 ##########
 @@ -311,3 +283,93 @@ object PartitionableSinkITCase {
     row(4, 4L, "你好,陌生人,我是中国人,你来自哪里?")
   )
 }
+
+private class TestSink(
+    rowType: RowTypeInfo,
+    supportsGrouping: Boolean,
+    partitionColumns: Array[String])
+    extends StreamTableSink[Row]
+        with PartitionableTableSink {
+  private var staticPartitions: JMap[String, String] = _
+
+  override def getPartitionFieldNames: JList[String] = partitionColumns.toList
+
+  override def setStaticPartition(partitions: JMap[String, String]): Unit =
+    this.staticPartitions = partitions
+
+  override def configure(fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+
+  override def configurePartitionGrouping(s: Boolean): Boolean = {
+    supportsGrouping
+  }
+
+  override def getTableSchema: TableSchema = {
+    new TableSchema(Array("a", "b", "c"), type3.getFieldTypes)
+  }
+
+  override def getOutputType: RowTypeInfo = type3
+
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+        .setParallelism(dataStream.getParallelism)
+  }
+
+  override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
+    dataStream.addSink(new UnsafeMemorySinkFunction(type3))
+        .setParallelism(dataStream.getParallelism)
+  }
+
+  def getStaticPartitions: JMap[String, String] = {
+    staticPartitions
+  }
+}
+
+class TestPartitionableSinkFactory extends TableSinkFactory[Row] with TableSourceFactory[Row] {
+
+  override def requiredContext(): util.Map[String, String] = {
+    val context = new util.HashMap[String, String]()
+    context.put(CONNECTOR_TYPE, "TestPartitionableSink")
+    context
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    val supported = new util.ArrayList[String]()
+    supported.add("*")
+    supported
+  }
+
+  override def createTableSink(properties: util.Map[String, String]): TableSink[Row] = {
+    val dp = new DescriptorProperties()
+    dp.putProperties(properties)
+
+    val schema = dp.getTableSchema(SCHEMA)
+    val supportsGrouping = dp.getBoolean("supports-grouping")
+    val partitionColumns = dp.getArray("partition-column", new function.Function[String, String] {
+      override def apply(t: String): String = dp.getString(t)
+    })
+    new TestSink(
+      schema.toRowType.asInstanceOf[RowTypeInfo],
+      supportsGrouping,
+      partitionColumns.asScala.toArray[String])
+  }
+
+  /**
+    * Remove it after FLINK-14387.
+    */
+  override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
 
 Review comment:
   Yes, only used for `getTableSchema`.
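
   For context, a minimal sketch of what such a schema-only `createTableSource` could look like. This is a hypothetical illustration, not the code in this PR, and it assumes `StreamTableSource` plus the Java `StreamExecutionEnvironment` and `DataStream` types are imported in this file:

```scala
override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
  val dp = new DescriptorProperties()
  dp.putProperties(properties)
  val tableSchema = dp.getTableSchema(SCHEMA)

  new StreamTableSource[Row] {
    // The planner only consults this schema; FLINK-14387 removes the need
    // for the source entirely.
    override def getTableSchema: TableSchema = tableSchema

    override def getReturnType: TypeInformation[Row] = tableSchema.toRowType

    // Never invoked in this test setup; fail loudly if it ever is.
    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
      throw new UnsupportedOperationException("schema-only source")
  }
}
```

   Throwing from `getDataStream` keeps the stub honest: if the planner ever started reading from the source, the test would fail immediately instead of silently producing no rows.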
