This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f9b0a0c87 [flink] Refactor code in RewritePostponeBucketCommittableOperator (#5923)
2f9b0a0c87 is described below

commit 2f9b0a0c876f87e525c4b278eaa8e945ca71bb2d
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 18 18:04:38 2025 +0800

    [flink] Refactor code in RewritePostponeBucketCommittableOperator (#5923)
---
 ...java => PostponeBucketCommittableRewriter.java} |  55 +++------
 .../RewritePostponeBucketCommittableOperator.java  | 125 +--------------------
 2 files changed, 22 insertions(+), 158 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
similarity index 78%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index 9c9c528f54..2991fb4ecc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.postpone;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
@@ -33,8 +32,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
 
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -47,34 +44,19 @@ import java.util.Map;
  * Rewrite committable from postpone bucket table compactor. It moves all new 
files into compact
  * results, and delete unused new files, because compactor only produce 
compact snapshots.
  */
-public class RewritePostponeBucketCommittableOperator
-        extends BoundedOneInputOperator<Committable, Committable> {
-
-    private static final long serialVersionUID = 1L;
+public class PostponeBucketCommittableRewriter {
 
     private final FileStoreTable table;
+    private final FileStorePathFactory pathFactory;
+    private final Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
 
-    private transient FileStorePathFactory pathFactory;
-    private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
-
-    public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
+    public PostponeBucketCommittableRewriter(FileStoreTable table) {
         this.table = table;
+        this.pathFactory = table.store().pathFactory();
+        this.buckets = new HashMap<>();
     }
 
-    @Override
-    public void open() throws Exception {
-        pathFactory = table.store().pathFactory();
-        buckets = new HashMap<>();
-    }
-
-    @Override
-    public void processElement(StreamRecord<Committable> element) throws 
Exception {
-        Committable committable = element.getValue();
-        if (committable.kind() != Committable.Kind.FILE) {
-            output.collect(element);
-        }
-
-        CommitMessageImpl message = (CommitMessageImpl) 
committable.wrappedCommittable();
+    public void add(CommitMessageImpl message) {
         buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
                 .computeIfAbsent(
                         message.bucket(),
@@ -86,26 +68,23 @@ public class RewritePostponeBucketCommittableOperator
                 .update(message);
     }
 
-    @Override
-    public void endInput() throws Exception {
-        emitAll(Long.MAX_VALUE);
-    }
-
-    protected void emitAll(long checkpointId) {
+    public List<Committable> emitAll(long checkpointId) {
+        List<Committable> result = new ArrayList<>();
         for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : 
buckets.entrySet()) {
             for (Map.Entry<Integer, BucketFiles> bucketEntry :
                     partitionEntry.getValue().entrySet()) {
                 BucketFiles bucketFiles = bucketEntry.getValue();
-                output.collect(
-                        new StreamRecord<>(
-                                new Committable(
-                                        checkpointId,
-                                        Committable.Kind.FILE,
-                                        bucketFiles.makeMessage(
-                                                partitionEntry.getKey(), 
bucketEntry.getKey()))));
+                Committable committable =
+                        new Committable(
+                                checkpointId,
+                                Committable.Kind.FILE,
+                                bucketFiles.makeMessage(
+                                        partitionEntry.getKey(), 
bucketEntry.getKey()));
+                result.add(committable);
             }
         }
         buckets.clear();
+        return result;
     }
 
     private static class BucketFiles {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index 9c9c528f54..43b4cc1244 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -18,44 +18,21 @@
 
 package org.apache.paimon.flink.postpone;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Rewrite committable from postpone bucket table compactor. It moves all new 
files into compact
- * results, and delete unused new files, because compactor only produce 
compact snapshots.
- */
+/** Rewrite committable from postpone bucket table compactor. */
 public class RewritePostponeBucketCommittableOperator
         extends BoundedOneInputOperator<Committable, Committable> {
 
     private static final long serialVersionUID = 1L;
 
     private final FileStoreTable table;
-
-    private transient FileStorePathFactory pathFactory;
-    private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
+    private transient PostponeBucketCommittableRewriter rewriter;
 
     public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
         this.table = table;
@@ -63,8 +40,7 @@ public class RewritePostponeBucketCommittableOperator
 
     @Override
     public void open() throws Exception {
-        pathFactory = table.store().pathFactory();
-        buckets = new HashMap<>();
+        rewriter = new PostponeBucketCommittableRewriter(table);
     }
 
     @Override
@@ -74,16 +50,7 @@ public class RewritePostponeBucketCommittableOperator
             output.collect(element);
         }
 
-        CommitMessageImpl message = (CommitMessageImpl) 
committable.wrappedCommittable();
-        buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
-                .computeIfAbsent(
-                        message.bucket(),
-                        b ->
-                                new BucketFiles(
-                                        pathFactory.createDataFilePathFactory(
-                                                message.partition(), 
message.bucket()),
-                                        table.fileIO()))
-                .update(message);
+        rewriter.add((CommitMessageImpl) committable.wrappedCommittable());
     }
 
     @Override
@@ -92,88 +59,6 @@ public class RewritePostponeBucketCommittableOperator
     }
 
     protected void emitAll(long checkpointId) {
-        for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry : 
buckets.entrySet()) {
-            for (Map.Entry<Integer, BucketFiles> bucketEntry :
-                    partitionEntry.getValue().entrySet()) {
-                BucketFiles bucketFiles = bucketEntry.getValue();
-                output.collect(
-                        new StreamRecord<>(
-                                new Committable(
-                                        checkpointId,
-                                        Committable.Kind.FILE,
-                                        bucketFiles.makeMessage(
-                                                partitionEntry.getKey(), 
bucketEntry.getKey()))));
-            }
-        }
-        buckets.clear();
-    }
-
-    private static class BucketFiles {
-
-        private final DataFilePathFactory pathFactory;
-        private final FileIO fileIO;
-
-        private @Nullable Integer totalBuckets;
-        private final Map<String, DataFileMeta> newFiles;
-        private final List<DataFileMeta> compactBefore;
-        private final List<DataFileMeta> compactAfter;
-        private final List<DataFileMeta> changelogFiles;
-        private final List<IndexFileMeta> newIndexFiles;
-        private final List<IndexFileMeta> deletedIndexFiles;
-
-        private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
-            this.pathFactory = pathFactory;
-            this.fileIO = fileIO;
-
-            this.newFiles = new LinkedHashMap<>();
-            this.compactBefore = new ArrayList<>();
-            this.compactAfter = new ArrayList<>();
-            this.changelogFiles = new ArrayList<>();
-            this.newIndexFiles = new ArrayList<>();
-            this.deletedIndexFiles = new ArrayList<>();
-        }
-
-        private void update(CommitMessageImpl message) {
-            totalBuckets = message.totalBuckets();
-
-            for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
-                newFiles.put(file.fileName(), file);
-            }
-
-            Map<String, Path> toDelete = new HashMap<>();
-            for (DataFileMeta file : 
message.compactIncrement().compactBefore()) {
-                if (newFiles.containsKey(file.fileName())) {
-                    toDelete.put(file.fileName(), pathFactory.toPath(file));
-                    newFiles.remove(file.fileName());
-                } else {
-                    compactBefore.add(file);
-                }
-            }
-
-            for (DataFileMeta file : 
message.compactIncrement().compactAfter()) {
-                compactAfter.add(file);
-                toDelete.remove(file.fileName());
-            }
-
-            
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
-            changelogFiles.addAll(message.compactIncrement().changelogFiles());
-
-            newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
-            
deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
-
-            toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
-        }
-
-        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) 
{
-            List<DataFileMeta> realCompactAfter = new 
ArrayList<>(newFiles.values());
-            realCompactAfter.addAll(compactAfter);
-            return new CommitMessageImpl(
-                    partition,
-                    bucket,
-                    totalBuckets,
-                    DataIncrement.emptyIncrement(),
-                    new CompactIncrement(compactBefore, realCompactAfter, 
changelogFiles),
-                    new IndexIncrement(newIndexFiles, deletedIndexFiles));
-        }
+        rewriter.emitAll(checkpointId).forEach(c -> output.collect(new 
StreamRecord<>(c)));
     }
 }

Reply via email to