This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6dde7851da [test] Move auth test cases to RESTCatalogTest (#7069)
6dde7851da is described below
commit 6dde7851dac3290e24cf012ff7742c387a49b36c
Author: Jiajia Li <[email protected]>
AuthorDate: Sat Jan 17 13:55:10 2026 +0800
[test] Move auth test cases to RESTCatalogTest (#7069)
---
.../apache/paimon/predicate/FieldTransform.java | 1 -
.../apache/paimon/rest/MockRESTCatalogTest.java | 481 +--------------------
.../org/apache/paimon/rest/RESTCatalogTest.java | 456 +++++++++++++++++++
.../org/apache/paimon/flink/RESTCatalogITCase.java | 1 +
4 files changed, 467 insertions(+), 472 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
index 744e789b50..5136dedec5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/FieldTransform.java
@@ -42,7 +42,6 @@ public class FieldTransform implements Transform {
private final FieldRef fieldRef;
public static final String FIELD_FIELD_REF = "fieldRef";
- public static final String FIELD_TYPE = "type";
@JsonCreator
public FieldTransform(@JsonProperty(FIELD_FIELD_REF) FieldRef fieldRef) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index a5a84b2512..85e7a27e61 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -26,25 +26,13 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.CastTransform;
-import org.apache.paimon.predicate.ConcatTransform;
-import org.apache.paimon.predicate.ConcatWsTransform;
-import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.FieldTransform;
-import org.apache.paimon.predicate.GreaterOrEqual;
-import org.apache.paimon.predicate.GreaterThan;
-import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.Transform;
import org.apache.paimon.predicate.UpperTransform;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
@@ -55,15 +43,6 @@ import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
@@ -76,15 +55,12 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import static org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.rest.RESTApi.HEADER_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
@@ -315,453 +291,6 @@ class MockRESTCatalogTest extends RESTCatalogTest {
catalog.dropDatabase(identifier.getDatabaseName(), true, true);
}
- @Test
- void testColumnMaskingApplyOnRead() throws Exception {
- Identifier identifier = Identifier.create("test_table_db",
"auth_table_masking_apply");
- catalog.createDatabase(identifier.getDatabaseName(), true);
-
- // Create table with multiple columns of different types
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "col1", DataTypes.STRING()));
- fields.add(new DataField(1, "col2", DataTypes.STRING()));
- fields.add(new DataField(2, "col3", DataTypes.INT()));
- fields.add(new DataField(3, "col4", DataTypes.STRING()));
- fields.add(new DataField(4, "col5", DataTypes.STRING()));
-
- catalog.createTable(
- identifier,
- new Schema(
- fields,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
- ""),
- true);
-
- Table table = catalog.getTable(identifier);
-
- // Write test data
- BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
- BatchTableWrite write = writeBuilder.newWrite();
- write.write(
- GenericRow.of(
- BinaryString.fromString("hello"),
- BinaryString.fromString("world"),
- 100,
- BinaryString.fromString("test"),
- BinaryString.fromString("data")));
- write.write(
- GenericRow.of(
- BinaryString.fromString("foo"),
- BinaryString.fromString("bar"),
- 200,
- BinaryString.fromString("example"),
- BinaryString.fromString("value")));
- List<CommitMessage> messages = write.prepareCommit();
- BatchTableCommit commit = writeBuilder.newCommit();
- commit.commit(messages);
- write.close();
- commit.close();
-
- // Set up column masking with various transform types
- Map<String, Transform> columnMasking = new HashMap<>();
-
- // Test 1: ConcatTransform - mask col1 with "****"
- ConcatTransform concatTransform =
- new
ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
- columnMasking.put("col1", concatTransform);
-
- // Test 2: UpperTransform - convert col2 to uppercase
- UpperTransform upperTransform =
- new UpperTransform(
- Collections.singletonList(new FieldRef(1, "col2",
DataTypes.STRING())));
- columnMasking.put("col2", upperTransform);
-
- // Test 3: CastTransform - cast col3 (INT) to STRING
- CastTransform castTransform =
- new CastTransform(new FieldRef(2, "col3", DataTypes.INT()),
DataTypes.STRING());
- columnMasking.put("col3", castTransform);
-
- // Test 4: ConcatWsTransform - concatenate col4 with separator
- ConcatWsTransform concatWsTransform =
- new ConcatWsTransform(
- java.util.Arrays.asList(
- BinaryString.fromString("-"),
- BinaryString.fromString("prefix"),
- new FieldRef(3, "col4", DataTypes.STRING())));
- columnMasking.put("col4", concatWsTransform);
-
- // col5 is intentionally not masked to verify unmasked columns work
correctly
-
- restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
-
- // Read and verify masked data
- ReadBuilder readBuilder = table.newReadBuilder();
- List<Split> splits = readBuilder.newScan().plan().splits();
- TableRead read = readBuilder.newRead();
- RecordReader<InternalRow> reader = read.createReader(splits);
-
- List<InternalRow> rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
-
- assertThat(rows).hasSize(2);
-
- // Verify first row
- InternalRow row1 = rows.get(0);
- assertThat(row1.getString(0).toString())
- .isEqualTo("****"); // col1 masked with ConcatTransform
- assertThat(row1.getString(1).toString())
- .isEqualTo("WORLD"); // col2 masked with UpperTransform
- assertThat(row1.getString(2).toString())
- .isEqualTo("100"); // col3 masked with CastTransform
(INT->STRING)
- assertThat(row1.getString(3).toString())
- .isEqualTo("prefix-test"); // col4 masked with
ConcatWsTransform
- assertThat(row1.getString(4).toString())
- .isEqualTo("data"); // col5 NOT masked - original value
-
- // Verify second row
- InternalRow row2 = rows.get(1);
- assertThat(row2.getString(0).toString())
- .isEqualTo("****"); // col1 masked with ConcatTransform
- assertThat(row2.getString(1).toString())
- .isEqualTo("BAR"); // col2 masked with UpperTransform
- assertThat(row2.getString(2).toString())
- .isEqualTo("200"); // col3 masked with CastTransform
(INT->STRING)
- assertThat(row2.getString(3).toString())
- .isEqualTo("prefix-example"); // col4 masked with
ConcatWsTransform
- assertThat(row2.getString(4).toString())
- .isEqualTo("value"); // col5 NOT masked - original value
- }
-
- @Test
- void testRowFilter() throws Exception {
- Identifier identifier = Identifier.create("test_table_db",
"auth_table_filter");
- catalog.createDatabase(identifier.getDatabaseName(), true);
-
- // Create table with multiple data types
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "id", DataTypes.INT()));
- fields.add(new DataField(1, "name", DataTypes.STRING()));
- fields.add(new DataField(2, "age", DataTypes.BIGINT()));
- fields.add(new DataField(3, "salary", DataTypes.DOUBLE()));
- fields.add(new DataField(4, "is_active", DataTypes.BOOLEAN()));
- fields.add(new DataField(5, "score", DataTypes.FLOAT()));
-
- catalog.createTable(
- identifier,
- new Schema(
- fields,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
- ""),
- true);
-
- Table table = catalog.getTable(identifier);
-
- // Write test data with various types
- BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
- BatchTableWrite write = writeBuilder.newWrite();
- write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 25L,
50000.0, true, 85.5f));
- write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 30L,
60000.0, false, 90.0f));
- write.write(
- GenericRow.of(3, BinaryString.fromString("Charlie"), 35L,
70000.0, true, 95.5f));
- write.write(GenericRow.of(4, BinaryString.fromString("David"), 28L,
55000.0, true, 88.0f));
- List<CommitMessage> messages = write.prepareCommit();
- BatchTableCommit commit = writeBuilder.newCommit();
- commit.commit(messages);
- write.close();
- commit.close();
-
- // Test 1: Filter by INT type (id > 2)
- LeafPredicate intFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(0, "id",
DataTypes.INT())),
- GreaterThan.INSTANCE,
- Collections.singletonList(2));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(intFilterPredicate));
-
- List<String> result1 = batchRead(table);
- assertThat(result1).hasSize(2);
- assertThat(result1)
- .contains(
- "+I[3, Charlie, 35, 70000.0, true, 95.5]",
- "+I[4, David, 28, 55000.0, true, 88.0]");
-
- // Test 2: Filter by BIGINT type (age >= 30)
- LeafPredicate bigintFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(2, "age",
DataTypes.BIGINT())),
- GreaterOrEqual.INSTANCE,
- Collections.singletonList(30L));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(bigintFilterPredicate));
-
- List<String> result2 = batchRead(table);
- assertThat(result2).hasSize(2);
- assertThat(result2)
- .contains(
- "+I[2, Bob, 30, 60000.0, false, 90.0]",
- "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
- // Test 3: Filter by DOUBLE type (salary > 55000.0)
- LeafPredicate doubleFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(3, "salary",
DataTypes.DOUBLE())),
- GreaterThan.INSTANCE,
- Collections.singletonList(55000.0));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(doubleFilterPredicate));
-
- List<String> result3 = batchRead(table);
- assertThat(result3).hasSize(2);
- assertThat(result3)
- .contains(
- "+I[2, Bob, 30, 60000.0, false, 90.0]",
- "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
- // Test 4: Filter by BOOLEAN type (is_active = true)
- LeafPredicate booleanFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(4, "is_active",
DataTypes.BOOLEAN())),
- Equal.INSTANCE,
- Collections.singletonList(true));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(booleanFilterPredicate));
-
- List<String> result4 = batchRead(table);
- assertThat(result4).hasSize(3);
- assertThat(result4)
- .contains(
- "+I[1, Alice, 25, 50000.0, true, 85.5]",
- "+I[3, Charlie, 35, 70000.0, true, 95.5]",
- "+I[4, David, 28, 55000.0, true, 88.0]");
-
- // Test 5: Filter by FLOAT type (score >= 90.0)
- LeafPredicate floatFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(5, "score",
DataTypes.FLOAT())),
- GreaterOrEqual.INSTANCE,
- Collections.singletonList(90.0f));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(floatFilterPredicate));
-
- List<String> result5 = batchRead(table);
- assertThat(result5).hasSize(2);
- assertThat(result5)
- .contains(
- "+I[2, Bob, 30, 60000.0, false, 90.0]",
- "+I[3, Charlie, 35, 70000.0, true, 95.5]");
-
- // Test 6: Filter by STRING type (name = "Alice")
- LeafPredicate stringFilterPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(1, "name",
DataTypes.STRING())),
- Equal.INSTANCE,
-
Collections.singletonList(BinaryString.fromString("Alice")));
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(stringFilterPredicate));
-
- List<String> result6 = batchRead(table);
- assertThat(result6).hasSize(1);
- assertThat(result6).contains("+I[1, Alice, 25, 50000.0, true, 85.5]");
-
- // Test 7: Filter with two predicates (age >= 30 AND is_active = true)
- LeafPredicate ageGe30Predicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(2, "age",
DataTypes.BIGINT())),
- GreaterOrEqual.INSTANCE,
- Collections.singletonList(30L));
- LeafPredicate isActiveTruePredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(4, "is_active",
DataTypes.BOOLEAN())),
- Equal.INSTANCE,
- Collections.singletonList(true));
- restCatalogServer.setRowFilterAuth(
- identifier, Arrays.asList(ageGe30Predicate,
isActiveTruePredicate));
-
- List<String> result7 = batchRead(table);
- assertThat(result7).hasSize(1);
- assertThat(result7).contains("+I[3, Charlie, 35, 70000.0, true,
95.5]");
-
- // Test 8: Filter with two predicates (salary > 55000.0 AND score >=
90.0)
- LeafPredicate salaryGt55000Predicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(3, "salary",
DataTypes.DOUBLE())),
- GreaterThan.INSTANCE,
- Collections.singletonList(55000.0));
- LeafPredicate scoreGe90Predicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(5, "score",
DataTypes.FLOAT())),
- GreaterOrEqual.INSTANCE,
- Collections.singletonList(90.0f));
- restCatalogServer.setRowFilterAuth(
- identifier, Arrays.asList(salaryGt55000Predicate,
scoreGe90Predicate));
-
- List<String> result8 = batchRead(table);
- assertThat(result8).hasSize(2);
- assertThat(result8)
- .contains(
- "+I[2, Bob, 30, 60000.0, false, 90.0]",
- "+I[3, Charlie, 35, 70000.0, true, 95.5]");
- }
-
- @Test
- void testColumnMaskingAndRowFilter() throws Exception {
- Identifier identifier = Identifier.create("test_table_db",
"combined_auth_table");
- catalog.createDatabase(identifier.getDatabaseName(), true);
-
- // Create table with test data
- List<DataField> fields = new ArrayList<>();
- fields.add(new DataField(0, "id", DataTypes.INT()));
- fields.add(new DataField(1, "name", DataTypes.STRING()));
- fields.add(new DataField(2, "salary", DataTypes.STRING()));
- fields.add(new DataField(3, "age", DataTypes.INT()));
- fields.add(new DataField(4, "department", DataTypes.STRING()));
-
- catalog.createTable(
- identifier,
- new Schema(
- fields,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
- ""),
- true);
-
- Table table = catalog.getTable(identifier);
-
- // Write test data
- BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
- BatchTableWrite write = writeBuilder.newWrite();
- write.write(
- GenericRow.of(
- 1,
- BinaryString.fromString("Alice"),
- BinaryString.fromString("50000.0"),
- 25,
- BinaryString.fromString("IT")));
- write.write(
- GenericRow.of(
- 2,
- BinaryString.fromString("Bob"),
- BinaryString.fromString("60000.0"),
- 30,
- BinaryString.fromString("HR")));
- write.write(
- GenericRow.of(
- 3,
- BinaryString.fromString("Charlie"),
- BinaryString.fromString("70000.0"),
- 35,
- BinaryString.fromString("IT")));
- write.write(
- GenericRow.of(
- 4,
- BinaryString.fromString("David"),
- BinaryString.fromString("55000.0"),
- 28,
- BinaryString.fromString("Finance")));
- List<CommitMessage> messages = write.prepareCommit();
- BatchTableCommit commit = writeBuilder.newCommit();
- commit.commit(messages);
- write.close();
- commit.close();
-
- // Test column masking only
- Transform salaryMaskTransform =
- new
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
- Map<String, Transform> columnMasking = new HashMap<>();
- columnMasking.put("salary", salaryMaskTransform);
- restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
-
- ReadBuilder readBuilder = table.newReadBuilder();
- List<Split> splits = readBuilder.newScan().plan().splits();
- TableRead read = readBuilder.newRead();
- RecordReader<InternalRow> reader = read.createReader(splits);
-
- List<InternalRow> rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
- assertThat(rows).hasSize(4);
- assertThat(rows.get(0).getString(2).toString()).isEqualTo("***");
-
- // Test row filter only (clear column masking first)
- restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
- Predicate ageGe30Predicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(3, "age",
DataTypes.INT())),
- GreaterOrEqual.INSTANCE,
- Collections.singletonList(30));
- restCatalogServer.setRowFilterAuth(identifier,
Collections.singletonList(ageGe30Predicate));
-
- readBuilder = table.newReadBuilder();
- splits = readBuilder.newScan().plan().splits();
- read = readBuilder.newRead();
- reader = read.createReader(splits);
-
- rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
- assertThat(rows).hasSize(2);
-
- // Test both column masking and row filter together
- columnMasking.put("salary", salaryMaskTransform);
- Transform nameMaskTransform =
- new
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
- columnMasking.put("name", nameMaskTransform);
- restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
- Predicate deptPredicate =
- LeafPredicate.of(
- new FieldTransform(new FieldRef(4, "department",
DataTypes.STRING())),
- Equal.INSTANCE,
-
Collections.singletonList(BinaryString.fromString("IT")));
- restCatalogServer.setRowFilterAuth(identifier,
Collections.singletonList(deptPredicate));
-
- readBuilder = table.newReadBuilder();
- splits = readBuilder.newScan().plan().splits();
- read = readBuilder.newRead();
- reader = read.createReader(splits);
-
- rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
- assertThat(rows).hasSize(2);
- assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); //
name masked
- assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); //
salary masked
- assertThat(rows.get(0).getString(4).toString()).isEqualTo("IT"); //
department not masked
-
- // Test complex scenario: row filter + column masking combined
- Predicate combinedPredicate = PredicateBuilder.and(ageGe30Predicate,
deptPredicate);
- restCatalogServer.setRowFilterAuth(
- identifier, Collections.singletonList(combinedPredicate));
-
- readBuilder = table.newReadBuilder();
- splits = readBuilder.newScan().plan().splits();
- read = readBuilder.newRead();
- reader = read.createReader(splits);
-
- rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
- assertThat(rows).hasSize(1);
- assertThat(rows.get(0).getInt(0)).isEqualTo(3); // id
- assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); //
name masked
- assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); //
salary masked
- assertThat(rows.get(0).getInt(3)).isEqualTo(35); // age not masked
-
- // Clear both column masking and row filter
- restCatalogServer.setColumnMaskingAuth(identifier, new HashMap<>());
- restCatalogServer.setRowFilterAuth(identifier, null);
-
- readBuilder = table.newReadBuilder();
- splits = readBuilder.newScan().plan().splits();
- read = readBuilder.newRead();
- reader = read.createReader(splits);
-
- rows = new ArrayList<>();
- reader.forEachRemaining(rows::add);
- assertThat(rows).hasSize(4);
- assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob",
"Charlie", "David");
- }
-
private void checkHeader(String headerName, String headerValue) {
// Verify that the header were included in the requests
List<Map<String, String>> receivedHeaders =
restCatalogServer.getReceivedHeaders();
@@ -855,6 +384,16 @@ class MockRESTCatalogTest extends RESTCatalogTest {
lastFileCreationTime);
}
+ @Override
+ protected void setColumnMasking(Identifier identifier, Map<String,
Transform> columnMasking) {
+ restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
+ }
+
+ @Override
+ protected void setRowFilter(Identifier identifier, List<Predicate>
rowFilters) {
+ restCatalogServer.setRowFilterAuth(identifier, rowFilters);
+ }
+
private RESTCatalog initCatalog(boolean enableDataToken) throws
IOException {
return initCatalogUtil(enableDataToken, Collections.emptyMap(), null,
null);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index bc0a74bf72..300a94287e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -40,6 +40,19 @@ import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.predicate.CastTransform;
+import org.apache.paimon.predicate.ConcatTransform;
+import org.apache.paimon.predicate.ConcatWsTransform;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FieldTransform;
+import org.apache.paimon.predicate.GreaterOrEqual;
+import org.apache.paimon.predicate.GreaterThan;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.predicate.UpperTransform;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.rest.auth.DLFToken;
import org.apache.paimon.rest.exceptions.BadRequestException;
@@ -2969,6 +2982,444 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
}
+ @Test
+ void testColumnMaskingApplyOnRead() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"auth_table_masking_apply");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+
+ // Create table with multiple columns of different types
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "col1", DataTypes.STRING()));
+ fields.add(new DataField(1, "col2", DataTypes.STRING()));
+ fields.add(new DataField(2, "col3", DataTypes.INT()));
+ fields.add(new DataField(3, "col4", DataTypes.STRING()));
+ fields.add(new DataField(4, "col5", DataTypes.STRING()));
+
+ catalog.createTable(
+ identifier,
+ new Schema(
+ fields,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
+ ""),
+ true);
+
+ Table table = catalog.getTable(identifier);
+
+ // Write test data
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ write.write(
+ GenericRow.of(
+ BinaryString.fromString("hello"),
+ BinaryString.fromString("world"),
+ 100,
+ BinaryString.fromString("test"),
+ BinaryString.fromString("data")));
+ write.write(
+ GenericRow.of(
+ BinaryString.fromString("foo"),
+ BinaryString.fromString("bar"),
+ 200,
+ BinaryString.fromString("example"),
+ BinaryString.fromString("value")));
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write.close();
+ commit.close();
+
+ // Set up column masking with various transform types
+ Map<String, Transform> columnMasking = new HashMap<>();
+
+ // Test 1: ConcatTransform - mask col1 with "****"
+ ConcatTransform concatTransform =
+ new
ConcatTransform(Collections.singletonList(BinaryString.fromString("****")));
+ columnMasking.put("col1", concatTransform);
+
+ // Test 2: UpperTransform - convert col2 to uppercase
+ UpperTransform upperTransform =
+ new UpperTransform(
+ Collections.singletonList(new FieldRef(1, "col2",
DataTypes.STRING())));
+ columnMasking.put("col2", upperTransform);
+
+ // Test 3: CastTransform - cast col3 (INT) to STRING
+ CastTransform castTransform =
+ new CastTransform(new FieldRef(2, "col3", DataTypes.INT()),
DataTypes.STRING());
+ columnMasking.put("col3", castTransform);
+
+ // Test 4: ConcatWsTransform - concatenate col4 with separator
+ ConcatWsTransform concatWsTransform =
+ new ConcatWsTransform(
+ java.util.Arrays.asList(
+ BinaryString.fromString("-"),
+ BinaryString.fromString("prefix"),
+ new FieldRef(3, "col4", DataTypes.STRING())));
+ columnMasking.put("col4", concatWsTransform);
+
+ // col5 is intentionally not masked to verify unmasked columns work
correctly
+
+ setColumnMasking(identifier, columnMasking);
+
+ // Read and verify masked data
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+
+ assertThat(rows).hasSize(2);
+
+ // Verify first row
+ InternalRow row1 = rows.get(0);
+ assertThat(row1.getString(0).toString())
+ .isEqualTo("****"); // col1 masked with ConcatTransform
+ assertThat(row1.getString(1).toString())
+ .isEqualTo("WORLD"); // col2 masked with UpperTransform
+ assertThat(row1.getString(2).toString())
+ .isEqualTo("100"); // col3 masked with CastTransform
(INT->STRING)
+ assertThat(row1.getString(3).toString())
+ .isEqualTo("prefix-test"); // col4 masked with
ConcatWsTransform
+ assertThat(row1.getString(4).toString())
+ .isEqualTo("data"); // col5 NOT masked - original value
+
+ // Verify second row
+ InternalRow row2 = rows.get(1);
+ assertThat(row2.getString(0).toString())
+ .isEqualTo("****"); // col1 masked with ConcatTransform
+ assertThat(row2.getString(1).toString())
+ .isEqualTo("BAR"); // col2 masked with UpperTransform
+ assertThat(row2.getString(2).toString())
+ .isEqualTo("200"); // col3 masked with CastTransform
(INT->STRING)
+ assertThat(row2.getString(3).toString())
+ .isEqualTo("prefix-example"); // col4 masked with
ConcatWsTransform
+ assertThat(row2.getString(4).toString())
+ .isEqualTo("value"); // col5 NOT masked - original value
+ }
+
+ @Test
+ void testRowFilter() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"auth_table_filter");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+
+ // Create table with multiple data types
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "id", DataTypes.INT()));
+ fields.add(new DataField(1, "name", DataTypes.STRING()));
+ fields.add(new DataField(2, "age", DataTypes.BIGINT()));
+ fields.add(new DataField(3, "salary", DataTypes.DOUBLE()));
+ fields.add(new DataField(4, "is_active", DataTypes.BOOLEAN()));
+ fields.add(new DataField(5, "score", DataTypes.FLOAT()));
+
+ catalog.createTable(
+ identifier,
+ new Schema(
+ fields,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
+ ""),
+ true);
+
+ Table table = catalog.getTable(identifier);
+
+ // Write test data with various types
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ write.write(GenericRow.of(1, BinaryString.fromString("Alice"), 25L,
50000.0, true, 85.5f));
+ write.write(GenericRow.of(2, BinaryString.fromString("Bob"), 30L,
60000.0, false, 90.0f));
+ write.write(
+ GenericRow.of(3, BinaryString.fromString("Charlie"), 35L,
70000.0, true, 95.5f));
+ write.write(GenericRow.of(4, BinaryString.fromString("David"), 28L,
55000.0, true, 88.0f));
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write.close();
+ commit.close();
+
+ // Test 1: Filter by INT type (id > 2)
+ LeafPredicate intFilterPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(0, "id",
DataTypes.INT())),
+ GreaterThan.INSTANCE,
+ Collections.singletonList(2));
+ setRowFilter(identifier,
Collections.singletonList(intFilterPredicate));
+
+ List<String> result1 = batchRead(table);
+ assertThat(result1).hasSize(2);
+ assertThat(result1)
+ .contains(
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+ "+I[4, David, 28, 55000.0, true, 88.0]");
+
+ // Test 2: Filter by BIGINT type (age >= 30)
+ LeafPredicate bigintFilterPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(2, "age",
DataTypes.BIGINT())),
+ GreaterOrEqual.INSTANCE,
+ Collections.singletonList(30L));
+ setRowFilter(identifier,
Collections.singletonList(bigintFilterPredicate));
+
+ List<String> result2 = batchRead(table);
+ assertThat(result2).hasSize(2);
+ assertThat(result2)
+ .contains(
+ "+I[2, Bob, 30, 60000.0, false, 90.0]",
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+ // Test 3: Filter by DOUBLE type (salary > 55000.0)
+ LeafPredicate doubleFilterPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(3, "salary",
DataTypes.DOUBLE())),
+ GreaterThan.INSTANCE,
+ Collections.singletonList(55000.0));
+ setRowFilter(identifier,
Collections.singletonList(doubleFilterPredicate));
+
+ List<String> result3 = batchRead(table);
+ assertThat(result3).hasSize(2);
+ assertThat(result3)
+ .contains(
+ "+I[2, Bob, 30, 60000.0, false, 90.0]",
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+ // Test 4: Filter by BOOLEAN type (is_active = true)
+ LeafPredicate booleanFilterPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(4, "is_active",
DataTypes.BOOLEAN())),
+ Equal.INSTANCE,
+ Collections.singletonList(true));
+ setRowFilter(identifier,
Collections.singletonList(booleanFilterPredicate));
+
+ List<String> result4 = batchRead(table);
+ assertThat(result4).hasSize(3);
+ assertThat(result4)
+ .contains(
+ "+I[1, Alice, 25, 50000.0, true, 85.5]",
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]",
+ "+I[4, David, 28, 55000.0, true, 88.0]");
+
+ // Test 5: Filter by FLOAT type (score >= 90.0)
+ LeafPredicate floatFilterPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(5, "score",
DataTypes.FLOAT())),
+ GreaterOrEqual.INSTANCE,
+ Collections.singletonList(90.0f));
+ setRowFilter(identifier,
Collections.singletonList(floatFilterPredicate));
+
+ List<String> result5 = batchRead(table);
+ assertThat(result5).hasSize(2);
+ assertThat(result5)
+ .contains(
+ "+I[2, Bob, 30, 60000.0, false, 90.0]",
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+ // Test 6: Filter by STRING type (name = "Alice")
+ LeafPredicate stringFilterPredicate =
+ LeafPredicate.of(
+                        new FieldTransform(new FieldRef(1, "name", DataTypes.STRING())),
+ Equal.INSTANCE,
+                        Collections.singletonList(BinaryString.fromString("Alice")));
+        setRowFilter(identifier, Collections.singletonList(stringFilterPredicate));
+
+ List<String> result6 = batchRead(table);
+ assertThat(result6).hasSize(1);
+ assertThat(result6).contains("+I[1, Alice, 25, 50000.0, true, 85.5]");
+
+ // Test 7: Filter with two predicates (age >= 30 AND is_active = true)
+ LeafPredicate ageGe30Predicate =
+ LeafPredicate.of(
+                        new FieldTransform(new FieldRef(2, "age", DataTypes.BIGINT())),
+ GreaterOrEqual.INSTANCE,
+ Collections.singletonList(30L));
+ LeafPredicate isActiveTruePredicate =
+ LeafPredicate.of(
+                        new FieldTransform(new FieldRef(4, "is_active", DataTypes.BOOLEAN())),
+ Equal.INSTANCE,
+ Collections.singletonList(true));
+        setRowFilter(identifier, Arrays.asList(ageGe30Predicate, isActiveTruePredicate));
+
+ List<String> result7 = batchRead(table);
+ assertThat(result7).hasSize(1);
+        assertThat(result7).contains("+I[3, Charlie, 35, 70000.0, true, 95.5]");
+
+        // Test 8: Filter with two predicates (salary > 55000.0 AND score >= 90.0)
+ LeafPredicate salaryGt55000Predicate =
+ LeafPredicate.of(
+                        new FieldTransform(new FieldRef(3, "salary", DataTypes.DOUBLE())),
+ GreaterThan.INSTANCE,
+ Collections.singletonList(55000.0));
+ LeafPredicate scoreGe90Predicate =
+ LeafPredicate.of(
+                        new FieldTransform(new FieldRef(5, "score", DataTypes.FLOAT())),
+ GreaterOrEqual.INSTANCE,
+ Collections.singletonList(90.0f));
+        setRowFilter(identifier, Arrays.asList(salaryGt55000Predicate, scoreGe90Predicate));
+
+ List<String> result8 = batchRead(table);
+ assertThat(result8).hasSize(2);
+ assertThat(result8)
+ .contains(
+ "+I[2, Bob, 30, 60000.0, false, 90.0]",
+ "+I[3, Charlie, 35, 70000.0, true, 95.5]");
+ }
+
+ @Test
+ void testColumnMaskingAndRowFilter() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"combined_auth_table");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+
+ // Create table with test data
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "id", DataTypes.INT()));
+ fields.add(new DataField(1, "name", DataTypes.STRING()));
+ fields.add(new DataField(2, "salary", DataTypes.STRING()));
+ fields.add(new DataField(3, "age", DataTypes.INT()));
+ fields.add(new DataField(4, "department", DataTypes.STRING()));
+
+ catalog.createTable(
+ identifier,
+ new Schema(
+ fields,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonMap(QUERY_AUTH_ENABLED.key(),
"true"),
+ ""),
+ true);
+
+ Table table = catalog.getTable(identifier);
+
+ // Write test data
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ write.write(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("Alice"),
+ BinaryString.fromString("50000.0"),
+ 25,
+ BinaryString.fromString("IT")));
+ write.write(
+ GenericRow.of(
+ 2,
+ BinaryString.fromString("Bob"),
+ BinaryString.fromString("60000.0"),
+ 30,
+ BinaryString.fromString("HR")));
+ write.write(
+ GenericRow.of(
+ 3,
+ BinaryString.fromString("Charlie"),
+ BinaryString.fromString("70000.0"),
+ 35,
+ BinaryString.fromString("IT")));
+ write.write(
+ GenericRow.of(
+ 4,
+ BinaryString.fromString("David"),
+ BinaryString.fromString("55000.0"),
+ 28,
+ BinaryString.fromString("Finance")));
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write.close();
+ commit.close();
+
+ // Test column masking only
+ Transform salaryMaskTransform =
+ new
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+ Map<String, Transform> columnMasking = new HashMap<>();
+ columnMasking.put("salary", salaryMaskTransform);
+ setColumnMasking(identifier, columnMasking);
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+ assertThat(rows).hasSize(4);
+ assertThat(rows.get(0).getString(2).toString()).isEqualTo("***");
+
+ // Test row filter only (clear column masking first)
+ setColumnMasking(identifier, new HashMap<>());
+ Predicate ageGe30Predicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(3, "age",
DataTypes.INT())),
+ GreaterOrEqual.INSTANCE,
+ Collections.singletonList(30));
+ setRowFilter(identifier, Collections.singletonList(ageGe30Predicate));
+
+ readBuilder = table.newReadBuilder();
+ splits = readBuilder.newScan().plan().splits();
+ read = readBuilder.newRead();
+ reader = read.createReader(splits);
+
+ rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+ assertThat(rows).hasSize(2);
+
+ // Test both column masking and row filter together
+ columnMasking.put("salary", salaryMaskTransform);
+ Transform nameMaskTransform =
+ new
ConcatTransform(Collections.singletonList(BinaryString.fromString("***")));
+ columnMasking.put("name", nameMaskTransform);
+ setColumnMasking(identifier, columnMasking);
+ Predicate deptPredicate =
+ LeafPredicate.of(
+ new FieldTransform(new FieldRef(4, "department",
DataTypes.STRING())),
+ Equal.INSTANCE,
+
Collections.singletonList(BinaryString.fromString("IT")));
+ setRowFilter(identifier, Collections.singletonList(deptPredicate));
+
+ readBuilder = table.newReadBuilder();
+ splits = readBuilder.newScan().plan().splits();
+ read = readBuilder.newRead();
+ reader = read.createReader(splits);
+
+ rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+ assertThat(rows).hasSize(2);
+ assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); //
name masked
+ assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); //
salary masked
+ assertThat(rows.get(0).getString(4).toString()).isEqualTo("IT"); //
department not masked
+
+ // Test complex scenario: row filter + column masking combined
+ Predicate combinedPredicate = PredicateBuilder.and(ageGe30Predicate,
deptPredicate);
+ setRowFilter(identifier, Collections.singletonList(combinedPredicate));
+
+ readBuilder = table.newReadBuilder();
+ splits = readBuilder.newScan().plan().splits();
+ read = readBuilder.newRead();
+ reader = read.createReader(splits);
+
+ rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getInt(0)).isEqualTo(3); // id
+ assertThat(rows.get(0).getString(1).toString()).isEqualTo("***"); //
name masked
+ assertThat(rows.get(0).getString(2).toString()).isEqualTo("***"); //
salary masked
+ assertThat(rows.get(0).getInt(3)).isEqualTo(35); // age not masked
+
+ // Clear both column masking and row filter
+ setColumnMasking(identifier, new HashMap<>());
+ setRowFilter(identifier, null);
+
+ readBuilder = table.newReadBuilder();
+ splits = readBuilder.newScan().plan().splits();
+ read = readBuilder.newRead();
+ reader = read.createReader(splits);
+
+ rows = new ArrayList<>();
+ reader.forEachRemaining(rows::add);
+ assertThat(rows).hasSize(4);
+ assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob",
"Charlie", "David");
+ }
+
protected void createTable(
Identifier identifier, Map<String, String> options, List<String>
partitionKeys)
throws Exception {
@@ -3010,6 +3461,11 @@ public abstract class RESTCatalogTest extends CatalogTestBase {
long fileCount,
long lastFileCreationTime);
+ protected abstract void setColumnMasking(
+ Identifier identifier, Map<String, Transform> columnMasking);
+
+    protected abstract void setRowFilter(Identifier identifier, List<Predicate> rowFilter);
+
     protected void batchWrite(Table table, List<Integer> data) throws Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 5fa01a6bb7..6a0bed06a9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -219,6 +219,7 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
}
}
+ @Test
public void testColumnMasking() {
String maskingTable = "column_masking_table";
batchSql(