This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7d3bf999a8 [flink] Support loading custom CommitListener at runtime
(#5763)
7d3bf999a8 is described below
commit 7d3bf999a83b403f4666757df225527b52daa517
Author: yuzelin <[email protected]>
AuthorDate: Wed Jun 18 21:00:26 2025 +0800
[flink] Support loading custom CommitListener at runtime (#5763)
---
.../generated/flink_connector_configuration.html | 6 +
.../procedure/MarkPartitionDoneProcedure.java | 2 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 +
.../flink/action/MarkPartitionDoneAction.java | 2 +-
.../procedure/MarkPartitionDoneProcedure.java | 2 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 2 +-
.../{partition => listener}/CommitListener.java | 2 +-
.../flink/sink/listener/CommitListenerFactory.java | 45 ++++
.../{partition => listener}/CommitListeners.java | 24 +-
.../PartitionMarkDoneListener.java | 2 +-
.../PartitionMarkDoneTrigger.java | 2 +-
.../ReportPartStatsListener.java | 3 +-
.../action/MarkPartitionDoneActionITCase.java | 2 +-
.../AddDonePartitionActionTest.java | 2 +-
.../sink/listener/CustomCommitListenerTest.java | 140 ++++++++++++
.../CustomPartitionMarkDoneActionTest.java | 9 +-
.../HttpReportMarkDoneActionTest.java | 2 +-
.../flink/sink/listener/ListenerTestUtils.java | 87 ++++++++
.../MockCustomPartitionMarkDoneAction.java | 2 +-
.../PartitionMarkDoneTest.java} | 81 ++++---
.../PartitionMarkDoneTriggerTest.java | 2 +-
.../SuccessFileMarkDoneActionTest.java | 2 +-
.../WatermarkPartitionMarkDoneTest.java | 19 +-
.../sink/partition/PartitionMarkDoneTest.java | 241 ---------------------
.../services/org.apache.paimon.factories.Factory | 4 +-
25 files changed, 371 insertions(+), 322 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index fa9b8e4d9c..10f3338e64 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Integer</td>
<td>Maximum number of threads to copy bytes from small changelog
files. By default is the number of processors available to the Java virtual
machine.</td>
</tr>
+ <tr>
+ <td><h5>commit.custom-listeners</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Commit listener will be called after a successful commit. This
option list custom commit listener identifiers separated by comma.</td>
+ </tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index e67811c4e0..18a90c294c 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.io.IOException;
import java.util.List;
-import static
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index d524920e82..ea468abceb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -515,6 +515,14 @@ public class FlinkConnectorOptions {
.withDescription(
"If true, the split generation process would be
performed during runtime on a Flink task, instead of on the JobManager during
initialization phase.");
+ public static final ConfigOption<String> COMMIT_CUSTOM_LISTENERS =
+ key("commit.custom-listeners")
+ .stringType()
+ .defaultValue("")
+ .withDescription(
+ "Commit listener will be called after a successful
commit. This option list custom commit "
+ + "listener identifiers separated by
comma.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index bfad0bf1b6..b69edffc10 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.PartitionPathUtils;
import java.util.List;
import java.util.Map;
-import static
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
/** Table partition mark done action for Flink. */
public class MarkPartitionDoneAction extends TableActionBase {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index 90ebd48003..5bee1a76e2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.io.IOException;
import java.util.List;
-import static
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 34a087f678..c9451634cd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
-import org.apache.paimon.flink.sink.partition.CommitListeners;
+import org.apache.paimon.flink.sink.listener.CommitListeners;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.BucketMode;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
similarity index 95%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
index b5efc36018..b52d8709fe 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.manifest.ManifestCommittable;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
new file mode 100644
index 0000000000..e3b3205358
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.util.Optional;
+
+/** Factory for {@link CommitListener}. */
+public interface CommitListenerFactory extends Factory {
+
+ String identifier();
+
+ Optional<CommitListener> create(Committer.Context context, FileStoreTable
table)
+ throws Exception;
+
+ static Optional<CommitListener> create(
+ Committer.Context context, FileStoreTable table, String
identifier) throws Exception {
+ CommitListenerFactory factory =
+ FactoryUtil.discoverFactory(
+ CommitListenerFactory.class.getClassLoader(),
+ CommitListenerFactory.class,
+ identifier);
+ return factory.create(context, table);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
similarity index 74%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
index 58ee1f969e..6885b0436e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
@@ -16,17 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.StringUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.paimon.flink.FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS;
/** Partition listeners. */
public class CommitListeners implements Closeable {
@@ -81,6 +87,22 @@ public class CommitListeners implements Closeable {
table)
.ifPresent(listeners::add);
+ // custom listeners
+ String identifiers =
Options.fromMap(table.options()).get(COMMIT_CUSTOM_LISTENERS);
+ Arrays.stream(identifiers.split(","))
+ .filter(identifier ->
!StringUtils.isNullOrWhitespaceOnly(identifier))
+ .map(
+ identifier -> {
+ try {
+ return CommitListenerFactory.create(context,
table, identifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .forEach(listeners::add);
+
return new CommitListeners(listeners);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index b5b82cda1b..5dbd9f3a09 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
index 1a6605cd1c..bd524acb66 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
index 4b815d336f..488b54cbd7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -52,7 +52,6 @@ import java.util.Set;
*/
public class ReportPartStatsListener implements CommitListener {
- @SuppressWarnings("unchecked")
private static final ListStateDescriptor<Map<String, Long>>
PENDING_REPORT_STATE_DESC =
new ListStateDescriptor<>(
"pending-report-hms-partition",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index e2189f4738..c1e58c5ac5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
-import
org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction;
+import org.apache.paimon.flink.sink.listener.MockCustomPartitionMarkDoneAction;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.actions.HttpReportMarkDoneAction;
import org.apache.paimon.partition.file.SuccessFile;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
index 3818b29fdb..af58800e04 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.partition.PartitionStatistics;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
new file mode 100644
index 0000000000..7d6441cc7a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.flink.sink.listener.ListenerTestUtils.createMockContext;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test custom {@link CommitListener}. */
+public class CustomCommitListenerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+ private static final Map<String, Set<String>> commitListenerResult = new
ConcurrentHashMap<>();
+
+ @Test
+ public void testCustomCommitListener() throws Exception {
+ Path tablePath = new Path(tempDir.toString());
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
+ String testId = UUID.randomUUID().toString();
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("pt", DataTypes.STRING())
+ .partitionKeys("pt")
+ .option(
+
FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS.key(),
+ "partition-collector")
+ .option("test-listener-id", testId)
+ .build();
+ schemaManager.createTable(schema);
+
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ String commitUser = UUID.randomUUID().toString();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ write.write(GenericRow.of(1, BinaryString.fromString("20250101")));
+ write.write(GenericRow.of(1, BinaryString.fromString("20250102")));
+ List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+ write.close();
+
+ StoreCommitter committer =
+ new StoreCommitter(
+ table, table.newCommit(commitUser),
createMockContext(true, false));
+ ManifestCommittable committable = new ManifestCommittable(1L, null);
+ commitMessages.forEach(committable::addFileCommittable);
+ committer.commit(Collections.singletonList(committable));
+ committer.close();
+
+
assertThat(commitListenerResult.get(testId)).containsExactly("20250101",
"20250102");
+ }
+
+ /** A mock {@link CommitListener}. */
+ public static class TestPartitionCollector implements CommitListener {
+
+ private final String testId;
+
+ public TestPartitionCollector(String testId) {
+ this.testId = testId;
+ }
+
+ @Override
+ public void notifyCommittable(List<ManifestCommittable> committables) {
+ commitListenerResult
+ .computeIfAbsent(testId, k -> new HashSet<>())
+ .addAll(
+ committables.stream()
+ .flatMap(c ->
c.fileCommittables().stream())
+ .map(CommitMessage::partition)
+ .map(p -> p.getString(0).toString())
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public void snapshotState() throws Exception {}
+
+ @Override
+ public void close() throws IOException {}
+
+ /** A mock {@link CommitListenerFactory}. */
+ public static class Factory implements CommitListenerFactory {
+
+ @Override
+ public String identifier() {
+ return "partition-collector";
+ }
+
+ @Override
+ public Optional<CommitListener> create(Committer.Context context,
FileStoreTable table)
+ throws Exception {
+ return Optional.of(
+ new
TestPartitionCollector(table.options().get("test-listener-id")));
+ }
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
similarity index 92%
copy from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
copy to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
index a4f9a0e574..9d7cad8f71 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
+import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -33,7 +34,7 @@ import static
org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM;
-import static
org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static
org.apache.paimon.flink.sink.listener.ListenerTestUtils.notifyCommits;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for custom PartitionMarkDoneAction. */
@@ -66,7 +67,7 @@ public class CustomPartitionMarkDoneActionTest extends
TableTestBase {
getClass().getClassLoader(),
false,
false,
- new
PartitionMarkDoneTest.MockOperatorStateStore(),
+ new MockOperatorStateStore(),
table))
.hasMessageContaining(
String.format(
@@ -90,7 +91,7 @@ public class CustomPartitionMarkDoneActionTest extends
TableTestBase {
getClass().getClassLoader(),
false,
false,
- new
PartitionMarkDoneTest.MockOperatorStateStore(),
+ new MockOperatorStateStore(),
table2)
.get();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
index 065591b9df..8114711ba6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIOFinder;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
new file mode 100644
index 0000000000..3439415c2f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+
+import java.util.UUID;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+class ListenerTestUtils {
+
+ static Committer.Context createMockContext(
+ boolean streamingCheckpointEnabled, boolean isRestored) {
+ return Committer.createContext(
+ UUID.randomUUID().toString(),
+ null,
+ streamingCheckpointEnabled,
+ isRestored,
+ new MockOperatorStateStore(),
+ 1,
+ 1);
+ }
+
+ static void notifyCommits(PartitionMarkDoneListener markDone, boolean
isCompact) {
+ notifyCommits(markDone, isCompact, true);
+ }
+
+ static void notifyCommits(
+ PartitionMarkDoneListener markDone,
+ boolean isCompact,
+ boolean partitionMarkDoneRecoverFromState) {
+ ManifestCommittable committable = new
ManifestCommittable(Long.MAX_VALUE);
+ DataFileMeta file = DataFileTestUtils.newFile();
+ CommitMessageImpl compactMessage;
+ if (isCompact) {
+ compactMessage =
+ new CommitMessageImpl(
+ BinaryRow.singleColumn(0),
+ 0,
+ 1,
+ new DataIncrement(emptyList(), emptyList(),
emptyList()),
+ new CompactIncrement(singletonList(file),
emptyList(), emptyList()),
+ new IndexIncrement(emptyList()));
+ } else {
+ compactMessage =
+ new CommitMessageImpl(
+ BinaryRow.singleColumn(0),
+ 0,
+ 1,
+ new DataIncrement(singletonList(file),
emptyList(), emptyList()),
+ new CompactIncrement(emptyList(), emptyList(),
emptyList()),
+ new IndexIncrement(emptyList()));
+ }
+ committable.addFileCommittable(compactMessage);
+ if (partitionMarkDoneRecoverFromState) {
+ markDone.notifyCommittable(singletonList(committable));
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
similarity index 97%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
index cf86b29cb6..427f86c2ae 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
similarity index 50%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
index a4f9a0e574..43597da654 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
@@ -16,32 +16,43 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
-import org.assertj.core.api.Assertions;
+import
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.junit.jupiter.api.Test;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
-import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM;
-import static
org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static
org.apache.paimon.flink.sink.listener.ListenerTestUtils.notifyCommits;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for custom PartitionMarkDoneAction. */
-public class CustomPartitionMarkDoneActionTest extends TableTestBase {
+class PartitionMarkDoneTest extends TableTestBase {
@Test
- public void testCustomPartitionMarkDoneAction() throws Exception {
+ public void testTriggerByCompaction() throws Exception {
+ innerTest(true, true);
+ }
+
+ @Test
+ public void testNotTriggerByCompaction() throws Exception {
+ innerTest(false, true);
+ }
+ @Test
+ public void testNotTriggerWhenRecoveryFromState() throws Exception {
+ innerTest(false, false);
+ }
+
+ private void innerTest(boolean deletionVectors, boolean
partitionMarkDoneRecoverFromState)
+ throws Exception {
Identifier identifier = identifier("T");
Schema schema =
Schema.newBuilder()
@@ -51,54 +62,36 @@ public class CustomPartitionMarkDoneActionTest extends
TableTestBase {
.partitionKeys("a")
.primaryKey("a", "b")
.option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(),
"true")
- .option(PARTITION_MARK_DONE_ACTION.key(),
"success-file,custom")
+ .option(PARTITION_MARK_DONE_ACTION.key(),
"success-file")
+ .option(
+ DELETION_VECTORS_ENABLED.key(),
+ Boolean.valueOf(deletionVectors).toString())
.build();
catalog.createTable(identifier, schema, true);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
-
- // Throwing the exception, if the parameter
'partition.mark-done-action.custom.class' is not
- // set.
- Assertions.assertThatThrownBy(
- () ->
- PartitionMarkDoneListener.create(
- getClass().getClassLoader(),
- false,
- false,
- new
PartitionMarkDoneTest.MockOperatorStateStore(),
- table))
- .hasMessageContaining(
- String.format(
- "You need to set [%s] when you add [%s] mark
done action in your property [%s].",
- PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
- CUSTOM,
- PARTITION_MARK_DONE_ACTION.key()));
-
- // Set parameter 'partition.mark-done-action.custom.class'.
- catalog.alterTable(
- identifier,
- SchemaChange.setOption(
- PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
- MockCustomPartitionMarkDoneAction.class.getName()),
- true);
-
- FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
-
PartitionMarkDoneListener markDone =
PartitionMarkDoneListener.create(
getClass().getClassLoader(),
false,
false,
- new
PartitionMarkDoneTest.MockOperatorStateStore(),
- table2)
+ new MockOperatorStateStore(),
+ table)
.get();
- notifyCommits(markDone, false);
+ if (!partitionMarkDoneRecoverFromState) {
+ notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+ assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+ return;
+ }
- assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
+ notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
+
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
-
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
- .isEqualTo("table=default.T,partition=a=0/");
+ if (!deletionVectors) {
+ notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+ assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
index b00906d1c1..acc9552ceb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
similarity index 97%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
index 95be025405..355418f338 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
similarity index 90%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
index 0b292d4da6..275c1967c6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -36,8 +35,6 @@ import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
@@ -50,6 +47,7 @@ import static java.util.Collections.emptyList;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static
org.apache.paimon.flink.sink.listener.ListenerTestUtils.createMockContext;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FlinkConnectorOptions.PartitionMarkDoneActionMode}. */
@@ -77,19 +75,8 @@ public class WatermarkPartitionMarkDoneTest extends
TableTestBase {
TableWriteImpl<?> write = table.newWrite("user1");
TableCommitImpl commit = table.newCommit("user1");
- OperatorMetricGroup metricGroup =
UnregisteredMetricsGroup.createOperatorMetricGroup();
StoreCommitter committer =
- new StoreCommitter(
- table,
- commit,
- Committer.createContext(
- "user1",
- metricGroup,
- true,
- false,
- new
PartitionMarkDoneTest.MockOperatorStateStore(),
- 1,
- 1));
+ new StoreCommitter(table, commit, createMockContext(true,
false));
write.write(GenericRow.of(BinaryString.fromString("2025-03-01 12"), 1,
1));
write.write(GenericRow.of(BinaryString.fromString("2025-03-01 13"), 1,
1));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
deleted file mode 100644
index 9e541d5ff0..0000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.partition;
-
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileTestUtils;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
-import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.TableTestBase;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.types.DataTypes;
-
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nonnull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
-import static org.assertj.core.api.Assertions.assertThat;
-
-class PartitionMarkDoneTest extends TableTestBase {
-
- @Test
- public void testTriggerByCompaction() throws Exception {
- innerTest(true, true);
- }
-
- @Test
- public void testNotTriggerByCompaction() throws Exception {
- innerTest(false, true);
- }
-
- @Test
- public void testNotTriggerWhenRecoveryFromState() throws Exception {
- innerTest(false, false);
- }
-
- private void innerTest(boolean deletionVectors, boolean
partitionMarkDoneRecoverFromState)
- throws Exception {
- Identifier identifier = identifier("T");
- Schema schema =
- Schema.newBuilder()
- .column("a", DataTypes.INT())
- .column("b", DataTypes.INT())
- .column("c", DataTypes.INT())
- .partitionKeys("a")
- .primaryKey("a", "b")
- .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(),
"true")
- .option(PARTITION_MARK_DONE_ACTION.key(),
"success-file")
- .option(
- DELETION_VECTORS_ENABLED.key(),
- Boolean.valueOf(deletionVectors).toString())
- .build();
- catalog.createTable(identifier, schema, true);
- FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
- Path location = table.location();
- Path successFile = new Path(location, "a=0/_SUCCESS");
- PartitionMarkDoneListener markDone =
- PartitionMarkDoneListener.create(
- getClass().getClassLoader(),
- false,
- false,
- new MockOperatorStateStore(),
- table)
- .get();
-
- if (!partitionMarkDoneRecoverFromState) {
- notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
- assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
- return;
- }
-
- notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
-
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
-
- if (!deletionVectors) {
- notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
- assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
- }
- }
-
- public static void notifyCommits(PartitionMarkDoneListener markDone,
boolean isCompact) {
- notifyCommits(markDone, isCompact, true);
- }
-
- private static void notifyCommits(
- PartitionMarkDoneListener markDone,
- boolean isCompact,
- boolean partitionMarkDoneRecoverFromState) {
- ManifestCommittable committable = new
ManifestCommittable(Long.MAX_VALUE);
- DataFileMeta file = DataFileTestUtils.newFile();
- CommitMessageImpl compactMessage;
- if (isCompact) {
- compactMessage =
- new CommitMessageImpl(
- BinaryRow.singleColumn(0),
- 0,
- 1,
- new DataIncrement(emptyList(), emptyList(),
emptyList()),
- new CompactIncrement(singletonList(file),
emptyList(), emptyList()),
- new IndexIncrement(emptyList()));
- } else {
- compactMessage =
- new CommitMessageImpl(
- BinaryRow.singleColumn(0),
- 0,
- 1,
- new DataIncrement(singletonList(file),
emptyList(), emptyList()),
- new CompactIncrement(emptyList(), emptyList(),
emptyList()),
- new IndexIncrement(emptyList()));
- }
- committable.addFileCommittable(compactMessage);
- if (partitionMarkDoneRecoverFromState) {
- markDone.notifyCommittable(singletonList(committable));
- }
- }
-
- public static class MockOperatorStateStore implements OperatorStateStore {
-
- @Override
- public <K, V> BroadcastState<K, V> getBroadcastState(
- MapStateDescriptor<K, V> stateDescriptor) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <S> ListState<S> getListState(ListStateDescriptor<S>
stateDescriptor) {
- return new MockListState<>();
- }
-
- @Override
- public <S> ListState<S> getUnionListState(ListStateDescriptor<S>
stateDescriptor) {
- throw new UnsupportedOperationException();
- }
-
- // @Override is skipped for compatibility with Flink 1.x.
- public <K, V> BroadcastState<K, V> getBroadcastState(
- org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V>
mapStateDescriptor)
- throws Exception {
- throw new UnsupportedOperationException();
- }
-
- // @Override is skipped for compatibility with Flink 1.x.
- public <S> org.apache.flink.api.common.state.v2.ListState<S>
getListState(
- org.apache.flink.api.common.state.v2.ListStateDescriptor<S>
listStateDescriptor)
- throws Exception {
- throw new UnsupportedOperationException();
- }
-
- // @Override is skipped for compatibility with Flink 1.x.
- public <S> org.apache.flink.api.common.state.v2.ListState<S>
getUnionListState(
- org.apache.flink.api.common.state.v2.ListStateDescriptor<S>
listStateDescriptor)
- throws Exception {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<String> getRegisteredStateNames() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<String> getRegisteredBroadcastStateNames() {
- throw new UnsupportedOperationException();
- }
- }
-
- public static class MockListState<T> implements ListState<T> {
-
- private final List<T> backingList = new ArrayList<>();
-
- public MockListState() {}
-
- @Override
- public void update(List<T> values) {
- this.backingList.clear();
- this.addAll(values);
- }
-
- @Override
- public void addAll(List<T> values) {
- this.backingList.addAll(values);
- }
-
- @Override
- public Iterable<T> get() {
- return new Iterable<T>() {
- @Nonnull
- public Iterator<T> iterator() {
- return MockListState.this.backingList.iterator();
- }
- };
- }
-
- @Override
- public void add(T value) {
- this.backingList.add(value);
- }
-
- @Override
- public void clear() {
- this.backingList.clear();
- }
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3c05b5fba3..39300b1797 100644
---
a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,6 @@
org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
# Catalog lock factory
-org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
+org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
+
+org.apache.paimon.flink.sink.listener.CustomCommitListenerTest$TestPartitionCollector$Factory
\ No newline at end of file