luoyuxia commented on code in PR #22831: URL: https://github.com/apache/flink/pull/22831#discussion_r1246196096
########## docs/content/docs/connectors/table/filesystem.md: ########## @@ -470,6 +470,14 @@ The partition commit policy defines what action is taken when partitions are com <td>String</td> <td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td> </tr> + <tr> + <td><h5>sink.partition-commit.policy.class.parameters</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. For example, 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class. This option is optional, if not configured, the default constructor will be used.</td> Review Comment: ```suggestion <td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class. This option is optional, if not configured, the default constructor will be used.</td> ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSinkTest.java: ########## @@ -149,4 +163,54 @@ private static String buildSinkTableSql( private static String buildInsertIntoSql(String sinkTable, String sourceTable) { return String.format("INSERT INTO %s SELECT * FROM %s", sinkTable, sourceTable); } + + @Test + public void testFileSystemTableSinkWithCustomCommitPolicy() throws Exception { Review Comment: Suggestion, I think the test can be simpiled as following: ``` final String outputTable = "outputTable"; final String customPartitionCommitPolicyClassName = TestCustomCommitPolicy.class.getName(); final TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); String ddl = "CREATE TABLE %s (" + " a INT," + " b STRING," + " d STRING," + " e STRING" + ") PARTITIONED BY (d, e) WITH (" + "'connector'='filesystem'," + "'path'='/tmp'," + "'format'='testcsv'," + "'sink.partition-commit.delay'='0s'," + "'sink.partition-commit.policy.kind'='custom'," + "'sink.partition-commit.policy.class'='%s'," + "'sink.partition-commit.policy.class.parameters'='test1;test2'" + ")"; ddl = String.format(ddl, outputTable, customPartitionCommitPolicyClassName); tEnv.executeSql(ddl); tEnv.executeSql( "insert into outputTable select *" + " from (values (1, 'a', '2020-05-03', '3'), " + "(2, 'x', '2020-05-03', '4'))") .await(); Set<String> actualCommittedPaths = TestCustomCommitPolicy.getCommittedPartitionPathsAndReset(); Set<String> expectedCommittedPaths = new HashSet<>( Arrays.asList( "test1test2", "/tmp/d=2020-05-03/e=3", "/tmp/d=2020-05-03/e=4")); assertEquals(expectedCommittedPaths, actualCommittedPaths); ``` ########## flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java: ########## @@ -220,6 +221,19 @@ public class FileSystemConnectorOptions { "The partition commit policy class for implement" + " PartitionCommitPolicy interface. Only work in custom commit policy"); + public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS = + key("sink.partition-commit.policy.class.parameters") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "The parameters passed to the constructor of the custom commit policy, " + + " with multiple parameters separated by semicolons, such as 'param1;param2'." + + " For example, 'param1;param2'. The configuration value will be split" Review Comment: ```suggestion + " The configuration value will be split" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org