luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1243092293

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -456,7 +456,8 @@ private DataStreamSink<?> createBatchCompactSink(
                 new PartitionCommitPolicyFactory(
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));

Review Comment:
```suggestion
                        conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));
```

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -610,7 +611,8 @@ private DataStreamSink<Row> createBatchNoCompactSink(
                 new PartitionCommitPolicyFactory(
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)));
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));

Review Comment:
Ditto: read the option from `FileSystemConnectorOptions` here as well.

##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
         <td>String</td>
         <td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
     </tr>
+    <tr>
+      <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',

Review Comment:
```suggestion
      <td> 传入 custom 提交策略类构造器的参数, 多个参数之间用分号分隔, 比如 'param1;param2',
```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public
List<PartitionCommitPolicy> createPolicyChain(
                                         successFileName, fsSupplier.get());
                             case PartitionCommitPolicy.CUSTOM:
                                 try {
-                                    return (PartitionCommitPolicy)
-                                            cl.loadClass(customClass).newInstance();
-                                } catch (ClassNotFoundException
-                                        | IllegalAccessException
-                                        | InstantiationException e) {
+                                    if (parameters != null && !parameters.isEmpty()) {
+                                        String[] paramStrings = parameters.toArray(new String[parameters.size()]);

Review Comment:
```suggestion
                                        String[] paramStrings = parameters.toArray(new String[0]);
```

##########
docs/content/docs/connectors/table/filesystem.md:
##########
@@ -470,6 +470,14 @@ The partition commit policy defines what action is taken when partitions are com
         <td>String</td>
         <td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>The custom commit policy class can accept a string argument, which can include multiple arguments separated by semicolons. For example, 'param1;param2'. The string argument will be split into a list (['param1', 'param2']) and passed as constructor parameters to the custom commit policy class.</td>

Review Comment:
```suggestion
        <td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. For example, 'param1;param2'.
        The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class.</td>
```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
                                         successFileName, fsSupplier.get());
                             case PartitionCommitPolicy.CUSTOM:
                                 try {
-                                    return (PartitionCommitPolicy)
-                                            cl.loadClass(customClass).newInstance();
-                                } catch (ClassNotFoundException
-                                        | IllegalAccessException
-                                        | InstantiationException e) {
+                                    if (parameters != null && !parameters.isEmpty()) {

Review Comment:
```suggestion
                                    if (!CollectionUtil.isNullOrEmpty(parameters)) {
```

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
                                         successFileName, fsSupplier.get());
                             case PartitionCommitPolicy.CUSTOM:
                                 try {
-                                    return (PartitionCommitPolicy)
-                                            cl.loadClass(customClass).newInstance();
-                                } catch (ClassNotFoundException
-                                        | IllegalAccessException
-                                        | InstantiationException e) {
+                                    if (parameters != null && !parameters.isEmpty()) {
+                                        String[] paramStrings = parameters.toArray(new String[parameters.size()]);
+                                        Class<?>[] classes = new Class<?>[parameters.size()];
+                                        for (int i = 0; i < parameters.size(); i++) {
+                                            classes[i] = String.class;
+                                        }
+                                        return (PartitionCommitPolicy)
+                                                cl.loadClass(customClass)
+                                                        .getConstructor(classes)
+                                                        .newInstance(paramStrings);

Review Comment:
```suggestion
                                                        .newInstance((Object[]) paramStrings);
```

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;

+    public
static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =

Review Comment:
I don't think we need to introduce an option in `HiveOptions`. We can reuse `FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS`.

##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
         <td>String</td>
         <td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
     </tr>
+    <tr>
+      <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',
+      该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项。</td>

Review Comment:
```suggestion
      该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项,不配置的话将使用类的默认构造方法。</td>
```

##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
         assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
     }

+    @Test
+    void testCustomCommitPolicyWithParameters() {

Review Comment:
Sorry, I meant an IT, not a UT. You can refer to this PR: https://github.com/apache/flink/pull/17245. I think we can add the test in `FileSystemTableSinkTest`.

##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -220,6 +221,15 @@ public class FileSystemConnectorOptions {
                             "The partition commit policy class for implement"
                                     + " PartitionCommitPolicy interface.
Only work in custom commit policy");

+    public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
+            key("sink.partition-commit.policy.class.parameters")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A semicolon-separated string of parameters for the custom commit policy class"

Review Comment:
We can reuse the doc text from above:
```
The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. For example, 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class.
```

##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;

+    public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =

Review Comment:
Btw, these two declarations in `HiveOptions` can also be delegated to the filesystem options:
```
public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
        FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
```
Could you please also help remove them in this PR?

##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
         assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
     }

+    @Test
+    void testCustomCommitPolicyWithParameters() {

Review Comment:
Btw, I don't think we need this test here if we have an IT.

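For reference, a minimal standalone sketch of the reflective instantiation discussed in the `PartitionCommitPolicyFactory` comments. It is not the Flink code: `CustomPolicyInstantiationSketch`, `DemoPolicy`, and `instantiate` are hypothetical names used only for illustration, and the null/empty check is written out by hand instead of using `CollectionUtil`. It shows the `toArray(new String[0])` idiom and the `(Object[])` cast, which makes explicit that the array supplies the individual constructor arguments.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch class; none of these names exist in Flink.
final class CustomPolicyInstantiationSketch {

    /** Hypothetical stand-in for a user-defined commit policy class. */
    public static class DemoPolicy {
        public final String first;
        public final String second;

        // Used when no parameters are configured.
        public DemoPolicy() {
            this("default", "default");
        }

        // Used when exactly two string parameters are configured.
        public DemoPolicy(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    /**
     * Creates an instance of clazz. With no parameters the no-arg constructor is used;
     * otherwise a public constructor taking one String per parameter is looked up.
     */
    public static Object instantiate(Class<?> clazz, List<String> parameters)
            throws ReflectiveOperationException {
        if (parameters == null || parameters.isEmpty()) {
            return clazz.getDeclaredConstructor().newInstance();
        }
        // A zero-length array lets toArray allocate an array of the right size.
        String[] paramStrings = parameters.toArray(new String[0]);
        Class<?>[] paramTypes = new Class<?>[paramStrings.length];
        Arrays.fill(paramTypes, String.class);
        // The cast passes the elements as separate varargs arguments
        // and avoids the compiler warning about an inexact argument type.
        return clazz.getConstructor(paramTypes).newInstance((Object[]) paramStrings);
    }

    public static void main(String[] args) throws Exception {
        DemoPolicy policy =
                (DemoPolicy) instantiate(DemoPolicy.class, Arrays.asList("param1", "param2"));
        System.out.println(policy.first + "," + policy.second);
    }
}
```

With `'param1;param2'` configured, the two-argument constructor would be chosen in this sketch; with the option unset, the no-arg constructor is used, matching the behaviour described in the doc suggestion above.
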