xuyangzhong commented on code in PR #25889:
URL: https://github.com/apache/flink/pull/25889#discussion_r1904983840


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -110,26 +111,25 @@ object TestTableSourceSinks {
 
   def createCsvTemporarySinkTable(
       tEnv: TableEnvironment,
-      schema: TableSchema,
+      tableSchema: TableSchema,
       tableName: String,
       numFiles: Int = 1): String = {
-    val tempFile = File.createTempFile("csv-test", null)
-    tempFile.deleteOnExit()
-    val path = tempFile.getAbsolutePath
-
-    val sinkOptions = collection.mutable.Map(
-      "connector.type" -> "filesystem",
-      "connector.path" -> path,
-      "format.type" -> "csv",
-      "format.write-mode" -> "OVERWRITE",
-      "format.num-files" -> numFiles.toString
-    )
-    sinkOptions.putAll(new Schema().schema(schema).toProperties)
-
-    val sink = new 
CsvBatchTableSinkFactory().createStreamTableSink(sinkOptions);
-    
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(tableName,
 sink)
-
-    path
+    val tempDir = Files.createTempDirectory("csv-test").toFile
+    tempDir.deleteOnExit()
+    val tempDirPath = tempDir.getAbsolutePath
+
+    Files.createTempDirectory("csv-test")
+    val schema = tableSchema.toSchema
+    val tableDescriptor = TableDescriptor

Review Comment:
   The old CSV sink writes to a specified file, so it needs to distinguish 
between `append` and `overwrite` write modes. In contrast, the new filesystem 
connector writes to a directory, so it does not need to be aware of `append` or 
`overwrite` write modes.



##########
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java:
##########
@@ -107,23 +113,44 @@ public static void main(String[] args) throws Exception {
 
             // register sink table
             String sinkTableName = QUERY_PREFIX + queryId + "_sinkTable";
-            ((TableEnvironmentInternal) tableEnvironment)
-                    .registerTableSinkInternal(
-                            sinkTableName,
-                            new CsvTableSink(
-                                    sinkTablePath + FILE_SEPARATOR + queryId + 
RESULT_SUFFIX,
-                                    COL_DELIMITER,
-                                    1,
-                                    FileSystem.WriteMode.OVERWRITE,
-                                    resultTable.getSchema().getFieldNames(),
-                                    
resultTable.getSchema().getFieldDataTypes()));
+            tableEnvironment.createTable(
+                    sinkTableName,
+                    getTableDescriptor(
+                            sinkTablePath + FILE_SEPARATOR + queryId + 
RESULT_SUFFIX,
+                            resultTable.getSchema().getFieldNames(),
+                            resultTable.getSchema().getFieldDataTypes()));
             TableResult tableResult = resultTable.executeInsert(sinkTableName);
             // wait job finish
             tableResult.getJobClient().get().getJobExecutionResult().get();
             System.out.println("[INFO]Run TPC-DS query " + queryId + " 
success.");
         }
     }
 
+    private static TableDescriptor getTableDescriptor(
+            String path, String[] fieldNames, DataType[] fieldDataTypes) {
+        final Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (int i = 0; i < fieldNames.length; i++) {
+            schemaBuilder.column(fieldNames[i], fieldDataTypes[i]);
+        }
+        final Schema schema = schemaBuilder.build();
+
+        final FormatDescriptor formatDescriptor =
+                FormatDescriptor.forFormat("csv")
+                        .option(CsvFormatOptions.FIELD_DELIMITER, 
COL_DELIMITER)
+                        .build();
+
+        return TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
+                .schema(schema)
+                .option(FileSystemConnectorOptions.PATH, path)

Review Comment:
   The old CSV sink writes to a specified file, so it needs to distinguish 
between `append` and `overwrite` write modes. In contrast, the new filesystem 
connector writes to a directory, so it does not need to be aware of `append` or 
`overwrite` write modes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to