This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a014ec32a [core] Rename PartitionListener to CommitListener (#5765)
4a014ec32a is described below

commit 4a014ec32afbe452c5fb39cfe6547e8237d2fb41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 18 11:11:34 2025 +0800

    [core] Rename PartitionListener to CommitListener (#5765)
---
 .../procedure/MarkPartitionDoneProcedure.java      |  2 +-
 .../flink/action/MarkPartitionDoneAction.java      |  2 +-
 .../procedure/MarkPartitionDoneProcedure.java      |  2 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   | 14 +++++------
 ...{PartitionListener.java => CommitListener.java} |  5 ++--
 ...artitionListeners.java => CommitListeners.java} | 29 ++++++++++++----------
 ...arkDone.java => PartitionMarkDoneListener.java} | 23 ++++++++---------
 .../sink/partition/ReportPartStatsListener.java    |  7 +++---
 .../CustomPartitionMarkDoneActionTest.java         |  6 ++---
 .../sink/partition/PartitionMarkDoneTest.java      | 12 +++++----
 10 files changed, 52 insertions(+), 50 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index 22abfb3f3b..e67811c4e0 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index c566af0a19..bfad0bf1b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -26,7 +26,7 @@ import org.apache.paimon.utils.PartitionPathUtils;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
 
 /** Table partition mark done action for Flink. */
 public class MarkPartitionDoneAction extends TableActionBase {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d73553045b..90ebd48003 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -35,7 +35,7 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneListener.markDone;
 import static org.apache.paimon.utils.ParameterUtils.getPartitions;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 3b5d5bf627..34a087f678 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
-import org.apache.paimon.flink.sink.partition.PartitionListeners;
+import org.apache.paimon.flink.sink.partition.CommitListeners;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.BucketMode;
@@ -44,7 +44,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
 
     private final TableCommitImpl commit;
     @Nullable private final CommitterMetrics committerMetrics;
-    private final PartitionListeners partitionListeners;
+    private final CommitListeners commitListeners;
     private final boolean allowLogOffsetDuplicate;
 
     public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) {
@@ -58,7 +58,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
         }
 
         try {
-            this.partitionListeners = PartitionListeners.create(context, table);
+            this.commitListeners = CommitListeners.create(context, table);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -110,7 +110,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
             throws IOException, InterruptedException {
         commit.commitMultiple(committables, false);
         calcNumBytesAndRecordsOut(committables);
-        partitionListeners.notifyCommittable(committables);
+        commitListeners.notifyCommittable(committables);
     }
 
     @Override
@@ -119,7 +119,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
             boolean checkAppendFiles,
             boolean partitionMarkDoneRecoverFromState) {
         int committed = commit.filterAndCommitMultiple(globalCommittables, checkAppendFiles);
-        partitionListeners.notifyCommittable(globalCommittables, partitionMarkDoneRecoverFromState);
+        commitListeners.notifyCommittable(globalCommittables, partitionMarkDoneRecoverFromState);
 
         return committed;
     }
