This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5169a5c79d [core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. (#4995)
5169a5c79d is described below

commit 5169a5c79d3679fe87502c686731f013b50e9951
Author: HunterXHunter <[email protected]>
AuthorDate: Mon Feb 10 16:34:53 2025 +0800

    [core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. (#4995)
---
 .../actions/HttpReportMarkDoneAction.java          | 23 ++++++++--------
 .../partition/actions/PartitionMarkDoneAction.java | 32 ++++++++++++++++------
 .../action/MarkPartitionDoneActionITCase.java      |  6 +++-
 .../CustomPartitionMarkDoneActionTest.java         |  2 +-
 .../partition/HttpReportMarkDoneActionTest.java    | 16 +++++------
 .../MockCustomPartitionMarkDoneAction.java         | 12 +++++++-
 6 files changed, 61 insertions(+), 30 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
index 39c17406b3..4b4f7b6895 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
@@ -55,20 +55,22 @@ import static 
org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 /** Report partition submission information to remote http server. */
 public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {
 
-    private final OkHttpClient client;
-    private final String url;
-    private final ObjectMapper mapper;
+    private OkHttpClient client;
+    private String url;
+    private ObjectMapper mapper;
     private static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
 
-    private final FileStoreTable fileStoreTable;
+    private String tableName;
+    private String location;
 
-    private final String params;
+    private String params;
 
     private static final String RESPONSE_SUCCESS = "SUCCESS";
 
     private static final String THREAD_NAME = 
"PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD";
 
-    public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions 
options) {
+    @Override
+    public void open(FileStoreTable fileStoreTable, CoreOptions options) {
 
         Preconditions.checkArgument(
                 
!StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()),
@@ -76,9 +78,11 @@ public class HttpReportMarkDoneAction implements 
PartitionMarkDoneAction {
                         "Parameter %s must be non-empty when you use 
`http-report` partition mark done action.",
                         PARTITION_MARK_DONE_ACTION_URL.key()));
 
-        this.fileStoreTable = fileStoreTable;
         this.params = options.httpReportMarkDoneActionParams();
         this.url = options.httpReportMarkDoneActionUrl();
+        this.tableName = fileStoreTable.fullName();
+        this.location = fileStoreTable.location().toString();
+
         this.mapper = new ObjectMapper();
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
         mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
@@ -102,10 +106,7 @@ public class HttpReportMarkDoneAction implements 
PartitionMarkDoneAction {
         HttpReportMarkDoneResponse response =
                 post(
                         new HttpReportMarkDoneRequest(
-                                params,
-                                fileStoreTable.fullName(),
-                                fileStoreTable.location().toString(),
-                                partition),
+                                params, this.tableName, this.location, 
partition),
                         Collections.emptyMap());
         Preconditions.checkState(
                 reportIsSuccess(response),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index ee12fce528..2c48ce8d9b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -37,6 +37,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 /** Action to mark partitions done. */
 public interface PartitionMarkDoneAction extends Closeable {
 
+    default void open(FileStoreTable fileStoreTable, CoreOptions options) {}
+
     void markDone(String partition) throws Exception;
 
     static List<PartitionMarkDoneAction> createActions(
@@ -44,23 +46,37 @@ public interface PartitionMarkDoneAction extends Closeable {
         return options.partitionMarkDoneActions().stream()
                 .map(
                         action -> {
+                            PartitionMarkDoneAction instance;
                             switch (action) {
                                 case SUCCESS_FILE:
-                                    return new SuccessFileMarkDoneAction(
-                                            fileStoreTable.fileIO(), 
fileStoreTable.location());
+                                    instance =
+                                            new SuccessFileMarkDoneAction(
+                                                    fileStoreTable.fileIO(),
+                                                    fileStoreTable.location());
+                                    break;
                                 case DONE_PARTITION:
-                                    return new AddDonePartitionAction(
-                                            
createPartitionHandler(fileStoreTable, options));
+                                    instance =
+                                            new AddDonePartitionAction(
+                                                    createPartitionHandler(
+                                                            fileStoreTable, 
options));
+                                    break;
                                 case MARK_EVENT:
-                                    return new MarkPartitionDoneEventAction(
-                                            
createPartitionHandler(fileStoreTable, options));
+                                    instance =
+                                            new MarkPartitionDoneEventAction(
+                                                    createPartitionHandler(
+                                                            fileStoreTable, 
options));
+                                    break;
                                 case HTTP_REPORT:
-                                    return new 
HttpReportMarkDoneAction(fileStoreTable, options);
+                                    instance = new HttpReportMarkDoneAction();
+                                    break;
                                 case CUSTOM:
-                                    return generateCustomMarkDoneAction(cl, 
options);
+                                    instance = 
generateCustomMarkDoneAction(cl, options);
+                                    break;
                                 default:
                                     throw new 
UnsupportedOperationException(action.toString());
                             }
+                            instance.open(fileStoreTable, options);
+                            return instance;
                         })
                 .collect(Collectors.toList());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index 0a29aebf22..e2189f4738 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -167,6 +167,7 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
     @MethodSource("testArguments")
     public void testCustomPartitionMarkDoneAction(boolean hasPk, String 
invoker) throws Exception {
 
+        MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().clear();
         Map<String, String> options = new HashMap<>(2);
         options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," + 
CUSTOM);
         options.put(
@@ -174,6 +175,7 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
                 MockCustomPartitionMarkDoneAction.class.getName());
 
         FileStoreTable table = prepareTable(hasPk, options);
+        String fullTableName = table.fullName();
 
         switch (invoker) {
             case "action":
@@ -217,7 +219,9 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
         assertThat(successFile2).isNotNull();
 
         assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
-                .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", 
"partKey0=1/partKey1=0/");
+                .containsExactlyInAnyOrder(
+                        "table=" + fullTableName + 
",partition=partKey0=0/partKey1=1/",
+                        "table=" + fullTableName + 
",partition=partKey0=1/partKey1=0/");
     }
 
     @ParameterizedTest
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
index 73ba630b63..55d1cabdcd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -99,6 +99,6 @@ public class CustomPartitionMarkDoneActionTest extends 
TableTestBase {
         assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
 
         
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
-                .isEqualTo("a=0/");
+                .isEqualTo("table=default.T,partition=a=0/");
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
index b79597cb81..dea8b1d47a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
@@ -80,7 +80,8 @@ public class HttpReportMarkDoneActionTest {
 
     @Test
     public void testHttpReportMarkDoneActionSuccessResponse() throws Exception 
{
-        HttpReportMarkDoneAction httpReportMarkDoneAction = 
createHttpReportMarkDoneAction();
+        HttpReportMarkDoneAction httpReportMarkDoneAction = new 
HttpReportMarkDoneAction();
+        httpReportMarkDoneAction.open(fileStoreTable, createCoreOptions());
 
         server.enqueueResponse(successResponse, 200);
 
@@ -97,7 +98,9 @@ public class HttpReportMarkDoneActionTest {
 
         // test params is null.
         params = null;
-        HttpReportMarkDoneAction httpReportMarkDoneAction3 = 
createHttpReportMarkDoneAction();
+        HttpReportMarkDoneAction httpReportMarkDoneAction3 = new 
HttpReportMarkDoneAction();
+        httpReportMarkDoneAction3.open(fileStoreTable, createCoreOptions());
+
         server.enqueueResponse(successResponse, 200);
         httpReportMarkDoneAction3.markDone(partition);
         RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS);
@@ -105,8 +108,9 @@ public class HttpReportMarkDoneActionTest {
     }
 
     @Test
-    public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
-        HttpReportMarkDoneAction markDoneAction = 
createHttpReportMarkDoneAction();
+    public void testHttpReportMarkDoneActionFailedResponse() {
+        HttpReportMarkDoneAction markDoneAction = new 
HttpReportMarkDoneAction();
+        markDoneAction.open(fileStoreTable, createCoreOptions());
 
         // status failed.
         server.enqueueResponse(failedResponse, 200);
@@ -160,10 +164,6 @@ public class HttpReportMarkDoneActionTest {
         return new CoreOptions(httpOptions);
     }
 
-    public HttpReportMarkDoneAction createHttpReportMarkDoneAction() {
-        return new HttpReportMarkDoneAction(fileStoreTable, 
createCoreOptions());
-    }
-
     public FileStoreTable createFileStoreTable() throws Exception {
         org.apache.paimon.fs.Path tablePath =
                 new 
org.apache.paimon.fs.Path(folder.newFolder().toURI().toString());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
index f8d9b40346..cf86b29cb6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.flink.sink.partition;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.FileStoreTable;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -29,9 +31,17 @@ public class MockCustomPartitionMarkDoneAction implements 
PartitionMarkDoneActio
 
     private static final Set<String> markedDonePartitions = new HashSet<>();
 
+    private String tableName;
+
+    @Override
+    public void open(FileStoreTable fileStoreTable, CoreOptions options) {
+        this.tableName = fileStoreTable.fullName();
+    }
+
     @Override
     public void markDone(String partition) {
-        MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
+        MockCustomPartitionMarkDoneAction.markedDonePartitions.add(
+                String.format("table=%s,partition=%s", tableName, partition));
     }
 
     public static Set<String> getMarkedDonePartitions() {

Reply via email to