This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dde7851da [test] Move auth test cases to RESTCatalogTest (#7069)
6dde7851da is described below

commit 6dde7851dac3290e24cf012ff7742c387a49b36c
Author: Jiajia Li <[email protected]>
AuthorDate: Sat Jan 17 13:55:10 2026 +0800

    [test] Move auth test cases to RESTCatalogTest (#7069)
---
 .../apache/paimon/predicate/FieldTransform.java    |   1 -
 .../apache/paimon/rest/MockRESTCatalogTest.java    | 481 +--------------------
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 456 +++++++++++++++++++
 .../org/apache/paimon/flink/RESTCatalogITCase.java |   1 +
 4 files changed, 467 insertions(+), 472 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
index 744e789b50..5136dedec5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
@@ -42,7 +42,6 @@ public class FieldTransform implements Transform {
     private final FieldRef fieldRef;
 
     public static final String FIELD_FIELD_REF = "fieldRef";
-    public static final String FIELD_TYPE = "type";
 
     @JsonCreator
     public FieldTransform(@JsonProperty(FIELD_FIELD_REF) FieldRef fieldRef) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index a5a84b2512..85e7a27e61 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -26,25 +26,13 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.CastTransform;
-import org.apache.paimon.predicate.ConcatTransform;
-import org.apache.paimon.predicate.ConcatWsTransform;
-import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.FieldTransform;
-import org.apache.paimon.predicate.GreaterOrEqual;
-import org.apache.paimon.predicate.GreaterThan;
-import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.Transform;
 import org.apache.paimon.predicate.UpperTransform;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
@@ -55,15 +43,6 @@ import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
@@ -76,15 +55,12 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static org.apache.paimon.rest.RESTApi.HEADER_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -315,453 +291,6 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         catalog.dropDatabase(identifier.getDatabaseName(), true, true);
     }
 
-    @Test
-    void testColumnMaskingApplyOnRead() throws Exception {
-        Identifier identifier = Identifier.create("test_table_db", 
"auth_table_masking_apply");
-        catalog.createDatabase(identifier.getDatabaseName(), true);
-
-        // Create table with multiple columns of different types
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "col1", DataTypes.STRING()));
-        fields.add(new DataField(1, "col2", DataTypes.STRING()));
-        fields.add(new DataField(2, "col3", DataTypes.INT()));
-        fields.add(new DataField(3, "col4", DataTypes.STRING()));
-        fields.add(new DataField(4, "col5", DataTypes.STRING()));
-
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        fields,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
-                        ""),
-                true);
-
-        Table table = catalog.getTable(identifier);
-
-        // Write test data
-        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
-        BatchTableWrite write = writeBuilder.newWrite();
-        write.write(
-                GenericRow.of(
-                        BinaryString.fromString("hello"),
-                        BinaryString.fromString("world"),
-                        100,
-                        BinaryString.fromString("test"),
-                        BinaryString.fromString("data")));
-        write.write(
-                GenericRow.of(
-                        BinaryString.fromString("foo"),
-                        BinaryString.fromString("bar"),
-                        200,
-                        BinaryString.fromString("example"),
-                        BinaryString.fromString("value")));
-        List<CommitMessage> messages = write.prepareCommit();
-        BatchTableCommit commit = writeBuilder.newCommit();
-        commit.commit(messages);
-        write.close();
-        commit.close();
-
-        // Set up column masking with various transform types
-        Map<String, Transform> columnMasking = new HashMap<>();
-
-        // Test 1: ConcatTransform - mask col1 with "****"
-        ConcatTransform concatTransform =
-                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
-        columnMasking.put("col1", concatTransform);
-
-        // Test 2: UpperTransform - convert col2 to uppercase
-        UpperTransform upperTransform =
-                new UpperTransform(
-                        Collections.singletonList(new FieldRef(1, "col2", 
DataTypes.STRING())));
-        columnMasking.put("col2", upperTransform);
-
-        // Test 3: CastTransform - cast col3 (INT) to STRING
-        CastTransform castTransform =
-                new CastTransform(new FieldRef(2, "col3", DataTypes.INT()), 
DataTypes.STRING());
-        columnMasking.put("col3", castTransform);
-
-        // Test 4: ConcatWsTransform - concatenate col4 with separator
-        ConcatWsTransform concatWsTransform =
-                new ConcatWsTransform(
-                        java.util.Arrays.asList(
-                                BinaryString.fromString("-"),
-                                BinaryString.fromString("prefix"),
-                                new FieldRef(3, "col4", DataTypes.STRING())));
-        columnMasking.put("col4", concatWsTransform);
-
-        // col5 is intentionally not masked to verify unmasked columns work 
correctly
-
-        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
-
-        // Read and verify masked data
-        ReadBuilder readBuilder = table.newReadBuilder();
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        TableRead read = readBuilder.newRead();
-        RecordReader<InternalRow> reader = read.createReader(splits);
-
-        List<InternalRow> rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-
-        assertThat(rows).hasSize(2);
-
-        // Verify first row
-        InternalRow row1 = rows.get(0);
-        assertThat(row1.getString(0).toString())
-                .isEqualTo("****"); // col1 masked with ConcatTransform
-        assertThat(row1.getString(1).toString())
-                .isEqualTo("WORLD"); // col2 masked with UpperTransform
-        assertThat(row1.getString(2).toString())
-                .isEqualTo("100"); // col3 masked with CastTransform 
(INT->STRING)
-        assertThat(row1.getString(3).toString())
-                .isEqualTo("prefix-test"); // col4 masked with 
ConcatWsTransform
-        assertThat(row1.getString(4).toString())
-                .isEqualTo("data"); // col5 NOT masked - original value
-
-        // Verify second row
-        InternalRow row2 = rows.get(1);
-        assertThat(row2.getString(0).toString())
-                .isEqualTo("****"); // col1 masked with ConcatTransform
-        assertThat(row2.getString(1).toString())
-                .isEqualTo("BAR"); // col2 masked with UpperTransform
-        assertThat(row2.getString(2).toString())
-                .isEqualTo("200"); // col3 masked with CastTransform 
(INT->STRING)
-        assertThat(row2.getString(3).toString())
-                .isEqualTo("prefix-example"); // col4 masked with 
ConcatWsTransform
-        assertThat(row2.getString(4).toString())
-                .isEqualTo("value"); // col5 NOT masked - original value
-    }
-
-    @Test
-    void testRowFilter() throws Exception {
-        Identifier identifier = Identifier.create("test_table_db", 
"auth_table_filter");
-        catalog.createDatabase(identifier.getDatabaseName(), true);
-
-        // Create table with multiple data types
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "id", DataTypes.INT()));
-        fields.add(new DataField(1, "name", DataTypes.STRING()));
-        fields.add(new DataField(2, "age", DataTypes.BIGINT()));
-        fields.add(new DataField(3, "salary", DataTypes.DOUBLE()));
-        fields.add(new DataField(4, "is_active", DataTypes.BOOLEAN()));
-        fields.add(new DataField(5, "score", DataTypes.FLOAT()));
-
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        fields,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
-                        ""),
-                true);
-
-        Table table = catalog.getTable(identifier);
-
-        // Write test data with various types
-        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
-        BatchTableWrite write = writeBuilder.newWrite();
-        write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 25L, 
50000.0, true, 85.5f));
-        write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 30L, 
60000.0, false, 90.0f));
-        write.write(
-                GenericRow.of(3, BinaryString.fromString("Charlie"), 35L, 
70000.0, true, 95.5f));
-        write.write(GenericRow.of(4, BinaryString.fromString("David"), 28L, 
55000.0, true, 88.0f));
-        List<CommitMessage> messages = write.prepareCommit();
-        BatchTableCommit commit = writeBuilder.newCommit();
-        commit.commit(messages);
-        write.close();
-        commit.close();
-
-        // Test 1: Filter by INT type (id > 2)
-        LeafPredicate intFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(0, "id", 
DataTypes.INT())),
-                        GreaterThan.INSTANCE,
-                        Collections.singletonList(2));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(intFilterPredicate));
-
-        List<String> result1 = batchRead(table);
-        assertThat(result1).hasSize(2);
-        assertThat(result1)
-                .contains(
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
-                        "+I[4, David, 28, 55000.0, true, 88.0]");
-
-        // Test 2: Filter by BIGINT type (age >= 30)
-        LeafPredicate bigintFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.BIGINT())),
-                        GreaterOrEqual.INSTANCE,
-                        Collections.singletonList(30L));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(bigintFilterPredicate));
-
-        List<String> result2 = batchRead(table);
-        assertThat(result2).hasSize(2);
-        assertThat(result2)
-                .contains(
-                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
-        // Test 3: Filter by DOUBLE type (salary > 55000.0)
-        LeafPredicate doubleFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(3, "salary", 
DataTypes.DOUBLE())),
-                        GreaterThan.INSTANCE,
-                        Collections.singletonList(55000.0));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(doubleFilterPredicate));
-
-        List<String> result3 = batchRead(table);
-        assertThat(result3).hasSize(2);
-        assertThat(result3)
-                .contains(
-                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
-        // Test 4: Filter by BOOLEAN type (is_active = true)
-        LeafPredicate booleanFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(4, "is_active", 
DataTypes.BOOLEAN())),
-                        Equal.INSTANCE,
-                        Collections.singletonList(true));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(booleanFilterPredicate));
-
-        List<String> result4 = batchRead(table);
-        assertThat(result4).hasSize(3);
-        assertThat(result4)
-                .contains(
-                        "+I[1, Alice, 25, 50000.0, true, 85.5]",
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
-                        "+I[4, David, 28, 55000.0, true, 88.0]");
-
-        // Test 5: Filter by FLOAT type (score >= 90.0)
-        LeafPredicate floatFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(5, "score", 
DataTypes.FLOAT())),
-                        GreaterOrEqual.INSTANCE,
-                        Collections.singletonList(90.0f));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(floatFilterPredicate));
-
-        List<String> result5 = batchRead(table);
-        assertThat(result5).hasSize(2);
-        assertThat(result5)
-                .contains(
-                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
-        // Test 6: Filter by STRING type (name = "Alice")
-        LeafPredicate stringFilterPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(1, "name", 
DataTypes.STRING())),
-                        Equal.INSTANCE,
-                        
Collections.singletonList(BinaryString.fromString("Alice")));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(stringFilterPredicate));
-
-        List<String> result6 = batchRead(table);
-        assertThat(result6).hasSize(1);
-        assertThat(result6).contains("+I[1, Alice, 25, 50000.0, true, 85.5]");
-
-        // Test 7: Filter with two predicates (age >= 30 AND is_active = true)
-        LeafPredicate ageGe30Predicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.BIGINT())),
-                        GreaterOrEqual.INSTANCE,
-                        Collections.singletonList(30L));
-        LeafPredicate isActiveTruePredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(4, "is_active", 
DataTypes.BOOLEAN())),
-                        Equal.INSTANCE,
-                        Collections.singletonList(true));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Arrays.asList(ageGe30Predicate, 
isActiveTruePredicate));
-
-        List<String> result7 = batchRead(table);
-        assertThat(result7).hasSize(1);
-        assertThat(result7).contains("+I[3, Charlie, 35, 70000.0, true, 
95.5]");
-
-        // Test 8: Filter with two predicates (salary > 55000.0 AND score >= 
90.0)
-        LeafPredicate salaryGt55000Predicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(3, "salary", 
DataTypes.DOUBLE())),
-                        GreaterThan.INSTANCE,
-                        Collections.singletonList(55000.0));
-        LeafPredicate scoreGe90Predicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(5, "score", 
DataTypes.FLOAT())),
-                        GreaterOrEqual.INSTANCE,
-                        Collections.singletonList(90.0f));
-        restCatalogServer.setRowFilterAuth(
-                identifier, Arrays.asList(salaryGt55000Predicate, 
scoreGe90Predicate));
-
-        List<String> result8 = batchRead(table);
-        assertThat(result8).hasSize(2);
-        assertThat(result8)
-                .contains(
-                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
-                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-    }
-
-    @Test
-    void testColumnMaskingAndRowFilter() throws Exception {
-        Identifier identifier = Identifier.create("test_table_db", 
"combined_auth_table");
-        catalog.createDatabase(identifier.getDatabaseName(), true);
-
-        // Create table with test data
-        List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "id", DataTypes.INT()));
-        fields.add(new DataField(1, "name", DataTypes.STRING()));
-        fields.add(new DataField(2, "salary", DataTypes.STRING()));
-        fields.add(new DataField(3, "age", DataTypes.INT()));
-        fields.add(new DataField(4, "department", DataTypes.STRING()));
-
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        fields,
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
-                        ""),
-                true);
-
-        Table table = catalog.getTable(identifier);
-
-        // Write test data
-        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
-        BatchTableWrite write = writeBuilder.newWrite();
-        write.write(
-                GenericRow.of(
-                        1,
-                        BinaryString.fromString("Alice"),
-                        BinaryString.fromString("50000.0"),
-                        25,
-                        BinaryString.fromString("IT")));
-        write.write(
-                GenericRow.of(
-                        2,
-                        BinaryString.fromString("Bob"),
-                        BinaryString.fromString("60000.0"),
-                        30,
-                        BinaryString.fromString("HR")));
-        write.write(
-                GenericRow.of(
-                        3,
-                        BinaryString.fromString("Charlie"),
-                        BinaryString.fromString("70000.0"),
-                        35,
-                        BinaryString.fromString("IT")));
-        write.write(
-                GenericRow.of(
-                        4,
-                        BinaryString.fromString("David"),
-                        BinaryString.fromString("55000.0"),
-                        28,
-                        BinaryString.fromString("Finance")));
-        List<CommitMessage> messages = write.prepareCommit();
-        BatchTableCommit commit = writeBuilder.newCommit();
-        commit.commit(messages);
-        write.close();
-        commit.close();
-
-        // Test column masking only
-        Transform salaryMaskTransform =
-                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
-        Map<String, Transform> columnMasking = new HashMap<>();
-        columnMasking.put("salary", salaryMaskTransform);
-        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
-
-        ReadBuilder readBuilder = table.newReadBuilder();
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        TableRead read = readBuilder.newRead();
-        RecordReader<InternalRow> reader = read.createReader(splits);
-
-        List<InternalRow> rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-        assertThat(rows).hasSize(4);
-        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***");
-
-        // Test row filter only (clear column masking first)
-        restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
-        Predicate ageGe30Predicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(3, "age", 
DataTypes.INT())),
-                        GreaterOrEqual.INSTANCE,
-                        Collections.singletonList(30));
-        restCatalogServer.setRowFilterAuth(identifier, 
Collections.singletonList(ageGe30Predicate));
-
-        readBuilder = table.newReadBuilder();
-        splits = readBuilder.newScan().plan().splits();
-        read = readBuilder.newRead();
-        reader = read.createReader(splits);
-
-        rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-        assertThat(rows).hasSize(2);
-
-        // Test both column masking and row filter together
-        columnMasking.put("salary", salaryMaskTransform);
-        Transform nameMaskTransform =
-                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
-        columnMasking.put("name", nameMaskTransform);
-        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
-        Predicate deptPredicate =
-                LeafPredicate.of(
-                        new FieldTransform(new FieldRef(4, "department", 
DataTypes.STRING())),
-                        Equal.INSTANCE,
-                        
Collections.singletonList(BinaryString.fromString("IT")));
-        restCatalogServer.setRowFilterAuth(identifier, 
Collections.singletonList(deptPredicate));
-
-        readBuilder = table.newReadBuilder();
-        splits = readBuilder.newScan().plan().splits();
-        read = readBuilder.newRead();
-        reader = read.createReader(splits);
-
-        rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-        assertThat(rows).hasSize(2);
-        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // 
name masked
-        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // 
salary masked
-        assertThat(rows.get(0).getString(4).toString()).isEqualTo("IT"); // 
department not masked
-
-        // Test complex scenario: row filter + column masking combined
-        Predicate combinedPredicate = PredicateBuilder.and(ageGe30Predicate, 
deptPredicate);
-        restCatalogServer.setRowFilterAuth(
-                identifier, Collections.singletonList(combinedPredicate));
-
-        readBuilder = table.newReadBuilder();
-        splits = readBuilder.newScan().plan().splits();
-        read = readBuilder.newRead();
-        reader = read.createReader(splits);
-
-        rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-        assertThat(rows).hasSize(1);
-        assertThat(rows.get(0).getInt(0)).isEqualTo(3); // id
-        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // 
name masked
-        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // 
salary masked
-        assertThat(rows.get(0).getInt(3)).isEqualTo(35); // age not masked
-
-        // Clear both column masking and row filter
-        restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
-        restCatalogServer.setRowFilterAuth(identifier, null);
-
-        readBuilder = table.newReadBuilder();
-        splits = readBuilder.newScan().plan().splits();
-        read = readBuilder.newRead();
-        reader = read.createReader(splits);
-
-        rows = new ArrayList<>();
-        reader.forEachRemaining(rows::add);
-        assertThat(rows).hasSize(4);
-        assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob", 
"Charlie", "David");
-    }
-
     private void checkHeader(String headerName, String headerValue) {
         // Verify that the header were included in the requests
         List<Map<String, String>> receivedHeaders = 
restCatalogServer.getReceivedHeaders();
@@ -855,6 +384,16 @@ class MockRESTCatalogTest extends RESTCatalogTest {
                 lastFileCreationTime);
     }
 