@@ -127,7 +127,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     @Override
     public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
         try {
-            partitionListeners.snapshotState();
+            commitListeners.snapshotState();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -142,7 +142,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl
     @Override
     public void close() throws Exception {
         commit.close();
-        partitionListeners.close();
+        commitListeners.close();
     }
 
     public boolean allowLogOffsetDuplicate() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
similarity index 85%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
index 18c365081a..fe70820651 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListener.java
@@ -24,10 +24,9 @@ import java.io.Closeable;
 import java.util.List;
 
 /** The partition listener. */
-public interface PartitionListener extends Closeable {
+public interface CommitListener extends Closeable {
 
-    void notifyCommittable(
-            List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState);
+    void notifyCommittable(List<ManifestCommittable> committables);
 
     void snapshotState() throws Exception;
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
similarity index 72%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
index 8f8d633d43..58ee1f969e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/CommitListeners.java
@@ -29,30 +29,33 @@ import java.util.ArrayList;
 import java.util.List;
 
 /** Partition listeners. */
-public class PartitionListeners implements Closeable {
+public class CommitListeners implements Closeable {
 
-    private final List<PartitionListener> listeners;
+    private final List<CommitListener> listeners;
 
-    private PartitionListeners(List<PartitionListener> listeners) {
+    private CommitListeners(List<CommitListener> listeners) {
         this.listeners = listeners;
     }
 
     public void notifyCommittable(List<ManifestCommittable> committables) {
-        for (PartitionListener trigger : listeners) {
-            trigger.notifyCommittable(committables, true);
+        for (CommitListener listener : listeners) {
+            listener.notifyCommittable(committables);
         }
     }
 
     public void notifyCommittable(
             List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
-        for (PartitionListener trigger : listeners) {
-            trigger.notifyCommittable(committables, partitionMarkDoneRecoverFromState);
+        for (CommitListener listener : listeners) {
+            if (partitionMarkDoneRecoverFromState
+                    || !(listener instanceof PartitionMarkDoneListener)) {
+                listener.notifyCommittable(committables);
+            }
         }
     }
 
     public void snapshotState() throws Exception {
-        for (PartitionListener trigger : listeners) {
-            trigger.snapshotState();
+        for (CommitListener listener : listeners) {
+            listener.snapshotState();
         }
     }
 
@@ -61,16 +64,16 @@ public class PartitionListeners implements Closeable {
         IOUtils.closeAllQuietly(listeners);
     }
 
-    public static PartitionListeners create(Committer.Context context, FileStoreTable table)
+    public static CommitListeners create(Committer.Context context, FileStoreTable table)
             throws Exception {
-        List<PartitionListener> listeners = new ArrayList<>();
+        List<CommitListener> listeners = new ArrayList<>();
 
         // partition statistics reporter
         ReportPartStatsListener.create(context.isRestored(), context.stateStore(), table)
                 .ifPresent(listeners::add);
 
         // partition mark done
-        PartitionMarkDone.create(
+        PartitionMarkDoneListener.create(
                         context.getClass().getClassLoader(),
                         context.streamingCheckpointEnabled(),
                         context.isRestored(),
@@ -78,6 +81,6 @@ public class PartitionListeners implements Closeable {
                         table)
                 .ifPresent(listeners::add);
 
-        return new PartitionListeners(listeners);
+        return new CommitListeners(listeners);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
similarity index 93%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
index 96081dee0c..b5b82cda1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneListener.java
@@ -51,9 +51,9 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_
 import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_MODE;
 
 /** Mark partition done. */
-public class PartitionMarkDone implements PartitionListener {
+public class PartitionMarkDoneListener implements CommitListener {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDone.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PartitionMarkDoneListener.class);
 
     private final InternalRowPartitionComputer partitionComputer;
     private final PartitionMarkDoneTrigger trigger;
@@ -61,7 +61,7 @@ public class PartitionMarkDone implements PartitionListener {
     private final boolean waitCompaction;
     private final PartitionMarkDoneActionMode partitionMarkDoneActionMode;
 
-    public static Optional<PartitionMarkDone> create(
+    public static Optional<PartitionMarkDoneListener> create(
             ClassLoader cl,
             boolean isStreaming,
             boolean isRestored,
@@ -96,7 +96,7 @@ public class PartitionMarkDone implements PartitionListener {
                                 || coreOptions.mergeEngine() == MergeEngine.FIRST_ROW);
 
         return Optional.of(
-                new PartitionMarkDone(
+                new PartitionMarkDoneListener(
                         partitionComputer,
                         trigger,
                         actions,
@@ -119,7 +119,7 @@ public class PartitionMarkDone implements PartitionListener {
         return table.partitionKeys().isEmpty();
     }
 
-    public PartitionMarkDone(
+    public PartitionMarkDoneListener(
             InternalRowPartitionComputer partitionComputer,
             PartitionMarkDoneTrigger trigger,
             List<PartitionMarkDoneAction> actions,
@@ -133,14 +133,11 @@ public class PartitionMarkDone implements PartitionListener {
     }
 
     @Override
-    public void notifyCommittable(
-            List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
-        if (partitionMarkDoneRecoverFromState) {
-            if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
-                markDoneByWatermark(committables);
-            } else {
-                markDoneByProcessTime(committables);
-            }
+    public void notifyCommittable(List<ManifestCommittable> committables) {
+        if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode.WATERMARK) {
+            markDoneByWatermark(committables);
+        } else {
+            markDoneByProcessTime(committables);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index db20656b85..4b815d336f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -50,7 +50,7 @@ import java.util.Set;
  * This listener will collect data from the newly touched partition and then decide when to trigger
  * a report based on the partition's idle time.
  */
-public class ReportPartStatsListener implements PartitionListener {
+public class ReportPartStatsListener implements CommitListener {
 
     @SuppressWarnings("unchecked")
     private static final ListStateDescriptor<Map<String, Long>> PENDING_REPORT_STATE_DESC =
@@ -85,8 +85,8 @@ public class ReportPartStatsListener implements PartitionListener {
         this.idleTime = idleTime;
     }
 
-    public void notifyCommittable(
-            List<ManifestCommittable> committables, boolean partitionMarkDoneRecoverFromState) {
+    @Override
+    public void notifyCommittable(List<ManifestCommittable> committables) {
         Set<String> partition = new HashSet<>();
         boolean endInput = false;
         for (ManifestCommittable committable : committables) {
@@ -136,6 +136,7 @@ public class ReportPartStatsListener implements PartitionListener {
         return result;
     }
 
+    @Override
     public void snapshotState() throws Exception {
         pendingPartitionsState.update(Collections.singletonList(pendingPartitions));
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
index 55d1cabdcd..a4f9a0e574 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -62,7 +62,7 @@ public class CustomPartitionMarkDoneActionTest extends TableTestBase {
         // set.
         Assertions.assertThatThrownBy(
                         () ->
-                                PartitionMarkDone.create(
+                                PartitionMarkDoneListener.create(
                                         getClass().getClassLoader(),
                                         false,
                                         false,
@@ -85,8 +85,8 @@ public class CustomPartitionMarkDoneActionTest extends TableTestBase {
 
         FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
 
-        PartitionMarkDone markDone =
-                PartitionMarkDone.create(
+        PartitionMarkDoneListener markDone =
+                PartitionMarkDoneListener.create(
                                 getClass().getClassLoader(),
                                 false,
                                 false,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index eb7de0fd41..9e541d5ff0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -91,8 +91,8 @@ class PartitionMarkDoneTest extends TableTestBase {
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
         Path location = table.location();
         Path successFile = new Path(location, "a=0/_SUCCESS");
-        PartitionMarkDone markDone =
-                PartitionMarkDone.create(
+        PartitionMarkDoneListener markDone =
+                PartitionMarkDoneListener.create(
                                 getClass().getClassLoader(),
                                 false,
                                 false,
@@ -115,12 +115,12 @@ class PartitionMarkDoneTest extends TableTestBase {
         }
     }
 
-    public static void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
+    public static void notifyCommits(PartitionMarkDoneListener markDone, boolean isCompact) {
         notifyCommits(markDone, isCompact, true);
     }
 
     private static void notifyCommits(
-            PartitionMarkDone markDone,
+            PartitionMarkDoneListener markDone,
             boolean isCompact,
             boolean partitionMarkDoneRecoverFromState) {
         ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
@@ -146,7 +146,9 @@ class PartitionMarkDoneTest extends TableTestBase {
                             new IndexIncrement(emptyList()));
         }
         committable.addFileCommittable(compactMessage);
-        markDone.notifyCommittable(singletonList(committable), partitionMarkDoneRecoverFromState);
+        if (partitionMarkDoneRecoverFromState) {
+            markDone.notifyCommittable(singletonList(committable));
+        }
     }
 
     public static class MockOperatorStateStore implements OperatorStateStore {

Reply via email to