This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e15be4ad09 [core] Add row filter & column masking support for table read (#7034)
e15be4ad09 is described below

commit e15be4ad09a9a91a6154727754a1feb358d08600
Author: Jiajia Li <[email protected]>
AuthorDate: Wed Jan 14 22:11:16 2026 +0800

    [core] Add row filter & column masking support for table read (#7034)
---
 .../paimon/catalog/TableQueryAuthResult.java       |   4 +
 .../paimon/table/source/AbstractDataTableScan.java |  35 +-
 .../paimon/table/source/AuthAwareTableRead.java    | 311 +++++++++++++
 .../paimon/table/source/DataTableBatchScan.java    |   8 +-
 .../paimon/table/source/DataTableStreamScan.java   |   8 +-
 .../apache/paimon/table/source/QueryAuthSplit.java | 232 ++++++++++
 .../paimon/table/source/ReadBuilderImpl.java       |   7 +
 .../source/TableQueryAuthResultSerializer.java     |  75 ++++
 .../apache/paimon/rest/MockRESTCatalogTest.java    | 483 ++++++++++++++++++++-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  39 +-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  22 +-
 .../org/apache/paimon/flink/RESTCatalogITCase.java | 378 +++++++++++++++-
 .../paimon/spark/scan/BinPackingSplits.scala       |  29 +-
 .../paimon/spark/SparkCatalogWithRestTest.java     | 292 +++++++++++++
 14 files changed, 1883 insertions(+), 40 deletions(-)

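For context on how the pieces below fit together: when a table is created with CoreOptions.QUERY_AUTH_ENABLED set, plan() wraps each DataSplit in a QueryAuthSplit carrying the catalog's TableQueryAuthResult, and ReadBuilderImpl returns an AuthAwareTableRead that unwraps each split and applies the row filter and column masks before rows reach the caller. A minimal sketch of the client-side flow, assuming an already-open Catalog (the sketch class is illustrative and not part of this commit; the ReadBuilder calls are the existing public API):

    import java.util.List;

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.table.source.Split;
    import org.apache.paimon.table.source.TableRead;

    // Illustrative sketch, not part of the commit.
    public class AuthReadSketch {
        static void readAll(Catalog catalog, Identifier id) throws Exception {
            Table table = catalog.getTable(id);
            ReadBuilder readBuilder = table.newReadBuilder();
            // With query auth enabled, these splits are QueryAuthSplit wrappers.
            List<Split> splits = readBuilder.newScan().plan().splits();
            // ... and this TableRead is an AuthAwareTableRead.
            TableRead read = readBuilder.newRead();
            try (RecordReader<InternalRow> reader = read.createReader(splits)) {
                reader.forEachRemaining(row -> System.out.println(row));
            }
        }
    }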
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
index dcc94031a8..c4c33f241a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
@@ -50,4 +50,8 @@ public class TableQueryAuthResult {
     public Map<String, Transform> columnMasking() {
         return columnMasking;
     }
+
+    public boolean isEmpty() {
+        return rowFilter == null && (columnMasking == null || columnMasking.isEmpty());
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index dcadfad1ff..c64b75557a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.TableQueryAuthResult;
 import org.apache.paimon.consumer.Consumer;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
@@ -64,14 +65,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TimeZone;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
-import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.CHANGELOG;
-import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DELTA;
 import static org.apache.paimon.CoreOptions.IncrementalBetweenScanMode.DIFF;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@ abstract class AbstractDataTableScan implements DataTableScan {
     private final TableQueryAuth queryAuth;
 
     @Nullable private RowType readType;
+    @Nullable private TableQueryAuthResult authResult;
 
     protected AbstractDataTableScan(
             TableSchema schema,
@@ -165,12 +166,34 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
-    protected void authQuery() {
+    protected TableQueryAuthResult authQuery() {
         if (!options.queryAuthEnabled()) {
-            return;
+            return null;
         }
-        queryAuth.auth(readType == null ? null : readType.getFieldNames());
-        // TODO add support for row level access control
+        if (authResult == null) {
+            authResult = queryAuth.auth(readType == null ? null : readType.getFieldNames());
+        }
+        return authResult;
+    }
+
+    protected TableScan.Plan applyAuthToSplits(Plan plan) {
+        TableQueryAuthResult authResult = authQuery();
+        if (authResult == null || authResult.isEmpty()) {
+            return plan;
+        }
+
+        List<Split> splits = plan.splits();
+        List<Split> authSplits = new ArrayList<>(splits.size());
+        for (Split split : splits) {
+            if (split instanceof DataSplit) {
+                DataSplit dataSplit = (DataSplit) split;
+                authSplits.add(QueryAuthSplit.wrap(dataSplit, authResult));
+            } else {
+                authSplits.add(split);
+            }
+        }
+
+        return new DataFilePlan<>(authSplits);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
new file mode 100644
index 0000000000..a7772b3c0a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.InternalRowUtils.get;
+
+/** A {@link TableRead} wrapper that checks splits for authorization information. */
+public class AuthAwareTableRead implements TableRead {
+
+    private final TableRead wrapped;
+    private final RowType outputRowType;
+
+    public AuthAwareTableRead(TableRead wrapped, RowType outputRowType) {
+        this.wrapped = wrapped;
+        this.outputRowType = outputRowType;
+    }
+
+    @Override
+    public TableRead withMetricRegistry(MetricRegistry registry) {
+        return new AuthAwareTableRead(wrapped.withMetricRegistry(registry), outputRowType);
+    }
+
+    @Override
+    public TableRead executeFilter() {
+        return new AuthAwareTableRead(wrapped.executeFilter(), outputRowType);
+    }
+
+    @Override
+    public TableRead withIOManager(IOManager ioManager) {
+        return new AuthAwareTableRead(wrapped.withIOManager(ioManager), outputRowType);
+    }
+
+    @Override
+    public RecordReader<InternalRow> createReader(Split split) throws IOException {
+        if (split instanceof QueryAuthSplit) {
+            TableQueryAuthResult authResult = ((QueryAuthSplit) split).authResult();
+            if (authResult != null) {
+                RecordReader<InternalRow> reader =
+                        wrapped.createReader(((QueryAuthSplit) split).dataSplit());
+                // Apply row-level filter if present
+                Predicate rowFilter = authResult.rowFilter();
+                if (rowFilter != null) {
+                    Predicate remappedFilter = remapPredicateToOutputRow(outputRowType, rowFilter);
+                    if (remappedFilter != null) {
+                        reader = new FilterRecordReader(reader, remappedFilter);
+                    }
+                }
+
+                // Apply column masking if present
+                Map<String, Transform> columnMasking = authResult.columnMasking();
+                if (columnMasking != null && !columnMasking.isEmpty()) {
+                    MaskingApplier applier = new MaskingApplier(outputRowType, columnMasking);
+                    reader = new MaskingRecordReader(reader, applier);
+                }
+
+                return reader;
+            }
+        }
+        return wrapped.createReader(split);
+    }
+
+    private static class FilterRecordReader implements RecordReader<InternalRow> {
+
+        private final RecordReader<InternalRow> wrapped;
+        private final Predicate predicate;
+
+        private FilterRecordReader(RecordReader<InternalRow> wrapped, Predicate predicate) {
+            this.wrapped = wrapped;
+            this.predicate = predicate;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<InternalRow> readBatch() throws IOException {
+            RecordIterator<InternalRow> batch = wrapped.readBatch();
+            if (batch == null) {
+                return null;
+            }
+            return new FilterRecordIterator(batch, predicate);
+        }
+
+        @Override
+        public void close() throws IOException {
+            wrapped.close();
+        }
+    }
+
+    private static class FilterRecordIterator implements RecordReader.RecordIterator<InternalRow> {
+
+        private final RecordReader.RecordIterator<InternalRow> wrapped;
+        private final Predicate predicate;
+
+        private FilterRecordIterator(
+                RecordReader.RecordIterator<InternalRow> wrapped, Predicate predicate) {
+            this.wrapped = wrapped;
+            this.predicate = predicate;
+        }
+
+        @Nullable
+        @Override
+        public InternalRow next() throws IOException {
+            while (true) {
+                InternalRow row = wrapped.next();
+                if (row == null) {
+                    return null;
+                }
+                if (predicate.test(row)) {
+                    return row;
+                }
+            }
+        }
+
+        @Override
+        public void releaseBatch() {
+            wrapped.releaseBatch();
+        }
+    }
+
+    private static class MaskingRecordReader implements RecordReader<InternalRow> {
+
+        private final RecordReader<InternalRow> wrapped;
+        private final MaskingApplier applier;
+
+        private MaskingRecordReader(RecordReader<InternalRow> wrapped, MaskingApplier applier) {
+            this.wrapped = wrapped;
+            this.applier = applier;
+        }
+
+        @Nullable
+        @Override
+        public RecordIterator<InternalRow> readBatch() throws IOException {
+            RecordIterator<InternalRow> batch = wrapped.readBatch();
+            if (batch == null) {
+                return null;
+            }
+            return batch.transform(applier::apply);
+        }
+
+        @Override
+        public void close() throws IOException {
+            wrapped.close();
+        }
+    }
+
+    private static class MaskingApplier {
+
+        private final RowType outputRowType;
+        private final Map<Integer, Transform> remapped;
+
+        private MaskingApplier(RowType outputRowType, Map<String, Transform> masking) {
+            this.outputRowType = outputRowType;
+            this.remapped = remapToOutputRow(outputRowType, masking);
+        }
+
+        private InternalRow apply(InternalRow row) {
+            if (remapped.isEmpty()) {
+                return row;
+            }
+            int arity = outputRowType.getFieldCount();
+            GenericRow out = new GenericRow(row.getRowKind(), arity);
+            for (int i = 0; i < arity; i++) {
+                DataType type = outputRowType.getTypeAt(i);
+                out.setField(i, get(row, i, type));
+            }
+            for (Map.Entry<Integer, Transform> e : remapped.entrySet()) {
+                int targetIndex = e.getKey();
+                Transform transform = e.getValue();
+                Object masked = transform.transform(row);
+                out.setField(targetIndex, masked);
+            }
+            return out;
+        }
+
+        private static Map<Integer, Transform> remapToOutputRow(
+                RowType outputRowType, Map<String, Transform> masking) {
+            Map<Integer, Transform> out = new HashMap<>();
+            if (masking == null || masking.isEmpty()) {
+                return out;
+            }
+
+            for (Map.Entry<String, Transform> e : masking.entrySet()) {
+                String targetColumn = e.getKey();
+                Transform transform = e.getValue();
+                if (targetColumn == null || transform == null) {
+                    continue;
+                }
+
+                int targetIndex = outputRowType.getFieldIndex(targetColumn);
+                if (targetIndex < 0) {
+                    continue;
+                }
+
+                List<Object> newInputs = new ArrayList<>();
+                for (Object input : transform.inputs()) {
+                    if (input instanceof FieldRef) {
+                        FieldRef ref = (FieldRef) input;
+                        int newIndex = outputRowType.getFieldIndex(ref.name());
+                        if (newIndex < 0) {
+                            throw new IllegalArgumentException(
+                                    "Column masking refers to field '"
+                                            + ref.name()
+                                            + "' which is not present in output row type "
+                                            + outputRowType);
+                        }
+                        DataType type = outputRowType.getTypeAt(newIndex);
+                        newInputs.add(new FieldRef(newIndex, ref.name(), type));
+                    } else {
+                        newInputs.add(input);
+                    }
+                }
+                out.put(targetIndex, transform.copyWithNewInputs(newInputs));
+            }
+            return out;
+        }
+    }
+
+    private static Predicate remapPredicateToOutputRow(RowType outputRowType, Predicate predicate) {
+        return predicate.visit(new PredicateRemapper(outputRowType));
+    }
+
+    private static class PredicateRemapper implements PredicateVisitor<Predicate> {
+        private final RowType outputRowType;
+
+        private PredicateRemapper(RowType outputRowType) {
+            this.outputRowType = outputRowType;
+        }
+
+        @Override
+        public Predicate visit(LeafPredicate predicate) {
+            Transform transform = predicate.transform();
+            List<Object> newInputs = new ArrayList<>();
+            boolean hasUnmappedField = false;
+            for (Object input : transform.inputs()) {
+                if (input instanceof FieldRef) {
+                    FieldRef ref = (FieldRef) input;
+                    String fieldName = ref.name();
+                    int newIndex = outputRowType.getFieldIndex(fieldName);
+                    if (newIndex < 0) {
+                        hasUnmappedField = true;
+                        break;
+                    }
+                    DataType type = outputRowType.getTypeAt(newIndex);
+                    newInputs.add(new FieldRef(newIndex, fieldName, type));
+                } else {
+                    newInputs.add(input);
+                }
+            }
+            if (hasUnmappedField) {
+                return null;
+            }
+            return predicate.copyWithNewInputs(newInputs);
+        }
+
+        @Override
+        public Predicate visit(CompoundPredicate predicate) {
+            List<Predicate> remappedChildren = new ArrayList<>();
+            for (Predicate child : predicate.children()) {
+                Predicate remapped = child.visit(this);
+                if (remapped != null) {
+                    remappedChildren.add(remapped);
+                }
+            }
+            if (remappedChildren.isEmpty()) {
+                return null;
+            }
+            if (remappedChildren.size() == 1) {
+                return remappedChildren.get(0);
+            }
+            return new CompoundPredicate(predicate.function(), remappedChildren);
+        }
+    }
+}
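The subtle point in AuthAwareTableRead is index remapping: the catalog expresses filters and masks against the full table schema, while the reader emits rows in the projected output row type, so every FieldRef must be rebound to its projected index. Per the code above, a filter referencing a projected-out column is dropped (the leaf remaps to null), whereas a mask whose input fields are projected out fails loudly. A compact sketch of just that rebinding rule (RowType and FieldRef are the real Paimon types; the helper class itself is ours):

    import org.apache.paimon.predicate.FieldRef;
    import org.apache.paimon.types.RowType;

    // Illustrative sketch of the FieldRef rebinding done by PredicateRemapper
    // and MaskingApplier.remapToOutputRow above.
    class RebindSketch {
        static FieldRef rebind(RowType projected, FieldRef ref) {
            int idx = projected.getFieldIndex(ref.name());
            if (idx < 0) {
                // Field not in the projection: the filter path drops the leaf
                // predicate; the masking path throws IllegalArgumentException.
                return null;
            }
            return new FieldRef(idx, ref.name(), projected.getTypeAt(idx));
        }
    }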
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 4419b3890e..9fe11c44fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -91,8 +91,6 @@ public class DataTableBatchScan extends AbstractDataTableScan {
 
     @Override
     public TableScan.Plan plan() {
-        authQuery();
-
         if (startingScanner == null) {
             startingScanner = createStartingScanner(false);
         }
@@ -101,13 +99,13 @@ public class DataTableBatchScan extends AbstractDataTableScan {
             hasNext = false;
             Optional<StartingScanner.Result> pushed = applyPushDownLimit();
             if (pushed.isPresent()) {
-                return DataFilePlan.fromResult(pushed.get());
+                return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
             }
             pushed = applyPushDownTopN();
             if (pushed.isPresent()) {
-                return DataFilePlan.fromResult(pushed.get());
+                return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
             }
-            return DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
+            return applyAuthToSplits(DataFilePlan.fromResult(startingScanner.scan(snapshotReader)));
         } else {
             throw new EndOfScanException();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 16bc8509eb..678cf0f4df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -118,8 +118,6 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
 
     @Override
     public Plan plan() {
-        authQuery();
-
         if (!initialized) {
             initScanner();
         }
@@ -182,7 +180,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                     "Starting snapshot is {}, next snapshot will be {}.",
                     scannedResult.plan().snapshotId(),
                     nextSnapshotId);
-            return scannedResult.plan();
+            return applyAuthToSplits(scannedResult.plan());
         } else if (result instanceof StartingScanner.NextSnapshot) {
             nextSnapshotId = ((StartingScanner.NextSnapshot) result).nextSnapshotId();
             isFullPhaseEnd =
@@ -223,7 +221,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                     if (overwritePlan.splits().isEmpty()) {
                         continue;
                     }
-                    return overwritePlan;
+                    return applyAuthToSplits(overwritePlan);
                 }
             }
 
@@ -235,7 +233,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                 if (plan.splits().isEmpty()) {
                     continue;
                 }
-                return plan;
+                return applyAuthToSplits(plan);
             } else {
                 nextSnapshotId++;
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
new file mode 100644
index 0000000000..9daa9fe9ea
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * A wrapper class for {@link DataSplit} that adds query authorization information. This class
+ * delegates all Split interface methods to the wrapped DataSplit, while providing additional auth
+ * result functionality.
+ */
+public class QueryAuthSplit extends DataSplit {
+
+    private static final long serialVersionUID = 1L;
+
+    private DataSplit dataSplit;
+    @Nullable private TableQueryAuthResult authResult;
+
+    public QueryAuthSplit(DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
+        this.dataSplit = dataSplit;
+        this.authResult = authResult;
+    }
+
+    public DataSplit dataSplit() {
+        return dataSplit;
+    }
+
+    @Nullable
+    public TableQueryAuthResult authResult() {
+        return authResult;
+    }
+
+    // Delegate all DataSplit methods to the wrapped instance
+
+    public long snapshotId() {
+        return dataSplit.snapshotId();
+    }
+
+    public BinaryRow partition() {
+        return dataSplit.partition();
+    }
+
+    public int bucket() {
+        return dataSplit.bucket();
+    }
+
+    public String bucketPath() {
+        return dataSplit.bucketPath();
+    }
+
+    @Nullable
+    public Integer totalBuckets() {
+        return dataSplit.totalBuckets();
+    }
+
+    public List<DataFileMeta> beforeFiles() {
+        return dataSplit.beforeFiles();
+    }
+
+    public Optional<List<DeletionFile>> beforeDeletionFiles() {
+        return dataSplit.beforeDeletionFiles();
+    }
+
+    public List<DataFileMeta> dataFiles() {
+        return dataSplit.dataFiles();
+    }
+
+    @Override
+    public Optional<List<DeletionFile>> deletionFiles() {
+        return dataSplit.deletionFiles();
+    }
+
+    public boolean isStreaming() {
+        return dataSplit.isStreaming();
+    }
+
+    public boolean rawConvertible() {
+        return dataSplit.rawConvertible();
+    }
+
+    public OptionalLong latestFileCreationEpochMillis() {
+        return dataSplit.latestFileCreationEpochMillis();
+    }
+
+    public OptionalLong earliestFileCreationEpochMillis() {
+        return dataSplit.earliestFileCreationEpochMillis();
+    }
+
+    public long rowCount() {
+        return dataSplit.rowCount();
+    }
+
+    public boolean mergedRowCountAvailable() {
+        return dataSplit.mergedRowCountAvailable();
+    }
+
+    public long mergedRowCount() {
+        return dataSplit.mergedRowCount();
+    }
+
+    public Object minValue(
+            int fieldIndex,
+            org.apache.paimon.types.DataField dataField,
+            SimpleStatsEvolutions evolutions) {
+        return dataSplit.minValue(fieldIndex, dataField, evolutions);
+    }
+
+    public Object maxValue(
+            int fieldIndex,
+            org.apache.paimon.types.DataField dataField,
+            SimpleStatsEvolutions evolutions) {
+        return dataSplit.maxValue(fieldIndex, dataField, evolutions);
+    }
+
+    public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
+        return dataSplit.nullCount(fieldIndex, evolutions);
+    }
+
+    public long partialMergedRowCount() {
+        return dataSplit.partialMergedRowCount();
+    }
+
+    @Override
+    public Optional<List<RawFile>> convertToRawFiles() {
+        return dataSplit.convertToRawFiles();
+    }
+
+    @Override
+    @Nullable
+    public Optional<List<IndexFile>> indexFiles() {
+        return dataSplit.indexFiles();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        QueryAuthSplit that = (QueryAuthSplit) o;
+        return Objects.equals(dataSplit, that.dataSplit)
+                && Objects.equals(authResult, that.authResult);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataSplit, authResult);
+    }
+
+    @Override
+    public String toString() {
+        return "QueryAuthSplit{" + "dataSplit=" + dataSplit + ", authResult=" + authResult + '}';
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        serialize(new org.apache.paimon.io.DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        QueryAuthSplit other = deserialize(new DataInputViewStreamWrapper(in));
+        this.dataSplit = other.dataSplit;
+        this.authResult = other.authResult;
+    }
+
+    public void serialize(DataOutputView out) throws IOException {
+        // Serialize the wrapped DataSplit
+        dataSplit.serialize(out);
+
+        // Serialize authResult
+        if (authResult != null) {
+            out.writeBoolean(true);
+            TableQueryAuthResultSerializer.serialize(authResult, out);
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+
+    public static QueryAuthSplit deserialize(DataInputView in) throws IOException {
+        // Deserialize the wrapped DataSplit
+        DataSplit dataSplit = DataSplit.deserialize(in);
+
+        // Deserialize authResult
+        TableQueryAuthResult authResult = null;
+        if (in.readBoolean()) {
+            authResult = TableQueryAuthResultSerializer.deserialize(in);
+        }
+
+        return new QueryAuthSplit(dataSplit, authResult);
+    }
+
+    public static QueryAuthSplit wrap(
+            DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
+        if (authResult == null || authResult.isEmpty()) {
+            return new QueryAuthSplit(dataSplit, null);
+        }
+        return new QueryAuthSplit(dataSplit, authResult);
+    }
+}
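Because Split is Serializable and engines ship splits to workers, QueryAuthSplit overrides writeObject/readObject to delegate to DataSplit's own binary format plus a presence flag for the auth result. A hedged round-trip sketch using plain Java serialization (the helper class is ours; QueryAuthSplit and its API come from the diff above):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.paimon.table.source.QueryAuthSplit;

    // Illustrative sketch: a QueryAuthSplit should survive Java serialization
    // with both the wrapped DataSplit and the auth result intact.
    class SplitRoundTripSketch {
        static QueryAuthSplit roundTrip(QueryAuthSplit split)
                throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(split); // invokes the custom writeObject above
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (QueryAuthSplit) in.readObject(); // invokes the custom readObject above
            }
        }
    }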
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index c81dfd8e01..8da04edae0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
@@ -256,6 +257,12 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (variantAccessInfo != null) {
             read.withVariantAccess(variantAccessInfo);
         }
+        if (table instanceof FileStoreTable) {
+            CoreOptions options = new CoreOptions(table.options());
+            if (options.queryAuthEnabled()) {
+                return new AuthAwareTableRead(read, readType());
+            }
+        }
         return read;
     }
 
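Note the shape of the ReadBuilderImpl change above: the wrapper is installed only for FileStoreTable instances whose options enable query auth, so system tables and auth-disabled reads keep the unwrapped TableRead, and the projected row type from readType() is what AuthAwareTableRead remaps against. A sketch of opting a table in at creation time (Schema.newBuilder is the existing API; the literal option key is assumed to be the one behind CoreOptions.QUERY_AUTH_ENABLED):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    // Illustrative sketch: opt a table into query auth via its options.
    class CreateAuthTableSketch {
        static void create(Catalog catalog, Identifier id) throws Exception {
            Schema schema =
                    Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("name", DataTypes.STRING())
                            // Key assumed from CoreOptions.QUERY_AUTH_ENABLED.
                            .option("query-auth.enabled", "true")
                            .build();
            catalog.createTable(id, schema, false);
        }
    }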
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
new file mode 100644
index 0000000000..c557ee08e0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.rest.RESTApi;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Serializer for {@link TableQueryAuthResult}. */
+public class TableQueryAuthResultSerializer {
+    public static void serialize(TableQueryAuthResult authResult, DataOutputView out)
+            throws IOException {
+        // Serialize row filter
+        if (authResult.rowFilter() != null) {
+            out.writeBoolean(true);
+            String predicateJson = RESTApi.toJson(authResult.rowFilter());
+            out.writeUTF(predicateJson);
+        } else {
+            out.writeBoolean(false);
+        }
+
+        // Serialize column masking
+        Map<String, Transform> columnMasking = authResult.columnMasking();
+        out.writeInt(columnMasking.size());
+        for (Map.Entry<String, Transform> entry : columnMasking.entrySet()) {
+            out.writeUTF(entry.getKey());
+            String transformJson = RESTApi.toJson(entry.getValue());
+            out.writeUTF(transformJson);
+        }
+    }
+
+    public static TableQueryAuthResult deserialize(DataInputView in) throws IOException {
+        // Deserialize row filter
+        Predicate rowFilter = null;
+        if (in.readBoolean()) {
+            String predicateJson = in.readUTF();
+            rowFilter = RESTApi.fromJson(predicateJson, Predicate.class);
+        }
+
+        // Deserialize column masking
+        int maskingSize = in.readInt();
+        Map<String, Transform> columnMasking = new HashMap<>(maskingSize);
+        for (int i = 0; i < maskingSize; i++) {
+            String columnName = in.readUTF();
+            String transformJson = in.readUTF();
+            Transform transform = RESTApi.fromJson(transformJson, Transform.class);
+            columnMasking.put(columnName, transform);
+        }
+
+        return new TableQueryAuthResult(rowFilter, columnMasking);
+    }
+}
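The serializer above defines a simple wire format: a boolean row-filter presence flag followed by the predicate as RESTApi JSON, then the mask count and (column name, transform JSON) pairs. A hedged byte-level round trip using Paimon's stream-backed data views (the helper class is ours):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.paimon.catalog.TableQueryAuthResult;
    import org.apache.paimon.io.DataInputViewStreamWrapper;
    import org.apache.paimon.io.DataOutputViewStreamWrapper;
    import org.apache.paimon.table.source.TableQueryAuthResultSerializer;

    // Illustrative sketch: serialize then deserialize an auth result.
    class AuthResultRoundTripSketch {
        static TableQueryAuthResult roundTrip(TableQueryAuthResult result) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            TableQueryAuthResultSerializer.serialize(result, new DataOutputViewStreamWrapper(bytes));
            return TableQueryAuthResultSerializer.deserialize(
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
        }
    }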
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index 7a52bf1af9..f628a68ea0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -26,13 +26,25 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.TableQueryAuthResult;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.CastTransform;
+import org.apache.paimon.predicate.ConcatTransform;
+import org.apache.paimon.predicate.ConcatWsTransform;
+import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FieldTransform;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.Transform;
 import org.apache.paimon.predicate.UpperTransform;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
@@ -41,9 +53,17 @@ import org.apache.paimon.rest.auth.DLFTokenLoader;
 import org.apache.paimon.rest.auth.DLFTokenLoaderFactory;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
-import org.apache.paimon.rest.responses.AuthTableQueryResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -56,12 +76,15 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static org.apache.paimon.rest.RESTApi.HEADER_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -273,14 +296,13 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         Transform transform =
                 new UpperTransform(
                         Collections.singletonList(new FieldRef(1, "col2", DataTypes.STRING())));
-        String transformJson = JsonSerdeUtil.toFlatJson(transform);
 
         // Set up mock response with filter and columnMasking
-        List<String> filter = Collections.singletonList(predicateJson);
-        Map<String, String> columnMasking = new HashMap<>();
-        columnMasking.put("col2", transformJson);
-        AuthTableQueryResponse response = new AuthTableQueryResponse(filter, columnMasking);
-        restCatalogServer.setTableQueryAuthResponse(identifier, response);
+        List<Predicate> rowFilters = Collections.singletonList(predicate);
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("col2", transform);
+        restCatalogServer.setRowFilterAuth(identifier, rowFilters);
+        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
 
         TableQueryAuthResult result = catalog.authTableQuery(identifier, null);
         assertThat(result.rowFilter()).isEqualTo(predicate);
@@ -292,6 +314,453 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         catalog.dropDatabase(identifier.getDatabaseName(), true, true);
     }
 
+    @Test
+    void testColumnMaskingApplyOnRead() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", "auth_table_masking_apply");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with multiple columns of different types
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "col1", DataTypes.STRING()));
+        fields.add(new DataField(1, "col2", DataTypes.STRING()));
+        fields.add(new DataField(2, "col3", DataTypes.INT()));
+        fields.add(new DataField(3, "col4", DataTypes.STRING()));
+        fields.add(new DataField(4, "col5", DataTypes.STRING()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(
+                GenericRow.of(
+                        BinaryString.fromString("hello"),
+                        BinaryString.fromString("world"),
+                        100,
+                        BinaryString.fromString("test"),
+                        BinaryString.fromString("data")));
+        write.write(
+                GenericRow.of(
+                        BinaryString.fromString("foo"),
+                        BinaryString.fromString("bar"),
+                        200,
+                        BinaryString.fromString("example"),
+                        BinaryString.fromString("value")));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Set up column masking with various transform types
+        Map<String, Transform> columnMasking = new HashMap<>();
+
+        // Test 1: ConcatTransform - mask col1 with "****"
+        ConcatTransform concatTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
+        columnMasking.put("col1", concatTransform);
+
+        // Test 2: UpperTransform - convert col2 to uppercase
+        UpperTransform upperTransform =
+                new UpperTransform(
+                        Collections.singletonList(new FieldRef(1, "col2", DataTypes.STRING())));
+        columnMasking.put("col2", upperTransform);
+
+        // Test 3: CastTransform - cast col3 (INT) to STRING
+        CastTransform castTransform =
+                new CastTransform(new FieldRef(2, "col3", DataTypes.INT()), DataTypes.STRING());
+        columnMasking.put("col3", castTransform);
+
+        // Test 4: ConcatWsTransform - concatenate col4 with separator
+        ConcatWsTransform concatWsTransform =
+                new ConcatWsTransform(
+                        java.util.Arrays.asList(
+                                BinaryString.fromString("-"),
+                                BinaryString.fromString("prefix"),
+                                new FieldRef(3, "col4", DataTypes.STRING())));
+        columnMasking.put("col4", concatWsTransform);
+
+        // col5 is intentionally not masked to verify unmasked columns work correctly
+
+        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
+
+        // Read and verify masked data
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+
+        assertThat(rows).hasSize(2);
+
+        // Verify first row
+        InternalRow row1 = rows.get(0);
+        assertThat(row1.getString(0).toString())
+                .isEqualTo("****"); // col1 masked with ConcatTransform
+        assertThat(row1.getString(1).toString())
+                .isEqualTo("WORLD"); // col2 masked with UpperTransform
+        assertThat(row1.getString(2).toString())
+                .isEqualTo("100"); // col3 masked with CastTransform (INT->STRING)
+        assertThat(row1.getString(3).toString())
+                .isEqualTo("prefix-test"); // col4 masked with ConcatWsTransform
+        assertThat(row1.getString(4).toString())
+                .isEqualTo("data"); // col5 NOT masked - original value
+
+        // Verify second row
+        InternalRow row2 = rows.get(1);
+        assertThat(row2.getString(0).toString())
+                .isEqualTo("****"); // col1 masked with ConcatTransform
+        assertThat(row2.getString(1).toString())
+                .isEqualTo("BAR"); // col2 masked with UpperTransform
+        assertThat(row2.getString(2).toString())
+                .isEqualTo("200"); // col3 masked with CastTransform (INT->STRING)
+        assertThat(row2.getString(3).toString())
+                .isEqualTo("prefix-example"); // col4 masked with ConcatWsTransform
+        assertThat(row2.getString(4).toString())
+                .isEqualTo("value"); // col5 NOT masked - original value
+    }
+
+    @Test
+    void testRowFilter() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", "auth_table_filter");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with multiple data types
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "id", DataTypes.INT()));
+        fields.add(new DataField(1, "name", DataTypes.STRING()));
+        fields.add(new DataField(2, "age", DataTypes.BIGINT()));
+        fields.add(new DataField(3, "salary", DataTypes.DOUBLE()));
+        fields.add(new DataField(4, "is_active", DataTypes.BOOLEAN()));
+        fields.add(new DataField(5, "score", DataTypes.FLOAT()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data with various types
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 25L, 50000.0, true, 85.5f));
+        write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 30L, 60000.0, false, 90.0f));
+        write.write(
+                GenericRow.of(3, BinaryString.fromString("Charlie"), 35L, 70000.0, true, 95.5f));
+        write.write(GenericRow.of(4, BinaryString.fromString("David"), 28L, 55000.0, true, 88.0f));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Test 1: Filter by INT type (id > 2)
+        LeafPredicate intFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(0, "id", DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(2));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(intFilterPredicate));
+
+        List<String> result1 = batchRead(table);
+        assertThat(result1).hasSize(2);
+        assertThat(result1)
+                .contains(
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+                        "+I[4, David, 28, 55000.0, true, 88.0]");
+
+        // Test 2: Filter by BIGINT type (age >= 30)
+        LeafPredicate bigintFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", DataTypes.BIGINT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30L));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(bigintFilterPredicate));
+
+        List<String> result2 = batchRead(table);
+        assertThat(result2).hasSize(2);
+        assertThat(result2)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 3: Filter by DOUBLE type (salary > 55000.0)
+        LeafPredicate doubleFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "salary", DataTypes.DOUBLE())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(55000.0));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(doubleFilterPredicate));
+
+        List<String> result3 = batchRead(table);
+        assertThat(result3).hasSize(2);
+        assertThat(result3)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 4: Filter by BOOLEAN type (is_active = true)
+        LeafPredicate booleanFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "is_active", DataTypes.BOOLEAN())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(true));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(booleanFilterPredicate));
+
+        List<String> result4 = batchRead(table);
+        assertThat(result4).hasSize(3);
+        assertThat(result4)
+                .contains(
+                        "+I[1, Alice, 25, 50000.0, true, 85.5]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+                        "+I[4, David, 28, 55000.0, true, 88.0]");
+
+        // Test 5: Filter by FLOAT type (score >= 90.0)
+        LeafPredicate floatFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(5, "score", DataTypes.FLOAT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(90.0f));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(floatFilterPredicate));
+
+        List<String> result5 = batchRead(table);
+        assertThat(result5).hasSize(2);
+        assertThat(result5)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 6: Filter by STRING type (name = "Alice")
+        LeafPredicate stringFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(1, "name", DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(BinaryString.fromString("Alice")));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(stringFilterPredicate));
+
+        List<String> result6 = batchRead(table);
+        assertThat(result6).hasSize(1);
+        assertThat(result6).contains("+I[1, Alice, 25, 50000.0, true, 85.5]");
+
+        // Test 7: Filter with two predicates (age >= 30 AND is_active = true)
+        LeafPredicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", DataTypes.BIGINT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30L));
+        LeafPredicate isActiveTruePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "is_active", DataTypes.BOOLEAN())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(true));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Arrays.asList(ageGe30Predicate, isActiveTruePredicate));
+
+        List<String> result7 = batchRead(table);
+        assertThat(result7).hasSize(1);
+        assertThat(result7).contains("+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 8: Filter with two predicates (salary > 55000.0 AND score >= 90.0)
+        LeafPredicate salaryGt55000Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "salary", DataTypes.DOUBLE())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(55000.0));
+        LeafPredicate scoreGe90Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(5, "score", DataTypes.FLOAT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(90.0f));
+        restCatalogServer.setRowFilterAuth(
+                identifier, Arrays.asList(salaryGt55000Predicate, scoreGe90Predicate));
+
+        List<String> result8 = batchRead(table);
+        assertThat(result8).hasSize(2);
+        assertThat(result8)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+    }
+
+    @Test
+    void testColumnMaskingAndRowFilter() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", "combined_auth_table");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with test data
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "id", DataTypes.INT()));
+        fields.add(new DataField(1, "name", DataTypes.STRING()));
+        fields.add(new DataField(2, "salary", DataTypes.STRING()));
+        fields.add(new DataField(3, "age", DataTypes.INT()));
+        fields.add(new DataField(4, "department", DataTypes.STRING()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), "true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("Alice"),
+                        BinaryString.fromString("50000.0"),
+                        25,
+                        BinaryString.fromString("IT")));
+        write.write(
+                GenericRow.of(
+                        2,
+                        BinaryString.fromString("Bob"),
+                        BinaryString.fromString("60000.0"),
+                        30,
+                        BinaryString.fromString("HR")));
+        write.write(
+                GenericRow.of(
+                        3,
+                        BinaryString.fromString("Charlie"),
+                        BinaryString.fromString("70000.0"),
+                        35,
+                        BinaryString.fromString("IT")));
+        write.write(
+                GenericRow.of(
+                        4,
+                        BinaryString.fromString("David"),
+                        BinaryString.fromString("55000.0"),
+                        28,
+                        BinaryString.fromString("Finance")));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Test column masking only
+        Transform salaryMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("salary", salaryMaskTransform);
+        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(4);
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***");
+
+        // Test row filter only (clear column masking first)
+        restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
+        Predicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "age", DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        restCatalogServer.setRowFilterAuth(identifier, Collections.singletonList(ageGe30Predicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(2);
+
+        // Test both column masking and row filter together
+        columnMasking.put("salary", salaryMaskTransform);
+        Transform nameMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        columnMasking.put("name", nameMaskTransform);
+        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "department", DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(BinaryString.fromString("IT")));
+        restCatalogServer.setRowFilterAuth(identifier, Collections.singletonList(deptPredicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
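+        // department = 'IT' matches Alice and Charlie.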
+        assertThat(rows).hasSize(2);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // name masked
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // salary masked
+        assertThat(rows.get(0).getString(4).toString()).isEqualTo("IT"); // department not masked
+
+        // Test AND-combined row filter (age >= 30 AND department = 'IT') with masking still applied
+        Predicate combinedPredicate = PredicateBuilder.and(ageGe30Predicate, deptPredicate);
+        restCatalogServer.setRowFilterAuth(
+                identifier, Collections.singletonList(combinedPredicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
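+        // age >= 30 AND department = 'IT' matches only Charlie.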
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getInt(0)).isEqualTo(3); // id
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // name masked
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // salary masked
+        assertThat(rows.get(0).getInt(3)).isEqualTo(35); // age not masked
+
+        // Clear both column masking and row filter
+        restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
+        restCatalogServer.setRowFilterAuth(identifier, null);
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(4);
+        assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob", "Charlie", "David");
+    }
+
     private void checkHeader(String headerName, String headerValue) {
         // Verify that the header was included in the requests
         List<Map<String, String>> receivedHeaders = restCatalogServer.getReceivedHeaders();
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 8716d4ea7a..6420bc956d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -41,6 +41,8 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
 import org.apache.paimon.partition.PartitionUtils;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.Transform;
 import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
@@ -97,6 +99,7 @@ import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
@@ -185,8 +188,8 @@ public class RESTCatalogServer {
     private final List<String> noPermissionTables = new ArrayList<>();
     private final Map<String, Function> functionStore = new HashMap<>();
     private final Map<String, List<String>> columnAuthHandler = new HashMap<>();
-    private final Map<String, AuthTableQueryResponse> tableQueryAuthResponseHandler =
-            new HashMap<>();
+    private final Map<String, List<Predicate>> rowFilterAuthHandler = new HashMap<>();
+    private final Map<String, Map<String, Transform>> columnMaskingAuthHandler = new HashMap<>();
     public final ConfigResponse configResponse;
     public final String warehouse;
 
@@ -268,8 +271,12 @@ public class RESTCatalogServer {
         columnAuthHandler.put(identifier.getFullName(), select);
     }
 
-    public void setTableQueryAuthResponse(Identifier identifier, AuthTableQueryResponse response) {
-        tableQueryAuthResponseHandler.put(identifier.getFullName(), response);
+    public void setRowFilterAuth(Identifier identifier, List<Predicate> rowFilters) {
+        rowFilterAuthHandler.put(identifier.getFullName(), rowFilters);
+    }
+
+    public void setColumnMaskingAuth(Identifier identifier, Map<String, Transform> columnMasking) {
+        columnMaskingAuthHandler.put(identifier.getFullName(), columnMasking);
     }
 
     public RESTToken getDataToken(Identifier identifier) {
@@ -835,8 +842,31 @@
                         }
                     });
         }
+        List<Predicate> rowFilters = rowFilterAuthHandler.get(identifier.getFullName());
+        Map<String, Transform> columnMasking =
+                columnMaskingAuthHandler.get(identifier.getFullName());
+
+        // Convert Predicate list to JSON string list
+        List<String> filterJsonList = null;
+        if (rowFilters != null) {
+            filterJsonList =
+                    rowFilters.stream().map(JsonSerdeUtil::toFlatJson).collect(Collectors.toList());
+        }
+
+        // Convert Transform map to JSON string map
+        Map<String, String> columnMaskingJsonMap = null;
+        if (columnMasking != null) {
+            columnMaskingJsonMap =
+                    columnMasking.entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Map.Entry::getKey,
+                                            entry -> JsonSerdeUtil.toFlatJson(entry.getValue())));
+        }
+
         AuthTableQueryResponse response =
-                tableQueryAuthResponseHandler.get(identifier.getFullName());
-        if (response == null) {
+                new AuthTableQueryResponse(filterJsonList, columnMaskingJsonMap);
+        // Fall back to an empty response when no auth rules are configured for this table.
+        if (filterJsonList == null && columnMaskingJsonMap == null) {
             response = new AuthTableQueryResponse(Collections.emptyList(), ImmutableMap.of());
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 288c925ceb..bc0a74bf72 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -3030,11 +3030,27 @@ public abstract class RESTCatalogTest extends CatalogTestBase {
         TableRead read = readBuilder.newRead();
         RecordReader<InternalRow> reader = read.createReader(splits);
         List<String> result = new ArrayList<>();
+
+        // Create field getters for each column
+        InternalRow.FieldGetter[] fieldGetters =
+                new InternalRow.FieldGetter[table.rowType().getFieldCount()];
+        for (int i = 0; i < table.rowType().getFieldCount(); i++) {
+            fieldGetters[i] = InternalRow.createFieldGetter(table.rowType().getTypeAt(i), i);
+        }
+
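+        // Render every column so masked or filtered values are visible in the expected strings.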
         reader.forEachRemaining(
                 row -> {
-                    String rowStr =
-                            String.format("%s[%d]", 
row.getRowKind().shortString(), row.getInt(0));
-                    result.add(rowStr);
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(row.getRowKind().shortString()).append("[");
+                    for (int i = 0; i < row.getFieldCount(); i++) {
+                        if (i > 0) {
+                            sb.append(", ");
+                        }
+                        Object value = fieldGetters[i].getFieldOrNull(row);
+                        sb.append(value);
+                    }
+                    sb.append("]");
+                    result.add(sb.toString());
                 });
         return result;
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 034816d6a0..48fefa8e24 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -19,8 +19,23 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.predicate.ConcatTransform;
+import org.apache.paimon.predicate.ConcatWsTransform;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FieldTransform;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.predicate.UpperTransform;
 import org.apache.paimon.rest.RESTToken;
+import org.apache.paimon.types.DataTypes;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -35,7 +50,11 @@ import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -82,7 +101,7 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
     }
 
     @Test
     public void testExpiredDataToken() {
         Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
         RESTToken expiredDataToken =
@@ -199,4 +218,363 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
             assertThat(partitions.get(0).totalBuckets()).isEqualTo(expectedTotalBuckets);
         }
     }
+
+    @Test
+    public void testColumnMasking() {
+        String maskingTable = "column_masking_table";
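+        // 'query-auth.enabled' makes reads fetch row filter and column masking rules from the REST catalog.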
+        batchSql(
+                String.format(
+                        "CREATE TABLE %s.%s (id INT, secret STRING, email 
STRING, phone STRING, salary STRING) WITH ('query-auth.enabled' = 'true')",
+                        DATABASE_NAME, maskingTable));
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES (1, 's1', 
'[email protected]', '12345678901', '50000.0'), (2, 's2', '[email protected]', 
'12345678902', '60000.0')",
+                        DATABASE_NAME, maskingTable));
+
+        // Test single column masking
+        Transform maskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("secret", maskTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, maskingTable), columnMasking);
+
+        assertThat(batchSql(String.format("SELECT secret FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("****"), Row.of("****"));
+        assertThat(batchSql(String.format("SELECT id FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+
+        // Test multiple columns masking
+        Transform emailMaskTransform =
+                new ConcatTransform(
+                        Collections.singletonList(BinaryString.fromString("***@***.com")));
+        Transform phoneMaskTransform =
+                new ConcatTransform(
+                        Collections.singletonList(BinaryString.fromString("***********")));
+        Transform salaryMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("0.0")));
+
+        columnMasking.put("email", emailMaskTransform);
+        columnMasking.put("phone", phoneMaskTransform);
+        columnMasking.put("salary", salaryMaskTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, maskingTable), columnMasking);
+
+        assertThat(batchSql(String.format("SELECT email FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("***@***.com"), 
Row.of("***@***.com"));
+        assertThat(batchSql(String.format("SELECT phone FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("***********"), 
Row.of("***********"));
+        assertThat(batchSql(String.format("SELECT salary FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("0.0"), Row.of("0.0"));
+
+        // Test SELECT * with column masking
+        List<Row> allRows =
+                batchSql(
+                        String.format(
+                                "SELECT * FROM %s.%s ORDER BY id", 
DATABASE_NAME, maskingTable));
+        assertThat(allRows.size()).isEqualTo(2);
+        assertThat(allRows.get(0).getField(1)).isEqualTo("****");
+        assertThat(allRows.get(0).getField(2)).isEqualTo("***@***.com");
+        assertThat(allRows.get(0).getField(3)).isEqualTo("***********");
+        assertThat(allRows.get(0).getField(4)).isEqualTo("0.0");
+
+        // Test WHERE clause with masked column
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT id FROM %s.%s WHERE id = 1",
+                                        DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of(1));
+
+        // Test aggregation with masked columns
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT COUNT(*) FROM %s.%s", 
DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of(2L));
+
+        // Test JOIN with masked columns
+        String joinTable = "join_table";
+        batchSql(
+                String.format(
+                        "CREATE TABLE %s.%s (id INT, name STRING)", 
DATABASE_NAME, joinTable));
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES (1, 'Alice'), (2, 'Bob')",
+                        DATABASE_NAME, joinTable));
+
+        List<Row> joinResult =
+                batchSql(
+                        String.format(
+                                "SELECT t1.id, t1.secret, t2.name FROM %s.%s 
t1 JOIN %s.%s t2 ON t1.id = t2.id ORDER BY t1.id",
+                                DATABASE_NAME, maskingTable, DATABASE_NAME, 
joinTable));
+        assertThat(joinResult.size()).isEqualTo(2);
+        assertThat(joinResult.get(0).getField(1)).isEqualTo("****");
+        assertThat(joinResult.get(0).getField(2)).isEqualTo("Alice");
+
+        // Test UpperTransform
+        Transform upperTransform =
+                new UpperTransform(
+                        Collections.singletonList(new FieldRef(1, "secret", 
DataTypes.STRING())));
+        columnMasking.clear();
+        columnMasking.put("secret", upperTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, maskingTable), columnMasking);
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT secret FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("S1"), Row.of("S2"));
+
+        // Test ConcatWsTransform
+        Transform concatWsTransform =
+                new ConcatWsTransform(
+                        Arrays.asList(
+                                BinaryString.fromString("-"),
+                                new FieldRef(1, "secret", DataTypes.STRING()),
+                                BinaryString.fromString("masked")));
+        columnMasking.clear();
+        columnMasking.put("secret", concatWsTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, maskingTable), columnMasking);
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT secret FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("s1-masked"), 
Row.of("s2-masked"));
+
+        // Clear masking and verify original data
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, maskingTable), new HashMap<>());
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT secret FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(Row.of("s1"), Row.of("s2"));
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT email FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, maskingTable)))
+                .containsExactlyInAnyOrder(
+                        Row.of("[email protected]"), 
Row.of("[email protected]"));
+    }
+
+    @Test
+    public void testRowFilter() {
+        String filterTable = "row_filter_table";
+        batchSql(
+                String.format(
+                        "CREATE TABLE %s.%s (id INT, name STRING, age INT, 
department STRING) WITH ('query-auth.enabled' = 'true')",
+                        DATABASE_NAME, filterTable));
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES (1, 'Alice', 25, 'IT'), (2, 
'Bob', 30, 'HR'), (3, 'Charlie', 35, 'IT'), (4, 'David', 28, 'Finance')",
+                        DATABASE_NAME, filterTable));
+
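+        // FieldRef indexes follow the schema order: id=0, name=1, age=2, department=3.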
+        // Test single condition row filter (age > 28)
+        Predicate agePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(28));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(agePredicate));
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(
+                        Row.of(2, "Bob", 30, "HR"), Row.of(3, "Charlie", 35, 
"IT"));
+
+        // Test string condition row filter (department = 'IT')
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("IT")));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(deptPredicate));
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "Alice", 25, "IT"), Row.of(3, "Charlie", 35, 
"IT"));
+
+        // Test combined conditions (age >= 30 AND department = 'IT')
+        Predicate ageGePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        Predicate combinedPredicate = PredicateBuilder.and(ageGePredicate, 
deptPredicate);
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(combinedPredicate));
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(Row.of(3, "Charlie", 35, "IT"));
+
+        // Test OR condition (age < 27 OR department = 'Finance')
+        Predicate ageLtPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        LessThan.INSTANCE,
+                        Collections.singletonList(27));
+        Predicate financePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("Finance")));
+        Predicate orPredicate = PredicateBuilder.or(ageLtPredicate, 
financePredicate);
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(orPredicate));
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s ORDER BY id",
+                                        DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "Alice", 25, "IT"), Row.of(4, "David", 28, 
"Finance"));
+
+        // Test WHERE clause combined with row filter
+        Predicate ageGt25Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(25));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(ageGt25Predicate));
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s WHERE department 
= 'IT' ORDER BY id",
+                                        DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(Row.of(3, "Charlie", 35, "IT"));
+
+        // Test JOIN with row filter
+        String joinTable = "join_table";
+        batchSql(
+                String.format(
+                        "CREATE TABLE %s.%s (id INT, salary DOUBLE)", 
DATABASE_NAME, joinTable));
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES (1, 50000.0), (2, 60000.0), 
(3, 70000.0), (4, 55000.0)",
+                        DATABASE_NAME, joinTable));
+
+        Predicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, filterTable),
+                Collections.singletonList(ageGe30Predicate));
+
+        List<Row> joinResult =
+                batchSql(
+                        String.format(
+                                "SELECT t1.id, t1.name, t1.age, t2.salary FROM 
%s.%s t1 JOIN %s.%s t2 ON t1.id = t2.id ORDER BY t1.id",
+                                DATABASE_NAME, filterTable, DATABASE_NAME, 
joinTable));
+        assertThat(joinResult.size()).isEqualTo(2);
+        assertThat(joinResult.get(0)).isEqualTo(Row.of(2, "Bob", 30, 60000.0));
+        assertThat(joinResult.get(1)).isEqualTo(Row.of(3, "Charlie", 35, 70000.0));
+
+        // Clear row filter and verify original data
+        restCatalogServer.setRowFilterAuth(Identifier.create(DATABASE_NAME, filterTable), null);
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT COUNT(*) FROM %s.%s", 
DATABASE_NAME, filterTable)))
+                .containsExactlyInAnyOrder(Row.of(4L));
+    }
+
+    @Test
+    public void testColumnMaskingAndRowFilter() {
+        String combinedTable = "combined_auth_table";
+        batchSql(
+                String.format(
+                        "CREATE TABLE %s.%s (id INT, name STRING, salary 
STRING, age INT, department STRING) WITH ('query-auth.enabled' = 'true')",
+                        DATABASE_NAME, combinedTable));
+        batchSql(
+                String.format(
+                        "INSERT INTO %s.%s VALUES (1, 'Alice', '50000.0', 25, 
'IT'), (2, 'Bob', '60000.0', 30, 'HR'), (3, 'Charlie', '70000.0', 35, 'IT'), 
(4, 'David', '55000.0', 28, 'Finance')",
+                        DATABASE_NAME, combinedTable));
+        Transform salaryMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("salary", salaryMaskTransform);
+        Transform nameMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        columnMasking.put("name", nameMaskTransform);
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("IT")));
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, combinedTable), columnMasking);
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create(DATABASE_NAME, combinedTable),
+                Collections.singletonList(deptPredicate));
+
+        // Test both column masking and row filter together
+        List<Row> combinedResult =
+                batchSql(
+                        String.format(
+                                "SELECT * FROM %s.%s ORDER BY id", 
DATABASE_NAME, combinedTable));
+        assertThat(combinedResult.size()).isEqualTo(2);
+        assertThat(combinedResult.get(0).getField(0)).isEqualTo(1); // id
+        assertThat(combinedResult.get(0).getField(1)).isEqualTo("***"); // 
name masked
+        assertThat(combinedResult.get(0).getField(2)).isEqualTo("***"); // 
salary masked
+        assertThat(combinedResult.get(0).getField(3)).isEqualTo(25); // age 
not masked
+        assertThat(combinedResult.get(0).getField(4)).isEqualTo("IT"); // 
department not masked
+
+        // Test WHERE clause with both features
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT id, name FROM %s.%s WHERE age 
> 30 ORDER BY id",
+                                        DATABASE_NAME, combinedTable)))
+                .containsExactlyInAnyOrder(Row.of(3, "***"));
+
+        // Clear both column masking and row filter
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create(DATABASE_NAME, combinedTable), new HashMap<>());
+        restCatalogServer.setRowFilterAuth(Identifier.create(DATABASE_NAME, combinedTable), null);
+
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT COUNT(*) FROM %s.%s",
+                                        DATABASE_NAME, combinedTable)))
+                .containsExactlyInAnyOrder(Row.of(4L));
+        assertThat(
+                        batchSql(
+                                String.format(
+                                        "SELECT name FROM %s.%s WHERE id = 1",
+                                        DATABASE_NAME, combinedTable)))
+                .containsExactlyInAnyOrder(Row.of("Alice"));
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index 679234bc74..8e635d404c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.spark.PaimonInputPartition
 import org.apache.paimon.spark.util.SplitUtils
 import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackSplit
 import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, QueryAuthSplit, Split}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.PaimonSparkSession
@@ -160,19 +160,32 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
       deletionFiles: Seq[DeletionFile]): DataSplit = {
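+    // A QueryAuthSplit wraps the underlying DataSplit together with the auth
+    // result (row filters / column masking); unwrap it so the splits can be
+    // repacked, then re-wrap below so the auth result is not lost.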
+    val (actualSplit, authResult) = split match {
+      case queryAuthSplit: QueryAuthSplit =>
+        (queryAuthSplit.dataSplit(), queryAuthSplit.authResult())
+      case _ =>
+        (split, null)
+    }
+
     val builder = DataSplit
       .builder()
-      .withSnapshot(split.snapshotId())
-      .withPartition(split.partition())
-      .withBucket(split.bucket())
-      .withTotalBuckets(split.totalBuckets())
+      .withSnapshot(actualSplit.snapshotId())
+      .withPartition(actualSplit.partition())
+      .withBucket(actualSplit.bucket())
+      .withTotalBuckets(actualSplit.totalBuckets())
       .withDataFiles(dataFiles.toList.asJava)
-      .rawConvertible(split.rawConvertible)
-      .withBucketPath(split.bucketPath)
+      .rawConvertible(actualSplit.rawConvertible)
+      .withBucketPath(actualSplit.bucketPath)
     if (deletionVectors) {
       builder.withDataDeletionFiles(deletionFiles.toList.asJava)
     }
-    builder.build()
+    val newDataSplit = builder.build()
+
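+    // Re-attach the auth result so row filters and column masking survive bin packing.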
+    if (authResult != null) {
+      new QueryAuthSplit(newDataSplit, authResult)
+    } else {
+      newDataSplit
+    }
   }
 
   private def withSamePartitionAndBucket(split1: DataSplit, split2: DataSplit): Boolean = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index ee8978c687..bbae01af95 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -20,11 +20,25 @@ package org.apache.paimon.spark;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.function.FunctionImpl;
 import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.predicate.ConcatTransform;
+import org.apache.paimon.predicate.ConcatWsTransform;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FieldTransform;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LessThan;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.predicate.UpperTransform;
 import org.apache.paimon.rest.RESTCatalogInternalOptions;
 import org.apache.paimon.rest.RESTCatalogServer;
 import org.apache.paimon.rest.auth.AuthProvider;
@@ -38,6 +52,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.junit.jupiter.api.AfterEach;
@@ -48,7 +63,11 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -237,6 +256,279 @@ public class SparkCatalogWithRestTest {
         cleanFunction(functionName);
     }
 
+    @Test
+    public void testColumnMasking() {
+        spark.sql(
+                "CREATE TABLE t_column_masking (id INT, secret STRING, email 
STRING, phone STRING) TBLPROPERTIES"
+                        + " ('query-auth.enabled'='true')");
+        spark.sql(
+                "INSERT INTO t_column_masking VALUES (1, 's1', 
'[email protected]', '12345678901'), (2, 's2', '[email protected]', 
'12345678902')");
+
+        // Test single column masking
+        Transform maskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
+
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("secret", maskTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_column_masking"), columnMasking);
+
+        assertThat(spark.sql("SELECT secret FROM 
t_column_masking").collectAsList().toString())
+                .isEqualTo("[[****], [****]]");
+        assertThat(spark.sql("SELECT id FROM 
t_column_masking").collectAsList().toString())
+                .isEqualTo("[[1], [2]]");
+
+        // Test multiple columns masking
+        Transform emailMaskTransform =
+                new ConcatTransform(
+                        Collections.singletonList(BinaryString.fromString("***@***.com")));
+        Transform phoneMaskTransform =
+                new ConcatTransform(
+                        Collections.singletonList(BinaryString.fromString("***********")));
+
+        columnMasking.put("email", emailMaskTransform);
+        columnMasking.put("phone", phoneMaskTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_column_masking"), columnMasking);
+
+        assertThat(spark.sql("SELECT email FROM 
t_column_masking").collectAsList().toString())
+                .isEqualTo("[[***@***.com], [***@***.com]]");
+        assertThat(spark.sql("SELECT phone FROM 
t_column_masking").collectAsList().toString())
+                .isEqualTo("[[***********], [***********]]");
+
+        // Test SELECT * with column masking
+        List<Row> allRows = spark.sql("SELECT * FROM 
t_column_masking").collectAsList();
+        assertThat(allRows.size()).isEqualTo(2);
+        assertThat(allRows.get(0).getString(1)).isEqualTo("****");
+        assertThat(allRows.get(0).getString(2)).isEqualTo("***@***.com");
+        assertThat(allRows.get(0).getString(3)).isEqualTo("***********");
+
+        // Test WHERE clause with masked column
+        assertThat(
+                        spark.sql("SELECT id FROM t_column_masking WHERE id = 
1")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[1]]");
+
+        // Test aggregation with masked columns
+        assertThat(spark.sql("SELECT COUNT(*) FROM 
t_column_masking").collectAsList().toString())
+                .isEqualTo("[[2]]");
+
+        // Test UpperTransform
+        Transform upperTransform =
+                new UpperTransform(
+                        Collections.singletonList(new FieldRef(1, "secret", 
DataTypes.STRING())));
+        columnMasking.clear();
+        columnMasking.put("secret", upperTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_column_masking"), columnMasking);
+
+        assertThat(
+                        spark.sql("SELECT secret FROM t_column_masking ORDER 
BY id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[S1], [S2]]");
+
+        // Test ConcatWsTransform
+        Transform concatWsTransform =
+                new ConcatWsTransform(
+                        Arrays.asList(
+                                BinaryString.fromString("-"),
+                                new FieldRef(1, "secret", DataTypes.STRING()),
+                                BinaryString.fromString("masked")));
+        columnMasking.clear();
+        columnMasking.put("secret", concatWsTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_column_masking"), columnMasking);
+
+        assertThat(
+                        spark.sql("SELECT secret FROM t_column_masking ORDER 
BY id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[s1-masked], [s2-masked]]");
+
+        // Clear masking and verify original data
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_column_masking"), new HashMap<>());
+        assertThat(
+                        spark.sql("SELECT secret FROM t_column_masking ORDER 
BY id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[s1], [s2]]");
+        assertThat(
+                        spark.sql("SELECT email FROM t_column_masking ORDER BY 
id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[[email protected]], [[email protected]]]");
+    }
+
+    @Test
+    public void testRowFilter() {
+        spark.sql(
+                "CREATE TABLE t_row_filter (id INT, name STRING, age INT, 
department STRING) TBLPROPERTIES"
+                        + " ('query-auth.enabled'='true')");
+        spark.sql(
+                "INSERT INTO t_row_filter VALUES (1, 'Alice', 25, 'IT'), (2, 
'Bob', 30, 'HR'), (3, 'Charlie', 35, 'IT'), (4, 'David', 28, 'Finance')");
+
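+        // FieldRef indexes follow the schema order: id=0, name=1, age=2, department=3.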
+        // Test single condition row filter (age > 28)
+        Predicate agePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(28));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"), 
Collections.singletonList(agePredicate));
+
+        assertThat(spark.sql("SELECT * FROM t_row_filter ORDER BY 
id").collectAsList().toString())
+                .isEqualTo("[[2,Bob,30,HR], [3,Charlie,35,IT]]");
+
+        // Test string condition row filter (department = 'IT')
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("IT")));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"), 
Collections.singletonList(deptPredicate));
+
+        assertThat(spark.sql("SELECT * FROM t_row_filter ORDER BY 
id").collectAsList().toString())
+                .isEqualTo("[[1,Alice,25,IT], [3,Charlie,35,IT]]");
+
+        // Test combined conditions (age >= 30 AND department = 'IT')
+        Predicate ageGePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        Predicate combinedPredicate = PredicateBuilder.and(ageGePredicate, 
deptPredicate);
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"),
+                Collections.singletonList(combinedPredicate));
+
+        assertThat(spark.sql("SELECT * FROM t_row_filter ORDER BY 
id").collectAsList().toString())
+                .isEqualTo("[[3,Charlie,35,IT]]");
+
+        // Test OR condition (age < 27 OR department = 'Finance')
+        Predicate ageLtPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        LessThan.INSTANCE,
+                        Collections.singletonList(27));
+        Predicate financePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("Finance")));
+        Predicate orPredicate = PredicateBuilder.or(ageLtPredicate, 
financePredicate);
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"), 
Collections.singletonList(orPredicate));
+
+        assertThat(spark.sql("SELECT * FROM t_row_filter ORDER BY 
id").collectAsList().toString())
+                .isEqualTo("[[1,Alice,25,IT], [4,David,28,Finance]]");
+
+        // Test WHERE clause combined with row filter
+        Predicate ageGt25Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(25));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"),
+                Collections.singletonList(ageGt25Predicate));
+
+        assertThat(
+                        spark.sql("SELECT * FROM t_row_filter WHERE department 
= 'IT' ORDER BY id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[3,Charlie,35,IT]]");
+
+        // Test JOIN with row filter
+        spark.sql("CREATE TABLE t_join2 (id INT, salary DOUBLE)");
+        spark.sql(
+                "INSERT INTO t_join2 VALUES (1, 50000.0), (2, 60000.0), (3, 
70000.0), (4, 55000.0)");
+
+        Predicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_row_filter"),
+                Collections.singletonList(ageGe30Predicate));
+
+        List<Row> joinResult =
+                spark.sql(
+                                "SELECT t1.id, t1.name, t1.age, t2.salary FROM 
t_row_filter t1 JOIN t_join2 t2 ON t1.id = t2.id ORDER BY t1.id")
+                        .collectAsList();
+        assertThat(joinResult.size()).isEqualTo(2);
+        assertThat(joinResult.get(0).toString()).isEqualTo("[2,Bob,30,60000.0]");
+        assertThat(joinResult.get(1).toString()).isEqualTo("[3,Charlie,35,70000.0]");
+
+        // Clear row filter and verify original data
+        restCatalogServer.setRowFilterAuth(Identifier.create("db2", 
"t_row_filter"), null);
+
+        assertThat(spark.sql("SELECT COUNT(*) FROM 
t_row_filter").collectAsList().toString())
+                .isEqualTo("[[4]]");
+    }
+
+    @Test
+    public void testColumnMaskingAndRowFilter() {
+        spark.sql(
+                "CREATE TABLE t_combined (id INT, name STRING, salary STRING, 
age INT, department STRING) TBLPROPERTIES"
+                        + " ('query-auth.enabled'='true')");
+        spark.sql(
+                "INSERT INTO t_combined VALUES (1, 'Alice', '50000.0', 25, 
'IT'), (2, 'Bob', '60000.0', 30, 'HR'), (3, 'Charlie', '70000.0', 35, 'IT'), 
(4, 'David', '55000.0', 28, 'Finance')");
+
+        Transform salaryMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        Map<String, Transform> columnMasking = new HashMap<>();
+        Predicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+
+        // Test both column masking and row filter together
+        columnMasking.put("salary", salaryMaskTransform);
+        Transform nameMaskTransform =
+                new ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        columnMasking.put("name", nameMaskTransform);
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_combined"), columnMasking);
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("IT")));
+        restCatalogServer.setRowFilterAuth(
+                Identifier.create("db2", "t_combined"), 
Collections.singletonList(deptPredicate));
+
+        List<Row> combinedResult =
+                spark.sql("SELECT * FROM t_combined ORDER BY 
id").collectAsList();
+        assertThat(combinedResult.size()).isEqualTo(2);
+        assertThat(combinedResult.get(0).getString(1)).isEqualTo("***"); // 
name masked
+        assertThat(combinedResult.get(0).getString(2)).isEqualTo("***"); // 
salary masked
+        assertThat(combinedResult.get(0).getInt(3)).isEqualTo(25); // age not 
masked
+        assertThat(combinedResult.get(0).getString(4)).isEqualTo("IT"); // 
department not masked
+
+        // Test WHERE clause with both features
+        assertThat(
+                        spark.sql("SELECT id, name FROM t_combined WHERE age > 
30 ORDER BY id")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[3,***]]");
+
+        // Clear both column masking and row filter
+        restCatalogServer.setColumnMaskingAuth(
+                Identifier.create("db2", "t_combined"), new HashMap<>());
+        restCatalogServer.setRowFilterAuth(Identifier.create("db2", 
"t_combined"), null);
+
+        assertThat(spark.sql("SELECT COUNT(*) FROM 
t_combined").collectAsList().toString())
+                .isEqualTo("[[4]]");
+        assertThat(spark.sql("SELECT name FROM t_combined WHERE id = 
1").collectAsList().toString())
+                .isEqualTo("[[Alice]]");
+    }
+
     private Catalog getPaimonCatalog() {
         CatalogManager catalogManager = spark.sessionState().catalogManager();
         WithPaimonCatalog withPaimonCatalog = (WithPaimonCatalog) catalogManager.currentCatalog();
