This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 455d6f395 [lake/paimon] use paimon PK row type as comparator (#2980)
455d6f395 is described below
commit 455d6f3954b85776b2fbaf6e2fc84c3c3e308123
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Apr 2 13:56:53 2026 +0800
[lake/paimon] use paimon PK row type as comparator (#2980)
---
.../paimon/source/PaimonSortedRecordReader.java | 2 +-
.../source/PaimonSortedRecordReaderTest.java | 50 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 1940b2055..17220c854 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -49,7 +49,7 @@ public class PaimonSortedRecordReader extends
PaimonRecordReader implements Sort
PrimaryKeyTableUtils.addKeyNamePrefix(
fileStoreTable.schema().primaryKeysFields()));
this.comparator =
- toFlussRowComparator(paimonRowType, new
KeyComparatorSupplier(pkKeyType).get());
+ toFlussRowComparator(pkKeyType, new
KeyComparatorSupplier(pkKeyType).get());
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
index 21a6333fd..af16eac21 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.fluss.lake.source.SortedRecordReader;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.TimestampLtz;
@@ -38,6 +39,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -96,6 +98,54 @@ class PaimonSortedRecordReaderTest extends
PaimonSourceTestBase {
}
}
+ @Test
+ void testComparatorUsesPrimaryKeyRowType() throws Exception { // checks that order() can compare rows holding only the primary-key columns
+ TablePath tablePath = TablePath.of(DEFAULT_DB,
"pkTable_composite_timestamp_key");
+
+ createTable(
+ tablePath,
+ Schema.newBuilder()
+ .column("member_id", DataTypes.BIGINT())
+ .column("product_id", DataTypes.STRING())
+ .column("channel_key", DataTypes.STRING())
+ .column("product_name", DataTypes.STRING())
+ .column("seq_time", DataTypes.TIMESTAMP(0))
+ .column("order_id", DataTypes.STRING())
+ .primaryKey("member_id", "channel_key", "seq_time",
"order_id") // key columns are declared at positions 0, 2, 4, 5 of the table, i.e. not contiguous
+ .option(CoreOptions.BUCKET.key(), "1")
+ .option(CoreOptions.BUCKET_KEY.key(),
"member_id,channel_key,seq_time")
+ .build());
+
+ LakeSource<PaimonSplit> lakeSource =
lakeStorage.createLakeSource(tablePath);
+ RecordReader recordReader = lakeSource.createRecordReader(() -> null);
+ assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+
+ Comparator<InternalRow> comparator = ((SortedRecordReader)
recordReader).order();
+ Table table = getTable(tablePath);
+ int[] pkIndex = table.rowType().getFieldIndices(table.primaryKeys()); // positions of the primary-key fields within the table row type
+
+ GenericRow row1 = new GenericRow(6); // full-width row: one field per declared column
+ row1.setField(0, 1L);
+ row1.setField(1,
org.apache.fluss.row.BinaryString.fromString("product-1"));
+ row1.setField(2,
org.apache.fluss.row.BinaryString.fromString("channel-a"));
+ row1.setField(3,
org.apache.fluss.row.BinaryString.fromString("name-1"));
+ row1.setField(4, TimestampNtz.fromMillis(1_700_000_000_000L));
+ row1.setField(5,
org.apache.fluss.row.BinaryString.fromString("order-1"));
+
+ GenericRow row2 = new GenericRow(6);
+ row2.setField(0, 1L); // same member_id as row1
+ row2.setField(1,
org.apache.fluss.row.BinaryString.fromString("product-2"));
+ row2.setField(2,
org.apache.fluss.row.BinaryString.fromString("channel-a")); // same channel_key as row1
+ row2.setField(3,
org.apache.fluss.row.BinaryString.fromString("name-2"));
+ row2.setField(4, TimestampNtz.fromMillis(1_700_000_000_001L)); // seq_time is 1 ms later than in row1
+ row2.setField(5,
org.apache.fluss.row.BinaryString.fromString("order-2"));
+
+ InternalRow pkRow1 = ProjectedRow.from(pkIndex).replaceRow(row1); // NOTE(review): presumably a view of row1 restricted to the pkIndex fields - confirm
+ InternalRow pkRow2 = ProjectedRow.from(pkIndex).replaceRow(row2);
+
+ assertThat(comparator.compare(pkRow1, pkRow2)).isLessThan(0); // row1 is smaller on every key column that differs (seq_time, order_id)
+ }
+
private static <T> boolean isSorted(Iterator<T> iterator, Comparator<?
super T> comparator) {
if (!iterator.hasNext()) {
return true;