This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5434595a14 [core] Make bucketed append table write initial lighter (#6741)
5434595a14 is described below

commit 5434595a148a339dac115c450bf2afde3526d376
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 8 22:45:00 2025 +0800

    [core] Make bucketed append table write initial lighter (#6741)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  4 ++
 .../append/BucketedAppendCompactManager.java       | 33 +-------------
 .../operation/BucketedAppendFileStoreWrite.java    | 14 ++++++
 .../paimon/table/AppendOnlyFileStoreTable.java     |  5 +-
 .../table/source/AppendOnlySplitGenerator.java     |  5 +-
 .../paimon/table/AppendOnlySimpleTableTest.java    | 53 ++++++++++++++++++++++
 6 files changed, 77 insertions(+), 37 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e9077ff589..b0862356f5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2929,6 +2929,10 @@ public class CoreOptions implements Serializable {
         return consumerId;
     }
 
+    public boolean bucketAppendOrdered() {
+        return options.get(BUCKET_APPEND_ORDERED);
+    }
+
     @Nullable
     public Integer fullCompactionDeltaCommits() {
         return options.get(FULL_COMPACTION_DELTA_COMMITS);
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 4a057f9576..df7c57bc7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -78,7 +78,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
             @Nullable CompactionMetrics.Reporter metricsReporter) {
         this.executor = executor;
         this.dvMaintainer = dvMaintainer;
-        this.toCompact = new PriorityQueue<>(fileComparator(false));
+        this.toCompact = new PriorityQueue<>(Comparator.comparing(DataFileMeta::minSequenceNumber));
         this.toCompact.addAll(restored);
         this.minFileNum = minFileNum;
         this.targetFileSize = targetFileSize;
@@ -357,35 +357,4 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
     public interface CompactRewriter {
         List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
     }
-
-    /**
-     * New files may be created during the compaction process, then the results of the compaction
-     * may be put after the new files, and this order will be disrupted. We need to ensure this
-     * order, so we force the order by sequence.
-     */
-    public static Comparator<DataFileMeta> fileComparator(boolean ignoreOverlap) {
-        return (o1, o2) -> {
-            if (o1 == o2) {
-                return 0;
-            }
-
-            if (!ignoreOverlap && isOverlap(o1, o2)) {
-                LOG.warn(
-                        String.format(
-                                "There should no overlap in append files, but Range1(%s, %s), Range2(%s, %s),"
-                                        + " check if you have multiple write jobs.",
-                                o1.minSequenceNumber(),
-                                o1.maxSequenceNumber(),
-                                o2.minSequenceNumber(),
-                                o2.maxSequenceNumber()));
-            }
-
-            return Long.compare(o1.minSequenceNumber(), o2.minSequenceNumber());
-        };
-    }
-
-    private static boolean isOverlap(DataFileMeta o1, DataFileMeta o2) {
-        return o2.minSequenceNumber() <= o1.maxSequenceNumber()
-                && o2.maxSequenceNumber() >= o1.minSequenceNumber();
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 71f10d4b3d..a3f2083e25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -68,9 +68,23 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
                 options,
                 dvMaintainerFactory,
                 tableName);
+        if (!options.bucketAppendOrdered()) {
+            super.withIgnorePreviousFiles(options.writeOnly());
+        }
         this.commitUser = commitUser;
     }
 
+    @Override
+    public void withIgnorePreviousFiles(boolean ignorePrevious) {
+        if (options.bucketAppendOrdered()) {
+            super.withIgnorePreviousFiles(ignorePrevious);
+        } else {
+            // Unordered mode does not need sequence numbers, so the caller's flag is ignored:
+            // previous files are skipped (all writers start empty) whenever the table is write-only.
+            super.withIgnorePreviousFiles(options.writeOnly());
+        }
+    }
+
     @Override
     protected CompactManager getCompactManager(
             BinaryRow partition,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4ac41361ac..4aec22c203 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -88,8 +88,9 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     protected SplitGenerator splitGenerator() {
-        long targetSplitSize = store().options().splitTargetSize();
-        long openFileCost = store().options().splitOpenFileCost();
+        CoreOptions options = store().options();
+        long targetSplitSize = options.splitTargetSize();
+        long openFileCost = options.splitOpenFileCost();
         return coreOptions().dataEvolutionEnabled()
                 ? new DataEvolutionSplitGenerator(targetSplitSize, openFileCost)
                 : new AppendOnlySplitGenerator(targetSplitSize, openFileCost, bucketMode());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index 52dfad74e4..92744443cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -24,12 +24,11 @@ import org.apache.paimon.utils.BinPacking;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.append.BucketedAppendCompactManager.fileComparator;
-
 /** Append only implementation of {@link SplitGenerator}. */
 public class AppendOnlySplitGenerator implements SplitGenerator {
 
@@ -52,7 +51,7 @@ public class AppendOnlySplitGenerator implements SplitGenerator {
     @Override
     public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
-        files.sort(fileComparator(bucketMode == BucketMode.BUCKET_UNAWARE));
+        files.sort(Comparator.comparing(DataFileMeta::minSequenceNumber));
         Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
         return BinPacking.packForOrdered(files, weightFunc, 
targetSplitSize).stream()
                 .map(SplitGroup::rawConvertibleGroup)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 2470fdf749..201442b6cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -37,7 +37,9 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.options.MemorySize;
@@ -103,6 +105,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -121,6 +124,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Tests for {@link AppendOnlyFileStoreTable}. */
 public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
 
+    @Test
+    public void testBucketedAppendTableWriteWithInit() throws Exception {
+        innerTestBucketedAppendTableWriteInit(true);
+    }
+
+    @Test
+    public void testBucketedAppendTableWriteNoInit() throws Exception {
+        innerTestBucketedAppendTableWriteInit(false);
+    }
+
+    public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set(BUCKET, 2);
+                            options.set(BUCKET_KEY, "a");
+                            options.set(WRITE_ONLY, true);
+                            options.set(BUCKET_APPEND_ORDERED, ordered);
+                        });
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // 1. first write
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(rowData(1, 10, 100L));
+            commit.commit(write.prepareCommit());
+        }
+
+        // 2. delete all manifests
+        ManifestList manifestList = table.store().manifestListFactory().create();
+        ManifestFile manifestFile = table.store().manifestFileFactory().create();
+        List<ManifestFileMeta> manifests =
+                manifestList.readAllManifests(table.latestSnapshot().get());
+        for (ManifestFileMeta manifest : manifests) {
+            manifestFile.delete(manifest.fileName());
+        }
+
+        // 3. check new write
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            if (ordered) {
+                assertThatThrownBy(() -> write.write(rowData(1, 10, 100L)))
+                        .hasMessageContaining("FileNotFoundException");
+            } else {
+                // no exception
+                write.write(rowData(1, 10, 100L));
+            }
+        }
+    }
+
     @Test
     public void testOverwriteNeverFail() throws Exception {
         FileStoreTable table = createFileStoreTable();

Reply via email to