This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 409a955ca [hotfix] Paimon should always write bucket0 for 
bucket-unaware log table (#1661)
409a955ca is described below

commit 409a955ca0bcd04eb085a9863239896648725fb4
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Sep 11 10:54:51 2025 +0800

    [hotfix] Paimon should always write bucket0 for bucket-unaware log table 
(#1661)
---
 .../lake/paimon/tiering/append/AppendOnlyWriter.java  | 11 ++++++++++-
 .../fluss/lake/paimon/tiering/PaimonTieringTest.java  | 19 +++++++++++--------
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index b8a55eb00..2d571358d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecord;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.TableWriteImpl;
 
@@ -34,6 +35,8 @@ import static 
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUS
 /** A {@link RecordWriter} to write to Paimon's append-only table. */
 public class AppendOnlyWriter extends RecordWriter<InternalRow> {
 
+    private final FileStoreTable fileStoreTable;
+
     public AppendOnlyWriter(
             FileStoreTable fileStoreTable,
             TableBucket tableBucket,
@@ -48,6 +51,7 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
                 tableBucket,
                 partition,
                 partitionKeys); // Pass to parent
+        this.fileStoreTable = fileStoreTable;
     }
 
     @Override
@@ -56,6 +60,11 @@ public class AppendOnlyWriter extends 
RecordWriter<InternalRow> {
         // hacky, call internal method tableWrite.getWrite() to support
         // to write to given partition, otherwise, it'll always extract a 
partition from Paimon row
         // which may be costly
-        tableWrite.getWrite().write(partition, bucket, flussRecordAsPaimonRow);
+        int writtenBucket = bucket;
+        // in bucket-unaware mode, we always have to write to bucket 0 to 
follow Paimon best practice
+        if (fileStoreTable.store().bucketMode() == BucketMode.BUCKET_UNAWARE) {
+            writtenBucket = 0;
+        }
+        tableWrite.getWrite().write(partition, writtenBucket, 
flussRecordAsPaimonRow);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 87bdefa69..d1e2b3456 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -41,7 +41,6 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
@@ -80,7 +79,6 @@ import static org.apache.fluss.record.ChangeType.DELETE;
 import static org.apache.fluss.record.ChangeType.INSERT;
 import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
 import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
-import static org.apache.fluss.utils.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The UT for tiering to Paimon via {@link PaimonLakeTieringFactory}. */
@@ -624,12 +622,17 @@ class PaimonTieringTest {
             // for log table, we can't filter by bucket directly, filter file 
by __bucket column
             for (Split split : readBuilder.newScan().plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
-                List<DataFileMeta> dataFileMetas = dataSplit.dataFiles();
-                checkState(dataFileMetas.size() == 1);
-                DataFileMeta dataFileMeta = dataFileMetas.get(0);
-                // filter by __bucket column
-                if (dataFileMeta.valueStats().maxValues().getInt(3) == bucket
-                        && dataFileMeta.valueStats().minValues().getInt(3) == 
bucket) {
+                // bucket is always 0
+                assertThat(dataSplit.bucket()).isEqualTo(0);
+                // filter by __bucket column, remove any data file that doesn't 
belong to this bucket
+                dataSplit
+                        .dataFiles()
+                        .removeIf(
+                                dataFileMeta ->
+                                        
!(dataFileMeta.valueStats().maxValues().getInt(3) == bucket
+                                                && 
dataFileMeta.valueStats().minValues().getInt(3)
+                                                        == bucket));
+                if (!dataSplit.dataFiles().isEmpty()) {
                     splits.add(split);
                 }
             }

Reply via email to