This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2f9b0a0c87 [flink] Refactor code in RewritePostponeBucketCommittableOperator (#5923)
2f9b0a0c87 is described below
commit 2f9b0a0c876f87e525c4b278eaa8e945ca71bb2d
Author: tsreaper <[email protected]>
AuthorDate: Fri Jul 18 18:04:38 2025 +0800
[flink] Refactor code in RewritePostponeBucketCommittableOperator (#5923)
---
...java => PostponeBucketCommittableRewriter.java} | 55 +++------
.../RewritePostponeBucketCommittableOperator.java | 125 +--------------------
2 files changed, 22 insertions(+), 158 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
similarity index 78%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index 9c9c528f54..2991fb4ecc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.postpone;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
@@ -33,8 +32,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -47,34 +44,19 @@ import java.util.Map;
* Rewrite committable from postpone bucket table compactor. It moves all new files into compact
* results, and delete unused new files, because compactor only produce compact snapshots.
*/
-public class RewritePostponeBucketCommittableOperator
- extends BoundedOneInputOperator<Committable, Committable> {
-
- private static final long serialVersionUID = 1L;
+public class PostponeBucketCommittableRewriter {
private final FileStoreTable table;
+ private final FileStorePathFactory pathFactory;
+ private final Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
- private transient FileStorePathFactory pathFactory;
- private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
-
- public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
+ public PostponeBucketCommittableRewriter(FileStoreTable table) {
this.table = table;
+ this.pathFactory = table.store().pathFactory();
+ this.buckets = new HashMap<>();
}
- @Override
- public void open() throws Exception {
- pathFactory = table.store().pathFactory();
- buckets = new HashMap<>();
- }
-
- @Override
- public void processElement(StreamRecord<Committable> element) throws
Exception {
- Committable committable = element.getValue();
- if (committable.kind() != Committable.Kind.FILE) {
- output.collect(element);
- }
-
- CommitMessageImpl message = (CommitMessageImpl)
committable.wrappedCommittable();
+ public void add(CommitMessageImpl message) {
buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
.computeIfAbsent(
message.bucket(),
@@ -86,26 +68,23 @@ public class RewritePostponeBucketCommittableOperator
.update(message);
}
- @Override
- public void endInput() throws Exception {
- emitAll(Long.MAX_VALUE);
- }
-
- protected void emitAll(long checkpointId) {
+ public List<Committable> emitAll(long checkpointId) {
+ List<Committable> result = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
buckets.entrySet()) {
for (Map.Entry<Integer, BucketFiles> bucketEntry :
partitionEntry.getValue().entrySet()) {
BucketFiles bucketFiles = bucketEntry.getValue();
- output.collect(
- new StreamRecord<>(
- new Committable(
- checkpointId,
- Committable.Kind.FILE,
- bucketFiles.makeMessage(
- partitionEntry.getKey(),
bucketEntry.getKey()))));
+ Committable committable =
+ new Committable(
+ checkpointId,
+ Committable.Kind.FILE,
+ bucketFiles.makeMessage(
+ partitionEntry.getKey(),
bucketEntry.getKey()));
+ result.add(committable);
}
}
buckets.clear();
+ return result;
}
private static class BucketFiles {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
index 9c9c528f54..43b4cc1244 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/RewritePostponeBucketCommittableOperator.java
@@ -18,44 +18,21 @@
package org.apache.paimon.flink.postpone;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Rewrite committable from postpone bucket table compactor. It moves all new
files into compact
- * results, and delete unused new files, because compactor only produce
compact snapshots.
- */
+/** Rewrite committable from postpone bucket table compactor. */
public class RewritePostponeBucketCommittableOperator
extends BoundedOneInputOperator<Committable, Committable> {
private static final long serialVersionUID = 1L;
private final FileStoreTable table;
-
- private transient FileStorePathFactory pathFactory;
- private transient Map<BinaryRow, Map<Integer, BucketFiles>> buckets;
+ private transient PostponeBucketCommittableRewriter rewriter;
public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
this.table = table;
@@ -63,8 +40,7 @@ public class RewritePostponeBucketCommittableOperator
@Override
public void open() throws Exception {
- pathFactory = table.store().pathFactory();
- buckets = new HashMap<>();
+ rewriter = new PostponeBucketCommittableRewriter(table);
}
@Override
@@ -74,16 +50,7 @@ public class RewritePostponeBucketCommittableOperator
output.collect(element);
}
- CommitMessageImpl message = (CommitMessageImpl)
committable.wrappedCommittable();
- buckets.computeIfAbsent(message.partition(), p -> new HashMap<>())
- .computeIfAbsent(
- message.bucket(),
- b ->
- new BucketFiles(
- pathFactory.createDataFilePathFactory(
- message.partition(),
message.bucket()),
- table.fileIO()))
- .update(message);
+ rewriter.add((CommitMessageImpl) committable.wrappedCommittable());
}
@Override
@@ -92,88 +59,6 @@ public class RewritePostponeBucketCommittableOperator
}
protected void emitAll(long checkpointId) {
- for (Map.Entry<BinaryRow, Map<Integer, BucketFiles>> partitionEntry :
buckets.entrySet()) {
- for (Map.Entry<Integer, BucketFiles> bucketEntry :
- partitionEntry.getValue().entrySet()) {
- BucketFiles bucketFiles = bucketEntry.getValue();
- output.collect(
- new StreamRecord<>(
- new Committable(
- checkpointId,
- Committable.Kind.FILE,
- bucketFiles.makeMessage(
- partitionEntry.getKey(),
bucketEntry.getKey()))));
- }
- }
- buckets.clear();
- }
-
- private static class BucketFiles {
-
- private final DataFilePathFactory pathFactory;
- private final FileIO fileIO;
-
- private @Nullable Integer totalBuckets;
- private final Map<String, DataFileMeta> newFiles;
- private final List<DataFileMeta> compactBefore;
- private final List<DataFileMeta> compactAfter;
- private final List<DataFileMeta> changelogFiles;
- private final List<IndexFileMeta> newIndexFiles;
- private final List<IndexFileMeta> deletedIndexFiles;
-
- private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
- this.pathFactory = pathFactory;
- this.fileIO = fileIO;
-
- this.newFiles = new LinkedHashMap<>();
- this.compactBefore = new ArrayList<>();
- this.compactAfter = new ArrayList<>();
- this.changelogFiles = new ArrayList<>();
- this.newIndexFiles = new ArrayList<>();
- this.deletedIndexFiles = new ArrayList<>();
- }
-
- private void update(CommitMessageImpl message) {
- totalBuckets = message.totalBuckets();
-
- for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
- newFiles.put(file.fileName(), file);
- }
-
- Map<String, Path> toDelete = new HashMap<>();
- for (DataFileMeta file :
message.compactIncrement().compactBefore()) {
- if (newFiles.containsKey(file.fileName())) {
- toDelete.put(file.fileName(), pathFactory.toPath(file));
- newFiles.remove(file.fileName());
- } else {
- compactBefore.add(file);
- }
- }
-
- for (DataFileMeta file :
message.compactIncrement().compactAfter()) {
- compactAfter.add(file);
- toDelete.remove(file.fileName());
- }
-
-
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
- changelogFiles.addAll(message.compactIncrement().changelogFiles());
-
- newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
-
deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
-
- toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
- }
-
- private CommitMessageImpl makeMessage(BinaryRow partition, int bucket)
{
- List<DataFileMeta> realCompactAfter = new
ArrayList<>(newFiles.values());
- realCompactAfter.addAll(compactAfter);
- return new CommitMessageImpl(
- partition,
- bucket,
- totalBuckets,
- DataIncrement.emptyIncrement(),
- new CompactIncrement(compactBefore, realCompactAfter,
changelogFiles),
- new IndexIncrement(newIndexFiles, deletedIndexFiles));
- }
+ rewriter.emitAll(checkpointId).forEach(c -> output.collect(new
StreamRecord<>(c)));
}
}