This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2538f69321 [core] DataTableRead should add auth columns to row filters
2538f69321 is described below

commit 2538f69321423bdeb061e1b6e7ccf0ab873fbea4
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jan 29 16:18:05 2026 +0800

    [core] DataTableRead should add auth columns to row filters
---
 .../apache/paimon/predicate/PredicateVisitor.java  | 37 +++++++++++++++
 .../table/format/predicate/PredicateUtils.java     | 33 +------------
 .../paimon/table/source/AbstractDataTableRead.java | 49 +++++++++++++++++---
 .../paimon/flink/source/FlinkSourceBuilder.java    | 54 +---------------------
 .../paimon/spark/PaimonBaseScanBuilder.scala       | 26 +----------
 5 files changed, 83 insertions(+), 116 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
index c741c05d46..f14cb32886 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
@@ -18,10 +18,47 @@
 
 package org.apache.paimon.predicate;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 /** A visitor to visit {@link Predicate}. */
 public interface PredicateVisitor<T> {
 
     T visit(LeafPredicate predicate);
 
     T visit(CompoundPredicate predicate);
+
+    static Set<String> collectFieldNames(@Nullable Predicate predicate) {
+        if (predicate == null) {
+            return Collections.emptySet();
+        }
+        return predicate.visit(new FieldNameCollector());
+    }
+
+    /** A visitor that collects all field names referenced by a predicate. */
+    class FieldNameCollector implements PredicateVisitor<Set<String>> {
+
+        @Override
+        public Set<String> visit(LeafPredicate predicate) {
+            Set<String> fieldNames = new HashSet<>();
+            for (Object input : predicate.transform().inputs()) {
+                if (input instanceof FieldRef) {
+                    fieldNames.add(((FieldRef) input).name());
+                }
+            }
+            return fieldNames;
+        }
+
+        @Override
+        public Set<String> visit(CompoundPredicate predicate) {
+            Set<String> fieldNames = new HashSet<>();
+            for (Predicate child : predicate.children()) {
+                fieldNames.addAll(child.visit(this));
+            }
+            return fieldNames;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
index aac792f196..86ef0a13ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
@@ -18,12 +18,8 @@
 
 package org.apache.paimon.table.format.predicate;
 
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.types.RowType;
 
 import java.util.Arrays;
@@ -35,6 +31,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
 
 /** Utility methods for working with {@link Predicate}s. */
 public class PredicateUtils {
@@ -69,32 +66,4 @@ public class PredicateUtils {
 
         return result;
     }
-
-    public static Set<String> collectFieldNames(Predicate predicate) {
-        return predicate.visit(new FieldNameCollector());
-    }
-
-    /** A visitor that collects all field names referenced by a predicate. */
-    private static class FieldNameCollector implements PredicateVisitor<Set<String>> {
-
-        @Override
-        public Set<String> visit(LeafPredicate predicate) {
-            Set<String> fieldNames = new HashSet<>();
-            for (Object input : predicate.transform().inputs()) {
-                if (input instanceof FieldRef) {
-                    fieldNames.add(((FieldRef) input).name());
-                }
-            }
-            return fieldNames;
-        }
-
-        @Override
-        public Set<String> visit(CompoundPredicate predicate) {
-            Set<String> fieldNames = new HashSet<>();
-            for (Predicate child : predicate.children()) {
-                fieldNames.addAll(child.visit(this));
-            }
-            return fieldNames;
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index e1dae66b30..691cf0f765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -26,9 +26,16 @@ import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ListUtils;
+import org.apache.paimon.utils.ProjectedRow;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
 
 /** A {@link InnerTableRead} for data table. */
 public abstract class AbstractDataTableRead implements InnerTableRead {
@@ -82,16 +89,17 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
 
     @Override
     public final RecordReader<InternalRow> createReader(Split split) throws IOException {
-        TableQueryAuthResult queryAuthResult = null;
+        TableQueryAuthResult authResult = null;
         if (split instanceof QueryAuthSplit) {
             QueryAuthSplit authSplit = (QueryAuthSplit) split;
             split = authSplit.split();
-            queryAuthResult = authSplit.authResult();
+            authResult = authSplit.authResult();
         }
-        RecordReader<InternalRow> reader = reader(split);
-        if (queryAuthResult != null) {
-            RowType type = readType == null ? schema.logicalRowType() : readType;
-            reader = queryAuthResult.doAuth(reader, type);
+        RecordReader<InternalRow> reader;
+        if (authResult == null) {
+            reader = reader(split);
+        } else {
+            reader = authedReader(split, authResult);
         }
         if (executeFilter) {
             reader = executeFilter(reader);
@@ -100,6 +108,35 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
         return reader;
     }
 
+    private RecordReader<InternalRow> authedReader(Split split, TableQueryAuthResult authResult)
+            throws IOException {
+        RecordReader<InternalRow> reader;
+        RowType tableType = schema.logicalRowType();
+        RowType readType = this.readType == null ? tableType : this.readType;
+        Predicate authPredicate = authResult.extractPredicate();
+        ProjectedRow backRow = null;
+        if (authPredicate != null) {
+            Set<String> authFields = collectFieldNames(authPredicate);
+            List<String> readFields = readType.getFieldNames();
+            List<String> authAddNames = new ArrayList<>();
+            for (String field : tableType.getFieldNames()) {
+                if (authFields.contains(field) && !readFields.contains(field)) {
+                    authAddNames.add(field);
+                }
+            }
+            if (!authAddNames.isEmpty()) {
+                readType = tableType.project(ListUtils.union(readFields, authAddNames));
+                withReadType(readType);
+                backRow = ProjectedRow.from(readType.projectIndexes(readFields));
+            }
+        }
+        reader = authResult.doAuth(reader(split), readType);
+        if (backRow != null) {
+            reader = reader.transform(backRow::replaceRow);
+        }
+        return reader;
+    }
+
     private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> reader) {
         if (predicate == null) {
             return reader;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 6497f28184..3e96dec1ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.TableQueryAuthResult;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.Projection;
@@ -34,10 +33,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.format.predicate.PredicateUtils;
 import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.TableQueryAuth;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -61,11 +57,8 @@ import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 
 import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
@@ -196,7 +189,7 @@ public class FlinkSourceBuilder {
     private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) {
         ReadBuilder readBuilder = table.newReadBuilder();
         if (readType != null) {
-            readBuilder.withReadType(readTypeWithAuth(readType));
+            readBuilder.withReadType(readType);
         }
         if (predicate != null) {
             readBuilder.withFilter(predicate);
@@ -397,49 +390,4 @@ public class FlinkSourceBuilder {
                 "The align mode of paimon source currently only supports EXACTLY_ONCE checkpoint mode. Please set "
                         + "execution.checkpointing.mode to exactly-once");
     }
-
-    private org.apache.paimon.types.RowType readTypeWithAuth(
-            org.apache.paimon.types.RowType readType) {
-        if (!(table instanceof FileStoreTable)) {
-            return readType;
-        }
-
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        TableQueryAuth auth =
-                fileStoreTable.catalogEnvironment().tableQueryAuth(new CoreOptions(conf.toMap()));
-        if (auth == null) {
-            return readType;
-        }
-
-        List<String> requiredFieldNames = readType.getFieldNames();
-        TableQueryAuthResult result = auth.auth(requiredFieldNames);
-        if (result == null) {
-            return readType;
-        }
-
-        Predicate authPredicate = result.extractPredicate();
-        if (authPredicate == null) {
-            return readType;
-        }
-
-        Set<String> authFieldNames = PredicateUtils.collectFieldNames(authPredicate);
-        Set<String> requiredFieldNameSet = new HashSet<>(requiredFieldNames);
-
-        List<DataField> additionalFields = new ArrayList<>();
-        for (DataField field : table.rowType().getFields()) {
-            if (authFieldNames.contains(field.name())
-                    && !requiredFieldNameSet.contains(field.name())) {
-                additionalFields.add(field);
-            }
-        }
-
-        if (additionalFields.isEmpty()) {
-            return readType;
-        }
-
-        // Create new read type with additional fields
-        List<DataField> newFields = new ArrayList<>(readType.getFields());
-        newFields.addAll(additionalFields);
-        return new org.apache.paimon.types.RowType(newFields);
-    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 4bca5bad11..47723171e4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,8 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch}
-import org.apache.paimon.table.{FileStoreTable, SpecialFields, Table}
-import org.apache.paimon.table.format.predicate.PredicateUtils
+import org.apache.paimon.table.{SpecialFields, Table}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
@@ -59,29 +58,6 @@ abstract class PaimonBaseScanBuilder
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
     this.requiredSchema = requiredSchema
-    pruneColumnsWithAuth(requiredSchema)
-  }
-
-  private def pruneColumnsWithAuth(requiredSchema: StructType): Unit = {
-    val auth = table match {
-      case fileStoreTable: FileStoreTable =>
-        fileStoreTable.catalogEnvironment().tableQueryAuth(coreOptions)
-      case _ =>
-        return
-    }
-
-    val result = auth.auth(requiredSchema.fieldNames.toList.asJava)
-    if (result != null) {
-      val predicate = result.extractPredicate()
-      if (predicate != null) {
-        val names = PredicateUtils.collectFieldNames(predicate)
-        val fullType = SparkTypeUtils.fromPaimonRowType(table.rowType())
-        val requiredFieldNames = requiredSchema.fieldNames.toSet
-        val addFields = fullType.fields.filter(
-          field => names.contains(field.name) && !requiredFieldNames.contains(field.name))
-        this.requiredSchema = StructType(requiredSchema.fields ++ addFields)
-      }
-    }
   }
 
   override def pushPredicates(predicates: Array[SparkPredicate]): Array[SparkPredicate] = {

Reply via email to