This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5434595a14 [core] Make bucketed append table write initialization lighter
(#6741)
5434595a14 is described below
commit 5434595a148a339dac115c450bf2afde3526d376
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 8 22:45:00 2025 +0800
[core] Make bucketed append table write initialization lighter (#6741)
---
.../main/java/org/apache/paimon/CoreOptions.java | 4 ++
.../append/BucketedAppendCompactManager.java | 33 +-------------
.../operation/BucketedAppendFileStoreWrite.java | 14 ++++++
.../paimon/table/AppendOnlyFileStoreTable.java | 5 +-
.../table/source/AppendOnlySplitGenerator.java | 5 +-
.../paimon/table/AppendOnlySimpleTableTest.java | 53 ++++++++++++++++++++++
6 files changed, 77 insertions(+), 37 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e9077ff589..b0862356f5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2929,6 +2929,10 @@ public class CoreOptions implements Serializable {
return consumerId;
}
+ public boolean bucketAppendOrdered() {
+ return options.get(BUCKET_APPEND_ORDERED);
+ }
+
@Nullable
public Integer fullCompactionDeltaCommits() {
return options.get(FULL_COMPACTION_DELTA_COMMITS);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 4a057f9576..df7c57bc7f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -78,7 +78,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
@Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
this.dvMaintainer = dvMaintainer;
- this.toCompact = new PriorityQueue<>(fileComparator(false));
+ this.toCompact = new
PriorityQueue<>(Comparator.comparing(DataFileMeta::minSequenceNumber));
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
this.targetFileSize = targetFileSize;
@@ -357,35 +357,4 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
public interface CompactRewriter {
List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws
Exception;
}
-
- /**
- * New files may be created during the compaction process, then the
results of the compaction
- * may be put after the new files, and this order will be disrupted. We
need to ensure this
- * order, so we force the order by sequence.
- */
- public static Comparator<DataFileMeta> fileComparator(boolean
ignoreOverlap) {
- return (o1, o2) -> {
- if (o1 == o2) {
- return 0;
- }
-
- if (!ignoreOverlap && isOverlap(o1, o2)) {
- LOG.warn(
- String.format(
- "There should no overlap in append files, but
Range1(%s, %s), Range2(%s, %s),"
- + " check if you have multiple write
jobs.",
- o1.minSequenceNumber(),
- o1.maxSequenceNumber(),
- o2.minSequenceNumber(),
- o2.maxSequenceNumber()));
- }
-
- return Long.compare(o1.minSequenceNumber(),
o2.minSequenceNumber());
- };
- }
-
- private static boolean isOverlap(DataFileMeta o1, DataFileMeta o2) {
- return o2.minSequenceNumber() <= o1.maxSequenceNumber()
- && o2.maxSequenceNumber() >= o1.minSequenceNumber();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 71f10d4b3d..a3f2083e25 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -68,9 +68,23 @@ public class BucketedAppendFileStoreWrite extends
BaseAppendFileStoreWrite {
options,
dvMaintainerFactory,
tableName);
+ if (!options.bucketAppendOrdered()) {
+ super.withIgnorePreviousFiles(options.writeOnly());
+ }
this.commitUser = commitUser;
}
+ @Override
+ public void withIgnorePreviousFiles(boolean ignorePrevious) {
+ if (options.bucketAppendOrdered()) {
+ super.withIgnorePreviousFiles(ignorePrevious);
+ } else {
+ // unordered: no sequence numbers to restore, so honor the caller (e.g. overwrite)
+ // and additionally start every writer empty when write-only
+ super.withIgnorePreviousFiles(ignorePrevious || options.writeOnly());
+ }
+ }
+
@Override
protected CompactManager getCompactManager(
BinaryRow partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4ac41361ac..4aec22c203 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -88,8 +88,9 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
protected SplitGenerator splitGenerator() {
- long targetSplitSize = store().options().splitTargetSize();
- long openFileCost = store().options().splitOpenFileCost();
+ CoreOptions options = store().options();
+ long targetSplitSize = options.splitTargetSize();
+ long openFileCost = options.splitOpenFileCost();
return coreOptions().dataEvolutionEnabled()
? new DataEvolutionSplitGenerator(targetSplitSize,
openFileCost)
: new AppendOnlySplitGenerator(targetSplitSize, openFileCost,
bucketMode());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
index 52dfad74e4..92744443cd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendOnlySplitGenerator.java
@@ -24,12 +24,11 @@ import org.apache.paimon.utils.BinPacking;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.paimon.append.BucketedAppendCompactManager.fileComparator;
-
/** Append only implementation of {@link SplitGenerator}. */
public class AppendOnlySplitGenerator implements SplitGenerator {
@@ -52,7 +51,7 @@ public class AppendOnlySplitGenerator implements
SplitGenerator {
@Override
public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
List<DataFileMeta> files = new ArrayList<>(input);
- files.sort(fileComparator(bucketMode == BucketMode.BUCKET_UNAWARE));
+ files.sort(Comparator.comparing(DataFileMeta::minSequenceNumber));
Function<DataFileMeta, Long> weightFunc = file ->
Math.max(file.fileSize(), openFileCost);
return BinPacking.packForOrdered(files, weightFunc,
targetSplitSize).stream()
.map(SplitGroup::rawConvertibleGroup)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 2470fdf749..201442b6cc 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -37,7 +37,9 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.options.MemorySize;
@@ -103,6 +105,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
@@ -121,6 +124,56 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link AppendOnlyFileStoreTable}. */
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
+ @Test
+ public void testBucketedAppendTableWriteWithInit() throws Exception {
+ innerTestBucketedAppendTableWriteInit(true);
+ }
+
+ @Test
+ public void testBucketedAppendTableWriteNoInit() throws Exception {
+ innerTestBucketedAppendTableWriteInit(false);
+ }
+
+ public void innerTestBucketedAppendTableWriteInit(boolean ordered) throws
Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set(BUCKET, 2);
+ options.set(BUCKET_KEY, "a");
+ options.set(WRITE_ONLY, true);
+ options.set(BUCKET_APPEND_ORDERED, ordered);
+ });
+
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+ // 1. first write
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(write.prepareCommit());
+ }
+
+ // 2. delete all manifests
+ ManifestList manifestList =
table.store().manifestListFactory().create();
+ ManifestFile manifestFile =
table.store().manifestFileFactory().create();
+ List<ManifestFileMeta> manifests =
+ manifestList.readAllManifests(table.latestSnapshot().get());
+ for (ManifestFileMeta manifest : manifests) {
+ manifestFile.delete(manifest.fileName());
+ }
+
+ // 3. check new write
+ try (BatchTableWrite write = writeBuilder.newWrite()) {
+ if (ordered) {
+ assertThatThrownBy(() -> write.write(rowData(1, 10, 100L)))
+ .hasMessageContaining("FileNotFoundException");
+ } else {
+ // no exception
+ write.write(rowData(1, 10, 100L));
+ }
+ }
+ }
+
@Test
public void testOverwriteNeverFail() throws Exception {
FileStoreTable table = createFileStoreTable();