This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f61f4725 [paimon] Avoid cast to KeyValueFileStore to get 
keyComparator in paimon (#1649)
5f61f4725 is described below

commit 5f61f4725340e7fb9656f3f0d27c46d17bf31e09
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 5 16:18:13 2025 +0800

    [paimon] Avoid cast to KeyValueFileStore to get keyComparator in paimon 
(#1649)
---
 .../lake/paimon/source/PaimonSortedRecordReader.java | 12 ++++++++----
 .../lake/paimon/source/PaimonSourceTestBase.java     | 20 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
index 0131788fb..467c4435b 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -20,10 +20,11 @@ package org.apache.fluss.lake.paimon.source;
 import org.apache.fluss.lake.source.SortedRecordReader;
 import org.apache.fluss.row.InternalRow;
 
-import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
 
 import javax.annotation.Nullable;
 
@@ -42,10 +43,13 @@ public class PaimonSortedRecordReader extends 
PaimonRecordReader implements Sort
             @Nullable Predicate predicate)
             throws IOException {
         super(fileStoreTable, split, project, predicate);
+        RowType pkKeyType =
+                new RowType(
+                        
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR.keyFields(
+                                fileStoreTable.schema()));
+
         this.comparator =
-                toFlussRowComparator(
-                        paimonRowType,
-                        ((KeyValueFileStore) 
fileStoreTable.store()).newKeyComparator());
+                toFlussRowComparator(paimonRowType, new 
KeyComparatorSupplier(pkKeyType).get());
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
index 9155a0d3b..c9cc7ff5c 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -23,6 +23,9 @@ import org.apache.fluss.lake.paimon.PaimonLakeStorage;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.CloseableIterator;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -60,10 +63,27 @@ class PaimonSourceTestBase {
         Configuration configuration = new Configuration();
         configuration.setString("type", "paimon");
         configuration.setString("warehouse", tempWarehouseDir.toString());
+        configuration.setString("user", "root");
+        configuration.setString("password", "root-password");
         lakeStorage = new PaimonLakeStorage(configuration);
         paimonCatalog =
                 CatalogFactory.createCatalog(
                         
CatalogContext.create(Options.fromMap(configuration.toMap())));
+        initPaimonPrivilege();
+    }
+
+    // Test for paimon privilege table
+    public static void initPaimonPrivilege() {
+        StreamTableEnvironment streamTEnv =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(),
+                        EnvironmentSettings.inStreamingMode());
+        streamTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type'='paimon', 'warehouse' 
= '%s')",
+                        "paimon_catalog", tempWarehouseDir));
+        streamTEnv.executeSql(
+                "CALL 
paimon_catalog.sys.init_file_based_privilege('root-password');");
     }
 
     public void createTable(TablePath tablePath, Schema schema) throws 
Exception {

Reply via email to