luoyuxia commented on code in PR #22831:
URL: https://github.com/apache/flink/pull/22831#discussion_r1243092293


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -456,7 +456,8 @@ private DataStreamSink<?> createBatchCompactSink(
                 new PartitionCommitPolicyFactory(
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME));
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));

Review Comment:
   ```suggestion
                           conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS));
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java:
##########
@@ -610,7 +611,8 @@ private DataStreamSink<Row> createBatchNoCompactSink(
                 new PartitionCommitPolicyFactory(
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
                         conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS),
-                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME)));
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME),
+                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS)));

Review Comment:
   Ditto: please use the `FileSystemConnectorOptions` constant here as well.



##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
         <td>String</td>
         <td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+        <td style="word-wrap: break-word;">(无)</td>
+        <td>String</td>
+        <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',

Review Comment:
   ```suggestion
           <td> 传入 custom 提交策略类构造器的参数, 多个参数之间用分号分隔, 比如 'param1;param2',
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
                                             successFileName, fsSupplier.get());
                                 case PartitionCommitPolicy.CUSTOM:
                                     try {
-                                        return (PartitionCommitPolicy)
-                                                cl.loadClass(customClass).newInstance();
-                                    } catch (ClassNotFoundException
-                                            | IllegalAccessException
-                                            | InstantiationException e) {
+                                        if (parameters != null && !parameters.isEmpty()) {
+                                            String[] paramStrings = parameters.toArray(new String[parameters.size()]);

Review Comment:
   ```suggestion
                                               String[] paramStrings = parameters.toArray(new String[0]);
   ```
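
A minimal sketch of the `toArray(new String[0])` idiom suggested above. The class name `ToArrayDemo` and the method `toParamArray` are hypothetical and only serve the illustration; the zero-length array tells `toArray` the element type, and the method allocates a correctly sized result itself.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the PR.
class ToArrayDemo {

    /** Copies the list into a String[]; the empty array only conveys the element type. */
    static String[] toParamArray(List<String> parameters) {
        return parameters.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] paramStrings = toParamArray(Arrays.asList("param1", "param2"));
        System.out.println(Arrays.toString(paramStrings));
    }
}
```

Passing a pre-sized array also works, so the suggestion is mainly about brevity and the common idiom rather than correctness.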



##########
docs/content/docs/connectors/table/filesystem.md:
##########
@@ -470,6 +470,14 @@ The partition commit policy defines what action is taken when partitions are com
         <td>String</td>
         <td>The partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+        <td>optional</td>
+        <td>yes</td>
+        <td style="word-wrap: break-word;">(none)</td>
+        <td>String</td>
+        <td>The custom commit policy class can accept a string argument, which can include multiple arguments separated by semicolons. For example, 'param1;param2'. The string argument will be split into a list (['param1', 'param2']) and passed as constructor parameters to the custom commit policy class.</td>

Review Comment:
   ```suggestion
           <td>The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class.</td>
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
                                             successFileName, fsSupplier.get());
                                 case PartitionCommitPolicy.CUSTOM:
                                     try {
-                                        return (PartitionCommitPolicy)
-                                                cl.loadClass(customClass).newInstance();
-                                    } catch (ClassNotFoundException
-                                            | IllegalAccessException
-                                            | InstantiationException e) {
+                                        if (parameters != null && !parameters.isEmpty()) {

Review Comment:
   ```suggestion
                                          if (!CollectionUtil.isNullOrEmpty(parameters)) {
   ```
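
As a rough illustration of what the combined check expresses, the sketch below uses a local `isNullOrEmpty` helper as a stand-in for the utility named in the suggestion, so that it compiles without Flink on the classpath. `NullOrEmptyDemo` and `chooseConstructor` are hypothetical names.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical demo class; isNullOrEmpty is a local stand-in, not Flink's utility.
class NullOrEmptyDemo {

    /** True when the collection is null or has no elements. */
    static boolean isNullOrEmpty(Collection<?> collection) {
        return collection == null || collection.isEmpty();
    }

    /** Describes which constructor path the factory would take for the given parameters. */
    static String chooseConstructor(List<String> parameters) {
        if (!isNullOrEmpty(parameters)) {
            return "constructor with " + parameters.size() + " String argument(s)";
        }
        return "default constructor";
    }
}
```

A single helper call keeps the null check and the emptiness check together, so neither can be dropped by accident.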



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionCommitPolicyFactory.java:
##########
@@ -63,11 +67,23 @@ public List<PartitionCommitPolicy> createPolicyChain(
                                             successFileName, fsSupplier.get());
                                 case PartitionCommitPolicy.CUSTOM:
                                     try {
-                                        return (PartitionCommitPolicy)
-                                                cl.loadClass(customClass).newInstance();
-                                    } catch (ClassNotFoundException
-                                            | IllegalAccessException
-                                            | InstantiationException e) {
+                                        if (parameters != null && !parameters.isEmpty()) {
+                                            String[] paramStrings = parameters.toArray(new String[parameters.size()]);
+                                            Class<?>[] classes = new Class<?>[parameters.size()];
+                                            for (int i = 0; i < parameters.size(); i++) {
+                                                classes[i] = String.class;
+                                            }
+                                            return (PartitionCommitPolicy)
+                                                    cl.loadClass(customClass)
+                                                            .getConstructor(classes)
+                                                            .newInstance(paramStrings);

Review Comment:
   ```suggestion
                                                               .newInstance((Object[]) paramStrings);
   ```
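
The cast matters because `Constructor.newInstance` is a varargs method taking `Object...`. Passing a `String[]` directly compiles, but javac warns about a non-varargs call with an inexact argument type; the explicit `(Object[])` cast states that each array element is a separate constructor argument. A small sketch, where `VarargsCastDemo` and `TwoArgPolicy` are hypothetical stand-ins for the factory and a custom policy class:

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;

// Hypothetical demo class, not part of the PR.
class VarargsCastDemo {

    /** Hypothetical stand-in for a custom commit policy with a (String, String) constructor. */
    public static class TwoArgPolicy {
        final String first;
        final String second;

        public TwoArgPolicy(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Looks up the all-String constructor and spreads the array over its parameters. */
    static TwoArgPolicy create(String[] paramStrings) {
        try {
            Class<?>[] types = new Class<?>[paramStrings.length];
            Arrays.fill(types, String.class);
            Constructor<TwoArgPolicy> ctor = TwoArgPolicy.class.getConstructor(types);
            // The cast makes the varargs intent explicit and avoids the javac warning.
            return ctor.newInstance((Object[]) paramStrings);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate policy", e);
        }
    }
}
```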



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
 
+    public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =

Review Comment:
   I don't think we need to introduce a new option in `HiveOptions`. We can reuse `FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS` directly.



##########
docs/content.zh/docs/connectors/table/filesystem.md:
##########
@@ -451,6 +451,13 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor {
         <td>String</td>
         <td> 实现 PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.policy.class.parameters</h5></td>
+        <td style="word-wrap: break-word;">(无)</td>
+        <td>String</td>
+        <td> 传入 custom 提交策略类的字符串参数, 要求多个参数之间用分号分隔, 比如 'param1;param2',
+        该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项。</td>

Review Comment:
   ```suggestion
           该字符串将被切分为列表(['param1','param2'])并传给 custom 提交策略类的构造器。该项为可选项,不配置的话将使用类的默认构造方法。</td>
   ```
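
The suggestion above adds that the option is optional and that the class's default constructor is used when it is not configured. A minimal sketch of that fallback, assuming all constructor parameters are Strings as in the PR's factory code; `PolicyInstantiationDemo` and `instantiate` are hypothetical names, and `java.lang.StringBuilder` stands in for a custom policy class so the example runs without Flink:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the PR.
class PolicyInstantiationDemo {

    /** Instantiates className with String arguments, or with the no-arg constructor if none are given. */
    static Object instantiate(ClassLoader cl, String className, List<String> parameters) {
        try {
            Class<?> clazz = cl.loadClass(className);
            if (parameters == null || parameters.isEmpty()) {
                // Option not configured: fall back to the default constructor.
                return clazz.getConstructor().newInstance();
            }
            Class<?>[] types = new Class<?>[parameters.size()];
            Arrays.fill(types, String.class);
            return clazz.getConstructor(types)
                    .newInstance((Object[]) parameters.toArray(new String[0]));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = ClassLoader.getSystemClassLoader();
        // StringBuilder is only a convenient JDK class with both constructor shapes.
        System.out.println(instantiate(cl, "java.lang.StringBuilder", Arrays.asList("abc")));
    }
}
```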



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
         assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
     }
 
+    @Test
+    void testCustomCommitPolicyWithParameters() {

Review Comment:
   Sorry, I meant an integration test (IT), not a unit test (UT). You can refer to https://github.com/apache/flink/pull/17245 for an example.
   I think we can add the test in `FileSystemTableSinkTest`.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -220,6 +221,15 @@ public class FileSystemConnectorOptions {
                             "The partition commit policy class for implement"
                                     + " PartitionCommitPolicy interface. Only work in custom commit policy");
 
+    public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =
+            key("sink.partition-commit.policy.class.parameters")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A semicolon-separated string of parameters for the custom commit policy class"

Review Comment:
   We can reuse the description suggested for the docs above:
   ```
   The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as 'param1;param2'. The configuration value will be split into a list (['param1', 'param2']) and passed to the constructor of the custom commit policy class.
   ```
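
To make the described splitting concrete, here is a simplified sketch of how a value such as 'param1;param2' becomes a list. It is an assumption-laden illustration only: Flink's own list-option parsing is not reproduced here, and `SemicolonListDemo` and `splitParameters` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; a plain split, not Flink's configuration parser.
class SemicolonListDemo {

    /** Splits a semicolon-separated value into trimmed, non-empty entries. */
    static List<String> splitParameters(String raw) {
        List<String> result = new ArrayList<>();
        if (raw == null) {
            return result;
        }
        for (String part : raw.split(";")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(splitParameters("param1;param2"));
    }
}
```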



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java:
##########
@@ -137,6 +138,9 @@ public class HiveOptions {
     public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
             FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;
 
+    public static final ConfigOption<List<String>> SINK_PARTITION_COMMIT_POLICY_CLASS_PARAMETERS =

Review Comment:
   Btw, these two options in `HiveOptions` only alias the `FileSystemConnectorOptions` constants, so callers can use the filesystem options directly:
   ```
       public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS =
               FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_CLASS;

       public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME =
               FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
   ```
   Could you please also remove them in this PR?



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java:
##########
@@ -258,6 +263,38 @@ void testPartitionPathNotExist() throws Exception {
         assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
     }
 
+    @Test
+    void testCustomCommitPolicyWithParameters() {

Review Comment:
   Btw, I don't think we need this test here if we have an IT.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
