This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d3bf999a8 [flink] Support loading custom CommitListener at runtime (#5763)
7d3bf999a8 is described below

commit 7d3bf999a83b403f4666757df225527b52daa517
Author: yuzelin <[email protected]>
AuthorDate: Wed Jun 18 21:00:26 2025 +0800

    [flink] Support loading custom CommitListener at runtime (#5763)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../procedure/MarkPartitionDoneProcedure.java      |   2 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |   8 +
 .../flink/action/MarkPartitionDoneAction.java      |   2 +-
 .../procedure/MarkPartitionDoneProcedure.java      |   2 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   |   2 +-
 .../{partition => listener}/CommitListener.java    |   2 +-
 .../flink/sink/listener/CommitListenerFactory.java |  45 ++++
 .../{partition => listener}/CommitListeners.java   |  24 +-
 .../PartitionMarkDoneListener.java                 |   2 +-
 .../PartitionMarkDoneTrigger.java                  |   2 +-
 .../ReportPartStatsListener.java                   |   3 +-
 .../action/MarkPartitionDoneActionITCase.java      |   2 +-
 .../AddDonePartitionActionTest.java                |   2 +-
 .../sink/listener/CustomCommitListenerTest.java    | 140 ++++++++++++
 .../CustomPartitionMarkDoneActionTest.java         |   9 +-
 .../HttpReportMarkDoneActionTest.java              |   2 +-
 .../flink/sink/listener/ListenerTestUtils.java     |  87 ++++++++
 .../MockCustomPartitionMarkDoneAction.java         |   2 +-
 .../PartitionMarkDoneTest.java}                    |  81 ++++---
 .../PartitionMarkDoneTriggerTest.java              |   2 +-
 .../SuccessFileMarkDoneActionTest.java             |   2 +-
 .../WatermarkPartitionMarkDoneTest.java            |  19 +-
 .../sink/partition/PartitionMarkDoneTest.java      | 241 ---------------------
 .../services/org.apache.paimon.factories.Factory   |   4 +-
 25 files changed, 371 insertions(+), 322 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index fa9b8e4d9c..10f3338e64 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Integer</td>
             <td>Maximum number of threads to copy bytes from small changelog 
files. By default is the number of processors available to the Java virtual 
machine.</td>
         </tr>
+        <tr>
+            <td><h5>commit.custom-listeners</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Commit listener will be called after a successful commit. This 
option list custom commit listener identifiers separated by comma.</td>
+        </tr>
         <tr>
             <td><h5>end-input.watermark</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index e67811c4e0..18a90c294c 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.io.IOException;
 import java.util.List;
 
-import static 
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static 
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index d524920e82..ea468abceb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -515,6 +515,14 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "If true, the split generation process would be 
performed during runtime on a Flink task, instead of on the JobManager during 
initialization phase.");
 
+    public static final ConfigOption<String> COMMIT_CUSTOM_LISTENERS =
+            key("commit.custom-listeners")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "Commit listener will be called after a successful 
commit. This option list custom commit "
+                                    + "listener identifiers separated by 
comma.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index bfad0bf1b6..b69edffc10 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.PartitionPathUtils;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static 
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
 
 /** Table partition mark done action for Flink. */
 public class MarkPartitionDoneAction extends TableActionBase {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index 90ebd48003..5bee1a76e2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.io.IOException;
 import java.util.List;
 
-import static 
org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
+import static 
org.apache.paimon.flink.sink.listener.PartitionMarkDoneListener.markDone;
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 34a087f678..c9451634cd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
-import org.apache.paimon.flink.sink.partition.CommitListeners;
+import org.apache.paimon.flink.sink.listener.CommitListeners;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.BucketMode;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
similarity index 95%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
index b5efc36018..b52d8709fe 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListener.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.manifest.ManifestCommittable;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
new file mode 100644
index 0000000000..e3b3205358
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListenerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.util.Optional;
+
+/** Factory for {@link CommitListener}. */
+public interface CommitListenerFactory extends Factory {
+
+    String identifier();
+
+    Optional<CommitListener> create(Committer.Context context, FileStoreTable 
table)
+            throws Exception;
+
+    static Optional<CommitListener> create(
+            Committer.Context context, FileStoreTable table, String 
identifier) throws Exception {
+        CommitListenerFactory factory =
+                FactoryUtil.discoverFactory(
+                        CommitListenerFactory.class.getClassLoader(),
+                        CommitListenerFactory.class,
+                        identifier);
+        return factory.create(context, table);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
similarity index 74%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
index 58ee1f969e..6885b0436e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/CommitListeners.java
@@ -16,17 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.StringUtils;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS;
 
 /** Partition listeners. */
 public class CommitListeners implements Closeable {
@@ -81,6 +87,22 @@ public class CommitListeners implements Closeable {
                         table)
                 .ifPresent(listeners::add);
 
+        // custom listeners
+        String identifiers = 
Options.fromMap(table.options()).get(COMMIT_CUSTOM_LISTENERS);
+        Arrays.stream(identifiers.split(","))
+                .filter(identifier -> 
!StringUtils.isNullOrWhitespaceOnly(identifier))
+                .map(
+                        identifier -> {
+                            try {
+                                return CommitListenerFactory.create(context, 
table, identifier);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .forEach(listeners::add);
+
         return new CommitListeners(listeners);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index b5b82cda1b..5dbd9f3a09 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
index 1a6605cd1c..bd524acb66 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
index 4b815d336f..488b54cbd7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.manifest.ManifestCommittable;
@@ -52,7 +52,6 @@ import java.util.Set;
  */
 public class ReportPartStatsListener implements CommitListener {
 
-    @SuppressWarnings("unchecked")
     private static final ListStateDescriptor<Map<String, Long>> 
PENDING_REPORT_STATE_DESC =
             new ListStateDescriptor<>(
                     "pending-report-hms-partition",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index e2189f4738..c1e58c5ac5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
-import 
org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction;
+import org.apache.paimon.flink.sink.listener.MockCustomPartitionMarkDoneAction;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.partition.actions.HttpReportMarkDoneAction;
 import org.apache.paimon.partition.file.SuccessFile;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
index 3818b29fdb..af58800e04 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/AddDonePartitionActionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.partition.PartitionStatistics;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
new file mode 100644
index 0000000000..7d6441cc7a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomCommitListenerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.flink.sink.listener.ListenerTestUtils.createMockContext;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test custom {@link CommitListener}. */
+public class CustomCommitListenerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+    private static final Map<String, Set<String>> commitListenerResult = new 
ConcurrentHashMap<>();
+
+    @Test
+    public void testCustomCommitListener() throws Exception {
+        Path tablePath = new Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
+        String testId = UUID.randomUUID().toString();
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("pt", DataTypes.STRING())
+                        .partitionKeys("pt")
+                        .option(
+                                
FlinkConnectorOptions.COMMIT_CUSTOM_LISTENERS.key(),
+                                "partition-collector")
+                        .option("test-listener-id", testId)
+                        .build();
+        schemaManager.createTable(schema);
+
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+        String commitUser = UUID.randomUUID().toString();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        write.write(GenericRow.of(1, BinaryString.fromString("20250101")));
+        write.write(GenericRow.of(1, BinaryString.fromString("20250102")));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+        write.close();
+
+        StoreCommitter committer =
+                new StoreCommitter(
+                        table, table.newCommit(commitUser), 
createMockContext(true, false));
+        ManifestCommittable committable = new ManifestCommittable(1L, null);
+        commitMessages.forEach(committable::addFileCommittable);
+        committer.commit(Collections.singletonList(committable));
+        committer.close();
+
+        
assertThat(commitListenerResult.get(testId)).containsExactly("20250101", 
"20250102");
+    }
+
+    /** A mock {@link CommitListener}. */
+    public static class TestPartitionCollector implements CommitListener {
+
+        private final String testId;
+
+        public TestPartitionCollector(String testId) {
+            this.testId = testId;
+        }
+
+        @Override
+        public void notifyCommittable(List<ManifestCommittable> committables) {
+            commitListenerResult
+                    .computeIfAbsent(testId, k -> new HashSet<>())
+                    .addAll(
+                            committables.stream()
+                                    .flatMap(c -> 
c.fileCommittables().stream())
+                                    .map(CommitMessage::partition)
+                                    .map(p -> p.getString(0).toString())
+                                    .collect(Collectors.toSet()));
+        }
+
+        @Override
+        public void snapshotState() throws Exception {}
+
+        @Override
+        public void close() throws IOException {}
+
+        /** A mock {@link CommitListenerFactory}. */
+        public static class Factory implements CommitListenerFactory {
+
+            @Override
+            public String identifier() {
+                return "partition-collector";
+            }
+
+            @Override
+            public Optional<CommitListener> create(Committer.Context context, 
FileStoreTable table)
+                    throws Exception {
+                return Optional.of(
+                        new 
TestPartitionCollector(table.options().get("test-listener-id")));
+            }
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
similarity index 92%
copy from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
copy to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
index a4f9a0e574..9d7cad8f71 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -33,7 +34,7 @@ import static 
org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM;
-import static 
org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static 
org.apache.paimon.flink.sink.listener.ListenerTestUtils.notifyCommits;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for custom PartitionMarkDoneAction. */
@@ -66,7 +67,7 @@ public class CustomPartitionMarkDoneActionTest extends 
TableTestBase {
                                         getClass().getClassLoader(),
                                         false,
                                         false,
-                                        new 
PartitionMarkDoneTest.MockOperatorStateStore(),
+                                        new MockOperatorStateStore(),
                                         table))
                 .hasMessageContaining(
                         String.format(
@@ -90,7 +91,7 @@ public class CustomPartitionMarkDoneActionTest extends 
TableTestBase {
                                 getClass().getClassLoader(),
                                 false,
                                 false,
-                                new 
PartitionMarkDoneTest.MockOperatorStateStore(),
+                                new MockOperatorStateStore(),
                                 table2)
                         .get();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
index 065591b9df..8114711ba6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/HttpReportMarkDoneActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/HttpReportMarkDoneActionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIOFinder;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
new file mode 100644
index 0000000000..3439415c2f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.listener;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+
+import java.util.UUID;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
+class ListenerTestUtils {
+
+    static Committer.Context createMockContext(
+            boolean streamingCheckpointEnabled, boolean isRestored) {
+        return Committer.createContext(
+                UUID.randomUUID().toString(),
+                null,
+                streamingCheckpointEnabled,
+                isRestored,
+                new MockOperatorStateStore(),
+                1,
+                1);
+    }
+
+    static void notifyCommits(PartitionMarkDoneListener markDone, boolean isCompact) {
+        notifyCommits(markDone, isCompact, true);
+    }
+
+    static void notifyCommits(
+            PartitionMarkDoneListener markDone,
+            boolean isCompact,
+            boolean partitionMarkDoneRecoverFromState) {
+        ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
+        DataFileMeta file = DataFileTestUtils.newFile();
+        CommitMessageImpl compactMessage;
+        if (isCompact) {
+            compactMessage =
+                    new CommitMessageImpl(
+                            BinaryRow.singleColumn(0),
+                            0,
+                            1,
+                            new DataIncrement(emptyList(), emptyList(), emptyList()),
+                            new CompactIncrement(singletonList(file), emptyList(), emptyList()),
+                            new IndexIncrement(emptyList()));
+        } else {
+            compactMessage =
+                    new CommitMessageImpl(
+                            BinaryRow.singleColumn(0),
+                            0,
+                            1,
+                            new DataIncrement(singletonList(file), emptyList(), emptyList()),
+                            new CompactIncrement(emptyList(), emptyList(), emptyList()),
+                            new IndexIncrement(emptyList()));
+        }
+        committable.addFileCommittable(compactMessage);
+        if (partitionMarkDoneRecoverFromState) {
+            markDone.notifyCommittable(singletonList(committable));
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
similarity index 97%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
index cf86b29cb6..427f86c2ae 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/MockCustomPartitionMarkDoneAction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
similarity index 50%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
index a4f9a0e574..43597da654 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java
@@ -16,32 +16,43 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.DataTypes;
 
-import org.assertj.core.api.Assertions;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
-import static org.apache.paimon.CoreOptions.PartitionMarkDoneAction.CUSTOM;
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static org.apache.paimon.flink.sink.listener.ListenerTestUtils.notifyCommits;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for custom PartitionMarkDoneAction. */
-public class CustomPartitionMarkDoneActionTest extends TableTestBase {
+class PartitionMarkDoneTest extends TableTestBase {
 
     @Test
-    public void testCustomPartitionMarkDoneAction() throws Exception {
+    public void testTriggerByCompaction() throws Exception {
+        innerTest(true, true);
+    }
+
+    @Test
+    public void testNotTriggerByCompaction() throws Exception {
+        innerTest(false, true);
+    }
 
+    @Test
+    public void testNotTriggerWhenRecoveryFromState() throws Exception {
+        innerTest(false, false);
+    }
+
+    private void innerTest(boolean deletionVectors, boolean partitionMarkDoneRecoverFromState)
+            throws Exception {
         Identifier identifier = identifier("T");
         Schema schema =
                 Schema.newBuilder()
@@ -51,54 +62,36 @@ public class CustomPartitionMarkDoneActionTest extends TableTestBase {
                         .partitionKeys("a")
                         .primaryKey("a", "b")
                         .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true")
-                        .option(PARTITION_MARK_DONE_ACTION.key(), "success-file,custom")
+                        .option(PARTITION_MARK_DONE_ACTION.key(), "success-file")
+                        .option(
+                                DELETION_VECTORS_ENABLED.key(),
+                                Boolean.valueOf(deletionVectors).toString())
                         .build();
         catalog.createTable(identifier, schema, true);
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
         Path location = table.location();
         Path successFile = new Path(location, "a=0/_SUCCESS");
-
-        // Throwing the exception, if the parameter 'partition.mark-done-action.custom.class' is not
-        // set.
-        Assertions.assertThatThrownBy(
-                        () ->
-                                PartitionMarkDoneListener.create(
-                                        getClass().getClassLoader(),
-                                        false,
-                                        false,
-                                        new PartitionMarkDoneTest.MockOperatorStateStore(),
-                                        table))
-                .hasMessageContaining(
-                        String.format(
-                                "You need to set [%s] when you add [%s] mark done action in your property [%s].",
-                                PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
-                                CUSTOM,
-                                PARTITION_MARK_DONE_ACTION.key()));
-
-        // Set parameter 'partition.mark-done-action.custom.class'.
-        catalog.alterTable(
-                identifier,
-                SchemaChange.setOption(
-                        PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
-                        MockCustomPartitionMarkDoneAction.class.getName()),
-                true);
-
-        FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
-
         PartitionMarkDoneListener markDone =
                 PartitionMarkDoneListener.create(
                                 getClass().getClassLoader(),
                                 false,
                                 false,
-                                new PartitionMarkDoneTest.MockOperatorStateStore(),
-                                table2)
+                                new MockOperatorStateStore(),
+                                table)
                         .get();
 
-        notifyCommits(markDone, false);
+        if (!partitionMarkDoneRecoverFromState) {
+            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+            assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+            return;
+        }
 
-        assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
+        notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
+        assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
 
-        assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
-                .isEqualTo("table=default.T,partition=a=0/");
+        if (!deletionVectors) {
+            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
+            assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
+        }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
index b00906d1c1..acc9552ceb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTriggerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.testutils.assertj.PaimonAssertions;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
similarity index 97%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
index 95be025405..355418f338 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/SuccessFileMarkDoneActionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
similarity index 90%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
index 0b292d4da6..275c1967c6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/WatermarkPartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/WatermarkPartitionMarkDoneTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.flink.sink.listener;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.sink.Committer;
 import org.apache.paimon.flink.sink.StoreCommitter;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -36,8 +35,6 @@ import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.DataTypes;
 
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.junit.jupiter.api.Test;
 
 import java.time.LocalDateTime;
@@ -50,6 +47,7 @@ import static java.util.Collections.emptyList;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.flink.sink.listener.ListenerTestUtils.createMockContext;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FlinkConnectorOptions.PartitionMarkDoneActionMode}. */
@@ -77,19 +75,8 @@ public class WatermarkPartitionMarkDoneTest extends TableTestBase {
 
         TableWriteImpl<?> write = table.newWrite("user1");
         TableCommitImpl commit = table.newCommit("user1");
-        OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
         StoreCommitter committer =
-                new StoreCommitter(
-                        table,
-                        commit,
-                        Committer.createContext(
-                                "user1",
-                                metricGroup,
-                                true,
-                                false,
-                                new PartitionMarkDoneTest.MockOperatorStateStore(),
-                                1,
-                                1));
+                new StoreCommitter(table, commit, createMockContext(true, false));
 
         write.write(GenericRow.of(BinaryString.fromString("2025-03-01 12"), 1, 1));
         write.write(GenericRow.of(BinaryString.fromString("2025-03-01 13"), 1, 1));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
deleted file mode 100644
index 9e541d5ff0..0000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.partition;
-
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileTestUtils;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
-import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.TableTestBase;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.types.DataTypes;
-
-import org.apache.flink.api.common.state.BroadcastState;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.junit.jupiter.api.Test;
-
-import javax.annotation.Nonnull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
-import static org.assertj.core.api.Assertions.assertThat;
-
-class PartitionMarkDoneTest extends TableTestBase {
-
-    @Test
-    public void testTriggerByCompaction() throws Exception {
-        innerTest(true, true);
-    }
-
-    @Test
-    public void testNotTriggerByCompaction() throws Exception {
-        innerTest(false, true);
-    }
-
-    @Test
-    public void testNotTriggerWhenRecoveryFromState() throws Exception {
-        innerTest(false, false);
-    }
-
-    private void innerTest(boolean deletionVectors, boolean partitionMarkDoneRecoverFromState)
-            throws Exception {
-        Identifier identifier = identifier("T");
-        Schema schema =
-                Schema.newBuilder()
-                        .column("a", DataTypes.INT())
-                        .column("b", DataTypes.INT())
-                        .column("c", DataTypes.INT())
-                        .partitionKeys("a")
-                        .primaryKey("a", "b")
-                        .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true")
-                        .option(PARTITION_MARK_DONE_ACTION.key(), "success-file")
-                        .option(
-                                DELETION_VECTORS_ENABLED.key(),
-                                Boolean.valueOf(deletionVectors).toString())
-                        .build();
-        catalog.createTable(identifier, schema, true);
-        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
-        Path location = table.location();
-        Path successFile = new Path(location, "a=0/_SUCCESS");
-        PartitionMarkDoneListener markDone =
-                PartitionMarkDoneListener.create(
-                                getClass().getClassLoader(),
-                                false,
-                                false,
-                                new MockOperatorStateStore(),
-                                table)
-                        .get();
-
-        if (!partitionMarkDoneRecoverFromState) {
-            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
-            assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
-            return;
-        }
-
-        notifyCommits(markDone, true, partitionMarkDoneRecoverFromState);
-        assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
-
-        if (!deletionVectors) {
-            notifyCommits(markDone, false, partitionMarkDoneRecoverFromState);
-            assertThat(table.fileIO().exists(successFile)).isEqualTo(true);
-        }
-    }
-
-    public static void notifyCommits(PartitionMarkDoneListener markDone, boolean isCompact) {
-        notifyCommits(markDone, isCompact, true);
-    }
-
-    private static void notifyCommits(
-            PartitionMarkDoneListener markDone,
-            boolean isCompact,
-            boolean partitionMarkDoneRecoverFromState) {
-        ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
-        DataFileMeta file = DataFileTestUtils.newFile();
-        CommitMessageImpl compactMessage;
-        if (isCompact) {
-            compactMessage =
-                    new CommitMessageImpl(
-                            BinaryRow.singleColumn(0),
-                            0,
-                            1,
-                            new DataIncrement(emptyList(), emptyList(), emptyList()),
-                            new CompactIncrement(singletonList(file), emptyList(), emptyList()),
-                            new IndexIncrement(emptyList()));
-        } else {
-            compactMessage =
-                    new CommitMessageImpl(
-                            BinaryRow.singleColumn(0),
-                            0,
-                            1,
-                            new DataIncrement(singletonList(file), emptyList(), emptyList()),
-                            new CompactIncrement(emptyList(), emptyList(), emptyList()),
-                            new IndexIncrement(emptyList()));
-        }
-        committable.addFileCommittable(compactMessage);
-        if (partitionMarkDoneRecoverFromState) {
-            markDone.notifyCommittable(singletonList(committable));
-        }
-    }
-
-    public static class MockOperatorStateStore implements OperatorStateStore {
-
-        @Override
-        public <K, V> BroadcastState<K, V> getBroadcastState(
-                MapStateDescriptor<K, V> stateDescriptor) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
-            return new MockListState<>();
-        }
-
-        @Override
-        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
-            throw new UnsupportedOperationException();
-        }
-
-        // @Override is skipped for compatibility with Flink 1.x.
-        public <K, V> BroadcastState<K, V> getBroadcastState(
-                org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> mapStateDescriptor)
-                throws Exception {
-            throw new UnsupportedOperationException();
-        }
-
-        // @Override is skipped for compatibility with Flink 1.x.
-        public <S> org.apache.flink.api.common.state.v2.ListState<S> getListState(
-                org.apache.flink.api.common.state.v2.ListStateDescriptor<S> listStateDescriptor)
-                throws Exception {
-            throw new UnsupportedOperationException();
-        }
-
-        // @Override is skipped for compatibility with Flink 1.x.
-        public <S> org.apache.flink.api.common.state.v2.ListState<S> getUnionListState(
-                org.apache.flink.api.common.state.v2.ListStateDescriptor<S> listStateDescriptor)
-                throws Exception {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Set<String> getRegisteredStateNames() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Set<String> getRegisteredBroadcastStateNames() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    public static class MockListState<T> implements ListState<T> {
-
-        private final List<T> backingList = new ArrayList<>();
-
-        public MockListState() {}
-
-        @Override
-        public void update(List<T> values) {
-            this.backingList.clear();
-            this.addAll(values);
-        }
-
-        @Override
-        public void addAll(List<T> values) {
-            this.backingList.addAll(values);
-        }
-
-        @Override
-        public Iterable<T> get() {
-            return new Iterable<T>() {
-                @Nonnull
-                public Iterator<T> iterator() {
-                    return MockListState.this.backingList.iterator();
-                }
-            };
-        }
-
-        @Override
-        public void add(T value) {
-            this.backingList.add(value);
-        }
-
-        @Override
-        public void clear() {
-            this.backingList.clear();
-        }
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 3c05b5fba3..39300b1797 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,4 +16,6 @@
 org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory
 
 # Catalog lock factory
-org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
\ No newline at end of file
+org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
+
+org.apache.paimon.flink.sink.listener.CustomCommitListenerTest$TestPartitionCollector$Factory
\ No newline at end of file

Reply via email to