luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1246196096


##########
docs/content/docs/connectors/table/filesystem.md:
##########
@@ -470,6 +470,14 @@ The partition commit policy defines what action is taken 
when partitions are com
         <td>String</td>
         <td>The partition commit policy class for implement 
PartitionCommitPolicy interface. Only work in custom commit policy.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>The parameters passed to the constructor of the custom commit 
policy, with multiple parameters separated by semicolons, such as 
'param1;param2'. For example, 'param1;param2'. The configuration value will be 
split into a list (['param1', 'param2']) and passed to the constructor of the 
custom commit policy class. This option is optional, if not configured, the 
default constructor will be used.</td>

Review Comment:
   ```suggestion
           <td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class. This option is optional; if it is not configured, the default constructor will be used.</td>
   ```
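
To make the described behavior concrete, here is a minimal, self-contained sketch of how a semicolon-separated value could be split and handed to a constructor, with a fallback to the default constructor when nothing is configured. It is not the code in this PR: `CommitPolicyParamsSketch`, `DemoPolicy`, `splitParameters`, and `instantiate` are hypothetical names, and the sketch assumes each list element maps to one `String` constructor argument.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CommitPolicyParamsSketch {

    /** Hypothetical stand-in for a user-defined commit policy; not a Flink class. */
    public static class DemoPolicy {
        public final List<String> args;

        // Used when the option is not configured.
        public DemoPolicy() {
            this.args = Collections.emptyList();
        }

        // Used when two parameters are configured, e.g. 'param1;param2'.
        public DemoPolicy(String first, String second) {
            this.args = Arrays.asList(first, second);
        }
    }

    /** Splits 'param1;param2' into [param1, param2]; null or empty yields an empty list. */
    public static List<String> splitParameters(String raw) {
        List<String> result = new ArrayList<>();
        if (raw == null || raw.isEmpty()) {
            return result;
        }
        result.addAll(Arrays.asList(raw.split(";")));
        return result;
    }

    /** Picks the all-String constructor matching the parameter count, or the default one. */
    public static <T> T instantiate(Class<T> clazz, List<String> params) {
        try {
            if (params.isEmpty()) {
                return clazz.getConstructor().newInstance();
            }
            // Assumption: every configured parameter is passed as a String argument.
            Class<?>[] types = new Class<?>[params.size()];
            Arrays.fill(types, String.class);
            Constructor<T> ctor = clazz.getConstructor(types);
            return ctor.newInstance(params.toArray());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        DemoPolicy withParams = instantiate(DemoPolicy.class, splitParameters("param1;param2"));
        System.out.println(withParams.args); // prints [param1, param2]

        DemoPolicy withoutParams = instantiate(DemoPolicy.class, splitParameters(null));
        System.out.println(withoutParams.args); // prints []
    }
}
```

If the docs state how a parameter count that matches no constructor is handled, users will know what error to expect; in this sketch it surfaces as an `IllegalStateException`.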



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java:
##########
@@ -149,4 +163,54 @@ private static String buildSinkTableSql(
     private static String buildInsertIntoSql(String sinkTable, String 
sourceTable) {
         return String.format("INSERT INTO %s SELECT * FROM %s", sinkTable, 
sourceTable);
     }
+
+    @Test
+    public void testFileSystemTableSinkWithCustomCommitPolicy() throws 
Exception {

Review Comment:
   Suggestion: I think the test can be simplified as follows:
   ```
   final String outputTable = "outputTable";
   final String customPartitionCommitPolicyClassName =
           TestCustomCommitPolicy.class.getName();
   final TableEnvironment tEnv =
           TableEnvironment.create(EnvironmentSettings.inStreamingMode());
   String ddl =
           "CREATE TABLE %s ("
                   + "  a INT,"
                   + "  b STRING,"
                   + "  d STRING,"
                   + "  e STRING"
                   + ") PARTITIONED BY (d, e) WITH ("
                   + "'connector'='filesystem',"
                   + "'path'='/tmp',"
                   + "'format'='testcsv',"
                   + "'sink.partition-commit.delay'='0s',"
                   + "'sink.partition-commit.policy.kind'='custom',"
                   + "'sink.partition-commit.policy.class'='%s',"
                   + "'sink.partition-commit.policy.class.parameters'='test1;test2'"
                   + ")";
   ddl = String.format(ddl, outputTable, customPartitionCommitPolicyClassName);
   tEnv.executeSql(ddl);
   tEnv.executeSql(
                   "insert into outputTable select *"
                           + " from (values (1, 'a', '2020-05-03', '3'), "
                           + "(2, 'x', '2020-05-03', '4'))")
           .await();
   Set<String> actualCommittedPaths =
           TestCustomCommitPolicy.getCommittedPartitionPathsAndReset();
   Set<String> expectedCommittedPaths =
           new HashSet<>(
                   Arrays.asList(
                           "test1test2", "/tmp/d=2020-05-03/e=3", "/tmp/d=2020-05-03/e=4"));
   assertEquals(expectedCommittedPaths, actualCommittedPaths);
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -220,6 +221,19 @@ public class FileSystemConnectorOptions {
                             "The partition commit policy class for implement"
                                     + " PartitionCommitPolicy interface. Only 
work in custom commit policy");
 
+    public static final ConfigOption<List<String>> 
SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
+            key("sink.partition-commit.policy.class.parameters")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The parameters passed to the constructor of the 
custom commit policy, "
+                                    + " with multiple parameters separated by 
semicolons, such as 'param1;param2'."
+                                    + " For example, 'param1;param2'. The 
configuration value will be split"

Review Comment:
   ```suggestion
                                       + " The configuration value will be split"
   ```


