This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 409a955ca [hotfix] Paimon should always write bucket0 for
bucket-unaware log table (#1661)
409a955ca is described below
commit 409a955ca0bcd04eb085a9863239896648725fb4
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Sep 11 10:54:51 2025 +0800
[hotfix] Paimon should always write bucket0 for bucket-unaware log table
(#1661)
---
.../lake/paimon/tiering/append/AppendOnlyWriter.java | 11 ++++++++++-
.../fluss/lake/paimon/tiering/PaimonTieringTest.java | 19 +++++++++++--------
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
index b8a55eb00..2d571358d 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogRecord;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableWriteImpl;
@@ -34,6 +35,8 @@ import static
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUS
/** A {@link RecordWriter} to write to Paimon's append-only table. */
public class AppendOnlyWriter extends RecordWriter<InternalRow> {
+ private final FileStoreTable fileStoreTable;
+
public AppendOnlyWriter(
FileStoreTable fileStoreTable,
TableBucket tableBucket,
@@ -48,6 +51,7 @@ public class AppendOnlyWriter extends
RecordWriter<InternalRow> {
tableBucket,
partition,
partitionKeys); // Pass to parent
+ this.fileStoreTable = fileStoreTable;
}
@Override
@@ -56,6 +60,11 @@ public class AppendOnlyWriter extends
RecordWriter<InternalRow> {
// hacky, call internal method tableWrite.getWrite() to support
// to write to given partition, otherwise, it'll always extract a
partition from Paimon row
// which may be costly
- tableWrite.getWrite().write(partition, bucket, flussRecordAsPaimonRow);
+ int writtenBucket = bucket;
+ // in bucket-unaware mode, we have to write to bucket = 0 to
follow Paimon best practice
+ if (fileStoreTable.store().bucketMode() == BucketMode.BUCKET_UNAWARE) {
+ writtenBucket = 0;
+ }
+ tableWrite.getWrite().write(partition, writtenBucket,
flussRecordAsPaimonRow);
}
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 87bdefa69..d1e2b3456 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -41,7 +41,6 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -80,7 +79,6 @@ import static org.apache.fluss.record.ChangeType.DELETE;
import static org.apache.fluss.record.ChangeType.INSERT;
import static org.apache.fluss.record.ChangeType.UPDATE_AFTER;
import static org.apache.fluss.record.ChangeType.UPDATE_BEFORE;
-import static org.apache.fluss.utils.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
/** The UT for tiering to Paimon via {@link PaimonLakeTieringFactory}. */
@@ -624,12 +622,17 @@ class PaimonTieringTest {
// for log table, we can't filter by bucket directly, filter file
by __bucket column
for (Split split : readBuilder.newScan().plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
- List<DataFileMeta> dataFileMetas = dataSplit.dataFiles();
- checkState(dataFileMetas.size() == 1);
- DataFileMeta dataFileMeta = dataFileMetas.get(0);
- // filter by __bucket column
- if (dataFileMeta.valueStats().maxValues().getInt(3) == bucket
- && dataFileMeta.valueStats().minValues().getInt(3) ==
bucket) {
+ // bucket is always 0
+ assertThat(dataSplit.bucket()).isEqualTo(0);
+ // filter by __bucket column, remove any data file that doesn't
belong to this bucket
+ dataSplit
+ .dataFiles()
+ .removeIf(
+ dataFileMeta ->
+
!(dataFileMeta.valueStats().maxValues().getInt(3) == bucket
+ &&
dataFileMeta.valueStats().minValues().getInt(3)
+ == bucket));
+ if (!dataSplit.dataFiles().isEmpty()) {
splits.add(split);
}
}