+    @Override
+    protected void setColumnMasking(Identifier identifier, Map<String, 
Transform> columnMasking) {
+        restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
+    }
+
+    @Override
+    protected void setRowFilter(Identifier identifier, List<Predicate> 
rowFilters) {
+        restCatalogServer.setRowFilterAuth(identifier, rowFilters);
+    }
+
     private RESTCatalog initCatalog(boolean enableDataToken) throws 
IOException {
         return initCatalogUtil(enableDataToken, Collections.emptyMap(), null, 
null);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index bc0a74bf72..300a94287e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -40,6 +40,19 @@ import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.predicate.CastTransform;
+import org.apache.paimon.predicate.ConcatTransform;
+import org.apache.paimon.predicate.ConcatWsTransform;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FieldTransform;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.predicate.UpperTransform;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.rest.auth.DLFToken;
 import org.apache.paimon.rest.exceptions.BadRequestException;
@@ -2969,6 +2982,444 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
     }
 
+    @Test
+    void testColumnMaskingApplyOnRead() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"auth_table_masking_apply");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with multiple columns of different types
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "col1", DataTypes.STRING()));
+        fields.add(new DataField(1, "col2", DataTypes.STRING()));
+        fields.add(new DataField(2, "col3", DataTypes.INT()));
+        fields.add(new DataField(3, "col4", DataTypes.STRING()));
+        fields.add(new DataField(4, "col5", DataTypes.STRING()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(
+                GenericRow.of(
+                        BinaryString.fromString("hello"),
+                        BinaryString.fromString("world"),
+                        100,
+                        BinaryString.fromString("test"),
+                        BinaryString.fromString("data")));
+        write.write(
+                GenericRow.of(
+                        BinaryString.fromString("foo"),
+                        BinaryString.fromString("bar"),
+                        200,
+                        BinaryString.fromString("example"),
+                        BinaryString.fromString("value")));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Set up column masking with various transform types
+        Map<String, Transform> columnMasking = new HashMap<>();
+
+        // Test 1: ConcatTransform - mask col1 with "****"
+        ConcatTransform concatTransform =
+                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
+        columnMasking.put("col1", concatTransform);
+
+        // Test 2: UpperTransform - convert col2 to uppercase
+        UpperTransform upperTransform =
+                new UpperTransform(
+                        Collections.singletonList(new FieldRef(1, "col2", 
DataTypes.STRING())));
+        columnMasking.put("col2", upperTransform);
+
+        // Test 3: CastTransform - cast col3 (INT) to STRING
+        CastTransform castTransform =
+                new CastTransform(new FieldRef(2, "col3", DataTypes.INT()), 
DataTypes.STRING());
+        columnMasking.put("col3", castTransform);
+
+        // Test 4: ConcatWsTransform - concatenate col4 with separator
+        ConcatWsTransform concatWsTransform =
+                new ConcatWsTransform(
+                        java.util.Arrays.asList(
+                                BinaryString.fromString("-"),
+                                BinaryString.fromString("prefix"),
+                                new FieldRef(3, "col4", DataTypes.STRING())));
+        columnMasking.put("col4", concatWsTransform);
+
+        // col5 is intentionally not masked to verify unmasked columns work 
correctly
+
+        setColumnMasking(identifier, columnMasking);
+
+        // Read and verify masked data
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+
+        assertThat(rows).hasSize(2);
+
+        // Verify first row
+        InternalRow row1 = rows.get(0);
+        assertThat(row1.getString(0).toString())
+                .isEqualTo("****"); // col1 masked with ConcatTransform
+        assertThat(row1.getString(1).toString())
+                .isEqualTo("WORLD"); // col2 masked with UpperTransform
+        assertThat(row1.getString(2).toString())
+                .isEqualTo("100"); // col3 masked with CastTransform 
(INT->STRING)
+        assertThat(row1.getString(3).toString())
+                .isEqualTo("prefix-test"); // col4 masked with 
ConcatWsTransform
+        assertThat(row1.getString(4).toString())
+                .isEqualTo("data"); // col5 NOT masked - original value
+
+        // Verify second row
+        InternalRow row2 = rows.get(1);
+        assertThat(row2.getString(0).toString())
+                .isEqualTo("****"); // col1 masked with ConcatTransform
+        assertThat(row2.getString(1).toString())
+                .isEqualTo("BAR"); // col2 masked with UpperTransform
+        assertThat(row2.getString(2).toString())
+                .isEqualTo("200"); // col3 masked with CastTransform 
(INT->STRING)
+        assertThat(row2.getString(3).toString())
+                .isEqualTo("prefix-example"); // col4 masked with 
ConcatWsTransform
+        assertThat(row2.getString(4).toString())
+                .isEqualTo("value"); // col5 NOT masked - original value
+    }
+
+    @Test
+    void testRowFilter() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"auth_table_filter");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with multiple data types
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "id", DataTypes.INT()));
+        fields.add(new DataField(1, "name", DataTypes.STRING()));
+        fields.add(new DataField(2, "age", DataTypes.BIGINT()));
+        fields.add(new DataField(3, "salary", DataTypes.DOUBLE()));
+        fields.add(new DataField(4, "is_active", DataTypes.BOOLEAN()));
+        fields.add(new DataField(5, "score", DataTypes.FLOAT()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data with various types
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 25L, 
50000.0, true, 85.5f));
+        write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 30L, 
60000.0, false, 90.0f));
+        write.write(
+                GenericRow.of(3, BinaryString.fromString("Charlie"), 35L, 
70000.0, true, 95.5f));
+        write.write(GenericRow.of(4, BinaryString.fromString("David"), 28L, 
55000.0, true, 88.0f));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Test 1: Filter by INT type (id > 2)
+        LeafPredicate intFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(0, "id", 
DataTypes.INT())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(2));
+        setRowFilter(identifier, 
Collections.singletonList(intFilterPredicate));
+
+        List<String> result1 = batchRead(table);
+        assertThat(result1).hasSize(2);
+        assertThat(result1)
+                .contains(
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+                        "+I[4, David, 28, 55000.0, true, 88.0]");
+
+        // Test 2: Filter by BIGINT type (age >= 30)
+        LeafPredicate bigintFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.BIGINT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30L));
+        setRowFilter(identifier, 
Collections.singletonList(bigintFilterPredicate));
+
+        List<String> result2 = batchRead(table);
+        assertThat(result2).hasSize(2);
+        assertThat(result2)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 3: Filter by DOUBLE type (salary > 55000.0)
+        LeafPredicate doubleFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "salary", 
DataTypes.DOUBLE())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(55000.0));
+        setRowFilter(identifier, 
Collections.singletonList(doubleFilterPredicate));
+
+        List<String> result3 = batchRead(table);
+        assertThat(result3).hasSize(2);
+        assertThat(result3)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 4: Filter by BOOLEAN type (is_active = true)
+        LeafPredicate booleanFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "is_active", 
DataTypes.BOOLEAN())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(true));
+        setRowFilter(identifier, 
Collections.singletonList(booleanFilterPredicate));
+
+        List<String> result4 = batchRead(table);
+        assertThat(result4).hasSize(3);
+        assertThat(result4)
+                .contains(
+                        "+I[1, Alice, 25, 50000.0, true, 85.5]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+                        "+I[4, David, 28, 55000.0, true, 88.0]");
+
+        // Test 5: Filter by FLOAT type (score >= 90.0)
+        LeafPredicate floatFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(5, "score", 
DataTypes.FLOAT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(90.0f));
+        setRowFilter(identifier, 
Collections.singletonList(floatFilterPredicate));
+
+        List<String> result5 = batchRead(table);
+        assertThat(result5).hasSize(2);
+        assertThat(result5)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 6: Filter by STRING type (name = "Alice")
+        LeafPredicate stringFilterPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(1, "name", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("Alice")));
+        setRowFilter(identifier, 
Collections.singletonList(stringFilterPredicate));
+
+        List<String> result6 = batchRead(table);
+        assertThat(result6).hasSize(1);
+        assertThat(result6).contains("+I[1, Alice, 25, 50000.0, true, 85.5]");
+
+        // Test 7: Filter with two predicates (age >= 30 AND is_active = true)
+        LeafPredicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", 
DataTypes.BIGINT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30L));
+        LeafPredicate isActiveTruePredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "is_active", 
DataTypes.BOOLEAN())),
+                        Equal.INSTANCE,
+                        Collections.singletonList(true));
+        setRowFilter(identifier, Arrays.asList(ageGe30Predicate, 
isActiveTruePredicate));
+
+        List<String> result7 = batchRead(table);
+        assertThat(result7).hasSize(1);
+        assertThat(result7).contains("+I[3, Charlie, 35, 70000.0, true, 
95.5]");
+
+        // Test 8: Filter with two predicates (salary > 55000.0 AND score >= 
90.0)
+        LeafPredicate salaryGt55000Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "salary", 
DataTypes.DOUBLE())),
+                        GreaterThan.INSTANCE,
+                        Collections.singletonList(55000.0));
+        LeafPredicate scoreGe90Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(5, "score", 
DataTypes.FLOAT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(90.0f));
+        setRowFilter(identifier, Arrays.asList(salaryGt55000Predicate, 
scoreGe90Predicate));
+
+        List<String> result8 = batchRead(table);
+        assertThat(result8).hasSize(2);
+        assertThat(result8)
+                .contains(
+                        "+I[2, Bob, 30, 60000.0, false, 90.0]",
+                        "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+    }
+
+    @Test
+    void testColumnMaskingAndRowFilter() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"combined_auth_table");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+
+        // Create table with test data
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "id", DataTypes.INT()));
+        fields.add(new DataField(1, "name", DataTypes.STRING()));
+        fields.add(new DataField(2, "salary", DataTypes.STRING()));
+        fields.add(new DataField(3, "age", DataTypes.INT()));
+        fields.add(new DataField(4, "department", DataTypes.STRING()));
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap(QUERY_AUTH_ENABLED.key(), 
"true"),
+                        ""),
+                true);
+
+        Table table = catalog.getTable(identifier);
+
+        // Write test data
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        write.write(
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("Alice"),
+                        BinaryString.fromString("50000.0"),
+                        25,
+                        BinaryString.fromString("IT")));
+        write.write(
+                GenericRow.of(
+                        2,
+                        BinaryString.fromString("Bob"),
+                        BinaryString.fromString("60000.0"),
+                        30,
+                        BinaryString.fromString("HR")));
+        write.write(
+                GenericRow.of(
+                        3,
+                        BinaryString.fromString("Charlie"),
+                        BinaryString.fromString("70000.0"),
+                        35,
+                        BinaryString.fromString("IT")));
+        write.write(
+                GenericRow.of(
+                        4,
+                        BinaryString.fromString("David"),
+                        BinaryString.fromString("55000.0"),
+                        28,
+                        BinaryString.fromString("Finance")));
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // Test column masking only
+        Transform salaryMaskTransform =
+                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        Map<String, Transform> columnMasking = new HashMap<>();
+        columnMasking.put("salary", salaryMaskTransform);
+        setColumnMasking(identifier, columnMasking);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(4);
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***");
+
+        // Test row filter only (clear column masking first)
+        setColumnMasking(identifier, new HashMap<>());
+        Predicate ageGe30Predicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "age", 
DataTypes.INT())),
+                        GreaterOrEqual.INSTANCE,
+                        Collections.singletonList(30));
+        setRowFilter(identifier, Collections.singletonList(ageGe30Predicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(2);
+
+        // Test both column masking and row filter together
+        columnMasking.put("salary", salaryMaskTransform);
+        Transform nameMaskTransform =
+                new 
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+        columnMasking.put("name", nameMaskTransform);
+        setColumnMasking(identifier, columnMasking);
+        Predicate deptPredicate =
+                LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "department", 
DataTypes.STRING())),
+                        Equal.INSTANCE,
+                        
Collections.singletonList(BinaryString.fromString("IT")));
+        setRowFilter(identifier, Collections.singletonList(deptPredicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(2);
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // 
name masked
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // 
salary masked
+        assertThat(rows.get(0).getString(4).toString()).isEqualTo("IT"); // 
department not masked
+
+        // Test complex scenario: row filter + column masking combined
+        Predicate combinedPredicate = PredicateBuilder.and(ageGe30Predicate, 
deptPredicate);
+        setRowFilter(identifier, Collections.singletonList(combinedPredicate));
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getInt(0)).isEqualTo(3); // id
+        assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); // 
name masked
+        assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); // 
salary masked
+        assertThat(rows.get(0).getInt(3)).isEqualTo(35); // age not masked
+
+        // Clear both column masking and row filter
+        setColumnMasking(identifier, new HashMap<>());
+        setRowFilter(identifier, null);
+
+        readBuilder = table.newReadBuilder();
+        splits = readBuilder.newScan().plan().splits();
+        read = readBuilder.newRead();
+        reader = read.createReader(splits);
+
+        rows = new ArrayList<>();
+        reader.forEachRemaining(rows::add);
+        assertThat(rows).hasSize(4);
+        assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob", 
"Charlie", "David");
+    }
+
     protected void createTable(
             Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {
@@ -3010,6 +3461,11 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
             long fileCount,
             long lastFileCreationTime);
 
+    protected abstract void setColumnMasking(
+            Identifier identifier, Map<String, Transform> columnMasking);
+
+    protected abstract void setRowFilter(Identifier identifier, 
List<Predicate> rowFilter);
+
     protected void batchWrite(Table table, List<Integer> data) throws 
Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         BatchTableWrite write = writeBuilder.newWrite();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 5fa01a6bb7..6a0bed06a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -219,6 +219,7 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
         }
     }
 
+    @Test
     public void testColumnMasking() {
         String maskingTable = "column_masking_table";
         batchSql(

Reply via email to