This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5f61f4725 [paimon] Avoid cast to KeyValueFileStore to get keyComparator in paimon (#1649)
5f61f4725 is described below
commit 5f61f4725340e7fb9656f3f0d27c46d17bf31e09
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 5 16:18:13 2025 +0800
[paimon] Avoid cast to KeyValueFileStore to get keyComparator in paimon (#1649)
---
.../lake/paimon/source/PaimonSortedRecordReader.java | 12 ++++++++----
.../lake/paimon/source/PaimonSourceTestBase.java | 20 ++++++++++++++++++++
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 0131788fb..467c4435b 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -20,10 +20,11 @@ package org.apache.fluss.lake.paimon.source;
import org.apache.fluss.lake.source.SortedRecordReader;
import org.apache.fluss.row.InternalRow;
-import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
import javax.annotation.Nullable;
@@ -42,10 +43,13 @@ public class PaimonSortedRecordReader extends PaimonRecordReader implements Sort
@Nullable Predicate predicate)
throws IOException {
super(fileStoreTable, split, project, predicate);
+ RowType pkKeyType =
+ new RowType(
+
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
+ fileStoreTable.schema()));
+
this.comparator =
- toFlussRowComparator(
- paimonRowType,
- ((KeyValueFileStore)
fileStoreTable.store()).newKeyComparator());
+ toFlussRowComparator(paimonRowType, new
KeyComparatorSupplier(pkKeyType).get());
}
@Override
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
index 9155a0d3b..c9cc7ff5c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -23,6 +23,9 @@ import org.apache.fluss.lake.paimon.PaimonLakeStorage;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.CloseableIterator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
@@ -60,10 +63,27 @@ class PaimonSourceTestBase {
Configuration configuration = new Configuration();
configuration.setString("type", "paimon");
configuration.setString("warehouse", tempWarehouseDir.toString());
+ configuration.setString("user", "root");
+ configuration.setString("password", "root-password");
lakeStorage = new PaimonLakeStorage(configuration);
paimonCatalog =
CatalogFactory.createCatalog(
CatalogContext.create(Options.fromMap(configuration.toMap())));
+ initPaimonPrivilege();
+ }
+
+ // Test for paimon privilege table
+ public static void initPaimonPrivilege() {
+ StreamTableEnvironment streamTEnv =
+ StreamTableEnvironment.create(
+ StreamExecutionEnvironment.getExecutionEnvironment(),
+ EnvironmentSettings.inStreamingMode());
+ streamTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type'='paimon', 'warehouse'
= '%s')",
+ "paimon_catalog", tempWarehouseDir));
+ streamTEnv.executeSql(
+ "CALL
paimon_catalog.sys.init_file_based_privilege('root-password');");
}
public void createTable(TablePath tablePath, Schema schema) throws
Exception {