This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5169a5c79d [core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. (#4995)
5169a5c79d is described below
commit 5169a5c79d3679fe87502c686731f013b50e9951
Author: HunterXHunter <[email protected]>
AuthorDate: Mon Feb 10 16:34:53 2025 +0800
[core] Add open method to PartitionMarkDone for CustomPartitionMarkDone. (#4995)
---
.../actions/HttpReportMarkDoneAction.java | 23 ++++++++--------
.../partition/actions/PartitionMarkDoneAction.java | 32 ++++++++++++++++------
.../action/MarkPartitionDoneActionITCase.java | 6 +++-
.../CustomPartitionMarkDoneActionTest.java | 2 +-
.../partition/HttpReportMarkDoneActionTest.java | 16 +++++------
.../MockCustomPartitionMarkDoneAction.java | 12 +++++++-
6 files changed, 61 insertions(+), 30 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
index 39c17406b3..4b4f7b6895 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/HttpReportMarkDoneAction.java
@@ -55,20 +55,22 @@ import static
org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** Report partition submission information to remote http server. */
public class HttpReportMarkDoneAction implements PartitionMarkDoneAction {
- private final OkHttpClient client;
- private final String url;
- private final ObjectMapper mapper;
+ private OkHttpClient client;
+ private String url;
+ private ObjectMapper mapper;
private static final MediaType MEDIA_TYPE =
MediaType.parse("application/json");
- private final FileStoreTable fileStoreTable;
+ private String tableName;
+ private String location;
- private final String params;
+ private String params;
private static final String RESPONSE_SUCCESS = "SUCCESS";
private static final String THREAD_NAME =
"PAIMON-HTTP-REPORT-MARK-DONE-ACTION-THREAD";
- public HttpReportMarkDoneAction(FileStoreTable fileStoreTable, CoreOptions
options) {
+ @Override
+ public void open(FileStoreTable fileStoreTable, CoreOptions options) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(options.httpReportMarkDoneActionUrl()),
@@ -76,9 +78,11 @@ public class HttpReportMarkDoneAction implements
PartitionMarkDoneAction {
"Parameter %s must be non-empty when you use
`http-report` partition mark done action.",
PARTITION_MARK_DONE_ACTION_URL.key()));
- this.fileStoreTable = fileStoreTable;
this.params = options.httpReportMarkDoneActionParams();
this.url = options.httpReportMarkDoneActionUrl();
+ this.tableName = fileStoreTable.fullName();
+ this.location = fileStoreTable.location().toString();
+
this.mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
@@ -102,10 +106,7 @@ public class HttpReportMarkDoneAction implements
PartitionMarkDoneAction {
HttpReportMarkDoneResponse response =
post(
new HttpReportMarkDoneRequest(
- params,
- fileStoreTable.fullName(),
- fileStoreTable.location().toString(),
- partition),
+ params, this.tableName, this.location,
partition),
Collections.emptyMap());
Preconditions.checkState(
reportIsSuccess(response),
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index ee12fce528..2c48ce8d9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -37,6 +37,8 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {
+ default void open(FileStoreTable fileStoreTable, CoreOptions options) {}
+
void markDone(String partition) throws Exception;
static List<PartitionMarkDoneAction> createActions(
@@ -44,23 +46,37 @@ public interface PartitionMarkDoneAction extends Closeable {
return options.partitionMarkDoneActions().stream()
.map(
action -> {
+ PartitionMarkDoneAction instance;
switch (action) {
case SUCCESS_FILE:
- return new SuccessFileMarkDoneAction(
- fileStoreTable.fileIO(),
fileStoreTable.location());
+ instance =
+ new SuccessFileMarkDoneAction(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location());
+ break;
case DONE_PARTITION:
- return new AddDonePartitionAction(
-
createPartitionHandler(fileStoreTable, options));
+ instance =
+ new AddDonePartitionAction(
+ createPartitionHandler(
+ fileStoreTable,
options));
+ break;
case MARK_EVENT:
- return new MarkPartitionDoneEventAction(
-
createPartitionHandler(fileStoreTable, options));
+ instance =
+ new MarkPartitionDoneEventAction(
+ createPartitionHandler(
+ fileStoreTable,
options));
+ break;
case HTTP_REPORT:
- return new
HttpReportMarkDoneAction(fileStoreTable, options);
+ instance = new HttpReportMarkDoneAction();
+ break;
case CUSTOM:
- return generateCustomMarkDoneAction(cl,
options);
+ instance =
generateCustomMarkDoneAction(cl, options);
+ break;
default:
throw new
UnsupportedOperationException(action.toString());
}
+ instance.open(fileStoreTable, options);
+ return instance;
})
.collect(Collectors.toList());
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index 0a29aebf22..e2189f4738 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -167,6 +167,7 @@ public class MarkPartitionDoneActionITCase extends
ActionITCaseBase {
@MethodSource("testArguments")
public void testCustomPartitionMarkDoneAction(boolean hasPk, String
invoker) throws Exception {
+ MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().clear();
Map<String, String> options = new HashMap<>(2);
options.put(PARTITION_MARK_DONE_ACTION.key(), SUCCESS_FILE + "," +
CUSTOM);
options.put(
@@ -174,6 +175,7 @@ public class MarkPartitionDoneActionITCase extends
ActionITCaseBase {
MockCustomPartitionMarkDoneAction.class.getName());
FileStoreTable table = prepareTable(hasPk, options);
+ String fullTableName = table.fullName();
switch (invoker) {
case "action":
@@ -217,7 +219,9 @@ public class MarkPartitionDoneActionITCase extends
ActionITCaseBase {
assertThat(successFile2).isNotNull();
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
- .containsExactlyInAnyOrder("partKey0=0/partKey1=1/",
"partKey0=1/partKey1=0/");
+ .containsExactlyInAnyOrder(
+ "table=" + fullTableName +
",partition=partKey0=0/partKey1=1/",
+ "table=" + fullTableName +
",partition=partKey0=1/partKey1=0/");
}
@ParameterizedTest
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
index 73ba630b63..55d1cabdcd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -99,6 +99,6 @@ public class CustomPartitionMarkDoneActionTest extends
TableTestBase {
assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
- .isEqualTo("a=0/");
+ .isEqualTo("table=default.T,partition=a=0/");
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
index b79597cb81..dea8b1d47a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
@@ -80,7 +80,8 @@ public class HttpReportMarkDoneActionTest {
@Test
public void testHttpReportMarkDoneActionSuccessResponse() throws Exception
{
- HttpReportMarkDoneAction httpReportMarkDoneAction =
createHttpReportMarkDoneAction();
+ HttpReportMarkDoneAction httpReportMarkDoneAction = new
HttpReportMarkDoneAction();
+ httpReportMarkDoneAction.open(fileStoreTable, createCoreOptions());
server.enqueueResponse(successResponse, 200);
@@ -97,7 +98,9 @@ public class HttpReportMarkDoneActionTest {
// test params is null.
params = null;
- HttpReportMarkDoneAction httpReportMarkDoneAction3 =
createHttpReportMarkDoneAction();
+ HttpReportMarkDoneAction httpReportMarkDoneAction3 = new
HttpReportMarkDoneAction();
+ httpReportMarkDoneAction3.open(fileStoreTable, createCoreOptions());
+
server.enqueueResponse(successResponse, 200);
httpReportMarkDoneAction3.markDone(partition);
RecordedRequest request3 = server.takeRequest(10, TimeUnit.SECONDS);
@@ -105,8 +108,9 @@ public class HttpReportMarkDoneActionTest {
}
@Test
- public void testHttpReportMarkDoneActionFailedResponse() throws Exception {
- HttpReportMarkDoneAction markDoneAction =
createHttpReportMarkDoneAction();
+ public void testHttpReportMarkDoneActionFailedResponse() {
+ HttpReportMarkDoneAction markDoneAction = new
HttpReportMarkDoneAction();
+ markDoneAction.open(fileStoreTable, createCoreOptions());
// status failed.
server.enqueueResponse(failedResponse, 200);
@@ -160,10 +164,6 @@ public class HttpReportMarkDoneActionTest {
return new CoreOptions(httpOptions);
}
- public HttpReportMarkDoneAction createHttpReportMarkDoneAction() {
- return new HttpReportMarkDoneAction(fileStoreTable,
createCoreOptions());
- }
-
public FileStoreTable createFileStoreTable() throws Exception {
org.apache.paimon.fs.Path tablePath =
new
org.apache.paimon.fs.Path(folder.newFolder().toURI().toString());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
index f8d9b40346..cf86b29cb6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
@@ -18,7 +18,9 @@
package org.apache.paimon.flink.sink.partition;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.FileStoreTable;
import java.io.IOException;
import java.util.HashSet;
@@ -29,9 +31,17 @@ public class MockCustomPartitionMarkDoneAction implements
PartitionMarkDoneActio
private static final Set<String> markedDonePartitions = new HashSet<>();
+ private String tableName;
+
+ @Override
+ public void open(FileStoreTable fileStoreTable, CoreOptions options) {
+ this.tableName = fileStoreTable.fullName();
+ }
+
@Override
public void markDone(String partition) {
- MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
+ MockCustomPartitionMarkDoneAction.markedDonePartitions.add(
+ String.format("table=%s,partition=%s", tableName, partition));
}
public static Set<String> getMarkedDonePartitions() {