This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4625ce945e [core] Refactor row filter and column masking to AbstractDataTableRead
4625ce945e is described below
commit 4625ce945e3b469d4d1804eb53e84d2b4ab4605b
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 14 21:39:07 2026 +0800
[core] Refactor row filter and column masking to AbstractDataTableRead
---
.../rest/responses/AuthTableQueryResponse.java | 8 +-
.../org/apache/paimon/predicate/LeafFunction.java | 1 +
.../org/apache/paimon/predicate/Predicate.java | 2 +
.../org/apache/paimon/predicate/Transform.java | 1 +
.../paimon/catalog/TableQueryAuthResult.java | 231 ++++++++++++++-
.../java/org/apache/paimon/rest/RESTCatalog.java | 51 +---
.../apache/paimon/table/CatalogEnvironment.java | 3 +-
.../paimon/table/source/AbstractDataTableRead.java | 11 +
.../paimon/table/source/AbstractDataTableScan.java | 40 +--
.../paimon/table/source/AppendTableRead.java | 2 +-
.../paimon/table/source/AuthAwareTableRead.java | 311 ---------------------
.../paimon/table/source/DataTableBatchScan.java | 8 +-
.../paimon/table/source/DataTableStreamScan.java | 8 +-
.../apache/paimon/table/source/PushDownUtils.java | 11 +-
.../apache/paimon/table/source/QueryAuthSplit.java | 199 +------------
.../paimon/table/source/ReadBuilderImpl.java | 7 -
.../apache/paimon/table/source/TableQueryAuth.java | 1 +
.../source/TableQueryAuthResultSerializer.java | 75 -----
.../apache/paimon/rest/MockRESTCatalogTest.java | 5 +-
.../paimon/flink/lookup/LookupCompactDiffRead.java | 3 +-
.../org/apache/paimon/flink/RESTCatalogITCase.java | 15 +-
.../spark/aggregate/AggregatePushDownUtils.scala | 16 +-
.../paimon/spark/scan/BinPackingSplits.scala | 29 +-
.../paimon/spark/SparkCatalogWithRestTest.java | 14 +-
24 files changed, 328 insertions(+), 724 deletions(-)
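Note for reviewers skimming the diff: the commit moves query-auth handling out of a TableRead wrapper (AuthAwareTableRead, deleted below) and into the scan/read pair itself. The scan wraps each planned split in a QueryAuthSplit carrying the TableQueryAuthResult, and AbstractDataTableRead unwraps it and applies row filtering plus column masking around the inner reader. A minimal, self-contained Java sketch of that control flow follows; the types here (Split, DataSplit, AuthResult) are simplified stand-ins for illustration, not Paimon's real classes — only the wrap/unwrap shape mirrors the diff.

    import java.util.List;
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class AuthFlowSketch {

        interface Split {}

        // Simplified stand-in for DataSplit: just carries rows.
        static final class DataSplit implements Split {
            final List<String> rows;
            DataSplit(List<String> rows) { this.rows = rows; }
        }

        // Simplified stand-in for TableQueryAuthResult: a row filter plus a masking transform.
        static final class AuthResult {
            final Predicate<String> rowFilter;
            final Function<String, String> masking;
            AuthResult(Predicate<String> rowFilter, Function<String, String> masking) {
                this.rowFilter = rowFilter;
                this.masking = masking;
            }
        }

        // Mirrors QueryAuthSplit: a Split wrapper that carries the auth result to the reader.
        static final class QueryAuthSplit implements Split {
            final Split split;
            final AuthResult authResult;
            QueryAuthSplit(Split split, AuthResult authResult) {
                this.split = split;
                this.authResult = authResult;
            }
        }

        // Mirrors AbstractDataTableScan.plan() + TableQueryAuthResult.convertPlan():
        // when auth applies, every split in the plan is wrapped.
        static List<Split> plan(List<Split> splits, AuthResult auth) {
            if (auth == null) {
                return splits;
            }
            return splits.stream()
                    .map(s -> (Split) new QueryAuthSplit(s, auth))
                    .collect(Collectors.toList());
        }

        // Mirrors AbstractDataTableRead.createReader(): unwrap the split, read it,
        // then apply the row filter and the masking transform (doAuth).
        static List<String> createReader(Split split) {
            AuthResult auth = null;
            if (split instanceof QueryAuthSplit) {
                QueryAuthSplit authSplit = (QueryAuthSplit) split;
                split = authSplit.split;
                auth = authSplit.authResult;
            }
            List<String> rows = ((DataSplit) split).rows; // stands in for the inner reader
            if (auth != null) {
                rows = rows.stream()
                        .filter(auth.rowFilter)
                        .map(auth.masking)
                        .collect(Collectors.toList());
            }
            return rows;
        }

        public static void main(String[] args) {
            AuthResult auth = new AuthResult(r -> !r.startsWith("secret"), String::toUpperCase);
            for (Split s : plan(List.of(new DataSplit(List.of("a", "secret-b", "c"))), auth)) {
                System.out.println(createReader(s)); // prints [A, C]
            }
        }
    }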
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
index 3aab9ea14e..7410631aea 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
@@ -40,6 +40,10 @@ public class AuthTableQueryResponse implements RESTResponse {
@JsonProperty(FIELD_FILTER)
private final List<String> filter;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty(FIELD_COLUMN_MASKING)
+ private final Map<String, String> columnMasking;
+
@JsonCreator
public AuthTableQueryResponse(
@JsonProperty(FIELD_FILTER) List<String> filter,
@@ -53,10 +57,6 @@ public class AuthTableQueryResponse implements RESTResponse {
return filter;
}
- @JsonInclude(JsonInclude.Include.NON_NULL)
- @JsonProperty(FIELD_COLUMN_MASKING)
- private final Map<String, String> columnMasking;
-
@JsonGetter(FIELD_COLUMN_MASKING)
public Map<String, String> columnMasking() {
return columnMasking;
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
index 974537f526..d3d0ba9eb9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
@@ -30,6 +30,7 @@ import java.util.Optional;
/** Function to test a field with literals. */
public abstract class LeafFunction implements Serializable {
+
@JsonCreator
public static LeafFunction fromJson(String name) throws IOException {
switch (name) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
index c8ae3dd9d2..6ff977fd27 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
@@ -44,7 +44,9 @@ import java.util.Optional;
@JsonSubTypes.Type(value = CompoundPredicate.class, name = CompoundPredicate.NAME)
})
public interface Predicate extends Serializable {
+
String FIELD_KIND = "kind";
+
/**
* Test based on the specific input row.
*
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index b90ab3c9a7..79f15699b5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -40,6 +40,7 @@ import java.util.List;
@JsonSubTypes.Type(value = UpperTransform.class, name = UpperTransform.NAME)
})
public interface Transform extends Serializable {
+
String FIELD_NAME = "name";
String name();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
index c4c33f241a..87910b5cc6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
@@ -18,40 +18,243 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.QueryAuthSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
-import java.util.Collections;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
/** Auth result for table query, including row level filter and optional column masking rules. */
-public class TableQueryAuthResult {
+public class TableQueryAuthResult implements Serializable {
+
+ private static final long serialVersionUID = 1L;
- @Nullable private final Predicate rowFilter;
- private final Map<String, Transform> columnMasking;
+ private final @Nullable List<String> filter;
+ private final @Nullable Map<String, String> columnMasking;
public TableQueryAuthResult(
- @Nullable Predicate rowFilter, Map<String, Transform> columnMasking) {
- this.rowFilter = rowFilter;
- this.columnMasking = columnMasking == null ? Collections.emptyMap() : columnMasking;
+ @Nullable List<String> filter, @Nullable Map<String, String> columnMasking) {
+ this.filter = filter;
+ this.columnMasking = columnMasking;
}
- public static TableQueryAuthResult empty() {
- return new TableQueryAuthResult(null, Collections.emptyMap());
+ @Nullable
+ public List<String> filter() {
+ return filter;
}
@Nullable
- public Predicate rowFilter() {
+ public Map<String, String> columnMasking() {
+ return columnMasking;
+ }
+
+ public TableScan.Plan convertPlan(TableScan.Plan plan) {
+ if (filter == null && (columnMasking == null || columnMasking.isEmpty())) {
+ return plan;
+ }
+ List<Split> authSplits =
+ plan.splits().stream()
+ .map(split -> new QueryAuthSplit(split, this))
+ .collect(Collectors.toList());
+ return new DataFilePlan<>(authSplits);
+ }
+
+ @Nullable
+ public Predicate extractPredicate() {
+ Predicate rowFilter = null;
+ if (filter != null && !filter.isEmpty()) {
+ List<Predicate> predicates = new ArrayList<>();
+ for (String json : filter) {
+ if (StringUtils.isEmpty(json)) {
+ continue;
+ }
+ Predicate predicate = JsonSerdeUtil.fromJson(json, Predicate.class);
+ if (predicate != null) {
+ predicates.add(predicate);
+ }
+ }
+ if (predicates.size() == 1) {
+ rowFilter = predicates.get(0);
+ } else if (!predicates.isEmpty()) {
+ rowFilter = new CompoundPredicate(And.INSTANCE, predicates);
+ }
+ }
return rowFilter;
}
- public Map<String, Transform> columnMasking() {
- return columnMasking;
+ public Map<String, Transform> extractColumnMasking() {
+ Map<String, Transform> result = new TreeMap<>();
+ if (columnMasking != null && !columnMasking.isEmpty()) {
+ for (Map.Entry<String, String> e : columnMasking.entrySet()) {
+ String column = e.getKey();
+ String json = e.getValue();
+ if (StringUtils.isEmpty(column) || StringUtils.isEmpty(json)) {
+ continue;
+ }
+ Transform transform = JsonSerdeUtil.fromJson(json, Transform.class);
+ if (transform == null) {
+ continue;
+ }
+ result.put(column, transform);
+ }
+ }
+ return result;
}
- public boolean isEmpty() {
- return rowFilter == null && (columnMasking == null || columnMasking.isEmpty());
+ public RecordReader<InternalRow> doAuth(
+ RecordReader<InternalRow> reader, RowType outputRowType) {
+ Predicate rowFilter = extractPredicate();
+ if (rowFilter != null) {
+ Predicate remappedFilter = rowFilter.visit(new PredicateRemapper(outputRowType));
+ if (remappedFilter != null) {
+ reader = reader.filter(remappedFilter::test);
+ }
+ }
+
+ Map<String, Transform> columnMasking = extractColumnMasking();
+ if (columnMasking != null && !columnMasking.isEmpty()) {
+ Map<Integer, Transform> remappedMasking =
+ transformRemapping(outputRowType, columnMasking);
+ if (!remappedMasking.isEmpty()) {
+ reader = reader.transform(row -> transform(outputRowType, remappedMasking, row));
+ }
+ }
+
+ return reader;
+ }
+
+ private static InternalRow transform(
+ RowType outputRowType, Map<Integer, Transform> remappedMasking, InternalRow row) {
+ int arity = outputRowType.getFieldCount();
+ GenericRow out = new GenericRow(row.getRowKind(), arity);
+ for (int i = 0; i < arity; i++) {
+ DataType type = outputRowType.getTypeAt(i);
+ out.setField(i, InternalRowUtils.get(row, i, type));
+ }
+ for (Map.Entry<Integer, Transform> e : remappedMasking.entrySet()) {
+ int targetIndex = e.getKey();
+ Transform transform = e.getValue();
+ Object masked = transform.transform(row);
+ out.setField(targetIndex, masked);
+ }
+ return out;
+ }
+
+ private static Map<Integer, Transform> transformRemapping(
+ RowType outputRowType, Map<String, Transform> masking) {
+ Map<Integer, Transform> out = new HashMap<>();
+ if (masking == null || masking.isEmpty()) {
+ return out;
+ }
+
+ for (Map.Entry<String, Transform> e : masking.entrySet()) {
+ String targetColumn = e.getKey();
+ Transform transform = e.getValue();
+ if (targetColumn == null || transform == null) {
+ continue;
+ }
+
+ int targetIndex = outputRowType.getFieldIndex(targetColumn);
+ if (targetIndex < 0) {
+ continue;
+ }
+
+ List<Object> newInputs = new ArrayList<>();
+ for (Object input : transform.inputs()) {
+ if (input instanceof FieldRef) {
+ FieldRef ref = (FieldRef) input;
+ int newIndex = outputRowType.getFieldIndex(ref.name());
+ if (newIndex < 0) {
+ throw new IllegalArgumentException(
+ "Column masking refers to field '"
+ + ref.name()
+ + "' which is not present in output
row type "
+ + outputRowType);
+ }
+ DataType type = outputRowType.getTypeAt(newIndex);
+ newInputs.add(new FieldRef(newIndex, ref.name(), type));
+ } else {
+ newInputs.add(input);
+ }
+ }
+ out.put(targetIndex, transform.copyWithNewInputs(newInputs));
+ }
+ return out;
+ }
+
+ private static class PredicateRemapper implements PredicateVisitor<Predicate> {
+
+ private final RowType outputRowType;
+
+ private PredicateRemapper(RowType outputRowType) {
+ this.outputRowType = outputRowType;
+ }
+
+ @Override
+ public Predicate visit(LeafPredicate predicate) {
+ Transform transform = predicate.transform();
+ List<Object> newInputs = new ArrayList<>();
+ for (Object input : transform.inputs()) {
+ if (input instanceof FieldRef) {
+ FieldRef ref = (FieldRef) input;
+ String fieldName = ref.name();
+ int newIndex = outputRowType.getFieldIndex(fieldName);
+ if (newIndex < 0) {
+ throw new RuntimeException(
+ String.format(
+ "Unable to read data without column %s
when row filter enabled.",
+ fieldName));
+ }
+ DataType type = outputRowType.getTypeAt(newIndex);
+ newInputs.add(new FieldRef(newIndex, fieldName, type));
+ } else {
+ newInputs.add(input);
+ }
+ }
+ return predicate.copyWithNewInputs(newInputs);
+ }
+
+ @Override
+ public Predicate visit(CompoundPredicate predicate) {
+ List<Predicate> remappedChildren = new ArrayList<>();
+ for (Predicate child : predicate.children()) {
+ Predicate remapped = child.visit(this);
+ if (remapped != null) {
+ remappedChildren.add(remapped);
+ }
+ }
+ if (remappedChildren.isEmpty()) {
+ return null;
+ }
+ if (remappedChildren.size() == 1) {
+ return remappedChildren.get(0);
+ }
+ return new CompoundPredicate(predicate.function(), remappedChildren);
+ }
}
}
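A note on the remapping logic above (PredicateRemapper and transformRemapping): auth filters and masking transforms arrive from the catalog with field references resolved against the full table schema, while the reader may produce a projected row, so each FieldRef is re-resolved by name against the output row type before evaluation, and if a filtered column was projected away the read fails fast. A hedged, self-contained sketch of just that re-resolution step follows; FieldRef here is a simplified stand-in, not Paimon's class.

    import java.util.List;

    public class FieldRemapSketch {

        // Simplified stand-in for org.apache.paimon.predicate.FieldRef.
        static final class FieldRef {
            final int index;
            final String name;
            FieldRef(int index, String name) {
                this.index = index;
                this.name = name;
            }
            @Override
            public String toString() {
                return name + "@" + index;
            }
        }

        // Re-resolve a schema-based field reference against the projected output
        // columns, mirroring the lookup-and-fail behavior in PredicateRemapper.
        static FieldRef remap(FieldRef ref, List<String> outputColumns) {
            int newIndex = outputColumns.indexOf(ref.name);
            if (newIndex < 0) {
                throw new RuntimeException(
                        String.format(
                                "Unable to read data without column %s when row filter enabled.",
                                ref.name));
            }
            return new FieldRef(newIndex, ref.name);
        }

        public static void main(String[] args) {
            // Table schema order: id, name, age; the projection reads only (name, age).
            FieldRef ageInSchema = new FieldRef(2, "age");
            System.out.println(remap(ageInSchema, List.of("name", "age"))); // age@1
        }
    }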
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c41810c0fd..16d2f599bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -38,10 +38,6 @@ import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.predicate.And;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.Transform;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -64,7 +60,6 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.view.View;
@@ -74,8 +69,6 @@ import org.apache.paimon.view.ViewSchema;
import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
-import org.jetbrains.annotations.NotNull;
-
import javax.annotation.Nullable;
import java.io.IOException;
@@ -88,7 +81,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BRANCH;
@@ -539,7 +531,7 @@ public class RESTCatalog implements Catalog {
checkNotSystemTable(identifier, "authTable");
try {
AuthTableQueryResponse response = api.authTableQuery(identifier, select);
- return getTableQueryAuthResult(response);
+ return new TableQueryAuthResult(response.filter(), response.columnMasking());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
@@ -1165,45 +1157,4 @@ public class RESTCatalog implements Catalog {
}
return schema;
}
-
- private static @NotNull TableQueryAuthResult getTableQueryAuthResult(
- AuthTableQueryResponse response) {
- List<String> predicateJsons = response == null ? null : response.filter();
- Predicate rowFilter = null;
- if (predicateJsons != null && !predicateJsons.isEmpty()) {
- List<Predicate> predicates = new ArrayList<>();
- for (String json : predicateJsons) {
- if (StringUtils.isEmpty(json)) {
- continue;
- }
- Predicate predicate = JsonSerdeUtil.fromJson(json, Predicate.class);
- if (predicate != null) {
- predicates.add(predicate);
- }
- }
- if (predicates.size() == 1) {
- rowFilter = predicates.get(0);
- } else if (!predicates.isEmpty()) {
- rowFilter = new CompoundPredicate(And.INSTANCE, predicates);
- }
- }
-
- Map<String, Transform> columnMasking = new TreeMap<>();
- Map<String, String> maskingJsons = response == null ? null : response.columnMasking();
- if (maskingJsons != null && !maskingJsons.isEmpty()) {
- for (Map.Entry<String, String> e : maskingJsons.entrySet()) {
- String column = e.getKey();
- String json = e.getValue();
- if (StringUtils.isEmpty(column) || StringUtils.isEmpty(json)) {
- continue;
- }
- Transform transform = JsonSerdeUtil.fromJson(json, Transform.class);
- if (transform == null) {
- continue;
- }
- columnMasking.put(column, transform);
- }
- }
- return new TableQueryAuthResult(rowFilter, columnMasking);
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index d07e009214..a0a23d8ca4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.paimon.catalog.CatalogSnapshotCommit;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
-import org.apache.paimon.catalog.TableQueryAuthResult;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.source.TableQueryAuth;
import org.apache.paimon.tag.SnapshotLoaderImpl;
@@ -154,7 +153,7 @@ public class CatalogEnvironment implements Serializable {
public TableQueryAuth tableQueryAuth(CoreOptions options) {
if (!options.queryAuthEnabled() || catalogLoader == null) {
- return select -> TableQueryAuthResult.empty();
+ return select -> null;
}
return select -> {
try (Catalog catalog = catalogLoader.load()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index bb4a3cce2b..b1dab4db56 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.catalog.TableQueryAuthResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.disk.IOManager;
@@ -90,7 +91,17 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
@Override
public final RecordReader<InternalRow> createReader(Split split) throws IOException {
+ TableQueryAuthResult queryAuthResult = null;
+ if (split instanceof QueryAuthSplit) {
+ QueryAuthSplit authSplit = (QueryAuthSplit) split;
+ split = authSplit.split();
+ queryAuthResult = authSplit.authResult();
+ }
RecordReader<InternalRow> reader = reader(split);
+ if (queryAuthResult != null) {
+ RowType type = readType == null ? schema.logicalRowType() : readType;
+ reader = queryAuthResult.doAuth(reader, type);
+ }
if (executeFilter) {
reader = executeFilter(reader);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c64b75557a..dbc88937a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -87,7 +86,6 @@ abstract class AbstractDataTableScan implements DataTableScan {
private final TableQueryAuth queryAuth;
@Nullable private RowType readType;
- @Nullable private TableQueryAuthResult authResult;
protected AbstractDataTableScan(
TableSchema schema,
@@ -100,6 +98,18 @@ abstract class AbstractDataTableScan implements DataTableScan {
this.queryAuth = queryAuth;
}
+ @Override
+ public final TableScan.Plan plan() {
+ TableQueryAuthResult queryAuthResult = authQuery();
+ Plan plan = planWithoutAuth();
+ if (queryAuthResult != null) {
+ plan = queryAuthResult.convertPlan(plan);
+ }
+ return plan;
+ }
+
+ protected abstract TableScan.Plan planWithoutAuth();
+
@Override
public InnerTableScan withFilter(Predicate predicate) {
snapshotReader.withFilter(predicate);
@@ -166,34 +176,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
return this;
}
+ @Nullable
protected TableQueryAuthResult authQuery() {
if (!options.queryAuthEnabled()) {
return null;
}
- if (authResult == null) {
- authResult = queryAuth.auth(readType == null ? null : readType.getFieldNames());
- }
- return authResult;
- }
-
- protected TableScan.Plan applyAuthToSplits(Plan plan) {
- TableQueryAuthResult authResult = authQuery();
- if (authResult == null || authResult.isEmpty()) {
- return plan;
- }
-
- List<Split> splits = plan.splits();
- List<Split> authSplits = new ArrayList<>(splits.size());
- for (Split split : splits) {
- if (split instanceof DataSplit) {
- DataSplit dataSplit = (DataSplit) split;
- authSplits.add(QueryAuthSplit.wrap(dataSplit, authResult));
- } else {
- authSplits.add(split);
- }
- }
-
- return new DataFilePlan<>(authSplits);
+ return queryAuth.auth(readType == null ? null : readType.getFieldNames());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index e47ffcc2ba..c727fb63be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -122,6 +122,6 @@ public final class AppendTableRead extends AbstractDataTableRead {
}
}
- throw new RuntimeException("Should not happen.");
+ throw new RuntimeException("Unsupported split: " + split.getClass());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
deleted file mode 100644
index a7772b3c0a..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.LeafPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateVisitor;
-import org.apache.paimon.predicate.Transform;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowType;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.utils.InternalRowUtils.get;
-
-/** A {@link TableRead} wrapper that checks splits for authorization information. */
-public class AuthAwareTableRead implements TableRead {
-
- private final TableRead wrapped;
- private final RowType outputRowType;
-
- public AuthAwareTableRead(TableRead wrapped, RowType outputRowType) {
- this.wrapped = wrapped;
- this.outputRowType = outputRowType;
- }
-
- @Override
- public TableRead withMetricRegistry(MetricRegistry registry) {
- return new AuthAwareTableRead(wrapped.withMetricRegistry(registry), outputRowType);
- }
-
- @Override
- public TableRead executeFilter() {
- return new AuthAwareTableRead(wrapped.executeFilter(), outputRowType);
- }
-
- @Override
- public TableRead withIOManager(IOManager ioManager) {
- return new AuthAwareTableRead(wrapped.withIOManager(ioManager), outputRowType);
- }
-
- @Override
- public RecordReader<InternalRow> createReader(Split split) throws IOException {
- if (split instanceof QueryAuthSplit) {
- TableQueryAuthResult authResult = ((QueryAuthSplit) split).authResult();
- if (authResult != null) {
- RecordReader<InternalRow> reader =
- wrapped.createReader(((QueryAuthSplit) split).dataSplit());
- // Apply row-level filter if present
- Predicate rowFilter = authResult.rowFilter();
- if (rowFilter != null) {
- Predicate remappedFilter = remapPredicateToOutputRow(outputRowType, rowFilter);
- if (remappedFilter != null) {
- reader = new FilterRecordReader(reader, remappedFilter);
- }
- }
-
- // Apply column masking if present
- Map<String, Transform> columnMasking = authResult.columnMasking();
- if (columnMasking != null && !columnMasking.isEmpty()) {
- MaskingApplier applier = new MaskingApplier(outputRowType, columnMasking);
- reader = new MaskingRecordReader(reader, applier);
- }
-
- return reader;
- }
- }
- return wrapped.createReader(split);
- }
-
- private static class FilterRecordReader implements RecordReader<InternalRow> {
-
- private final RecordReader<InternalRow> wrapped;
- private final Predicate predicate;
-
- private FilterRecordReader(RecordReader<InternalRow> wrapped, Predicate predicate) {
- this.wrapped = wrapped;
- this.predicate = predicate;
- }
-
- @Nullable
- @Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- RecordIterator<InternalRow> batch = wrapped.readBatch();
- if (batch == null) {
- return null;
- }
- return new FilterRecordIterator(batch, predicate);
- }
-
- @Override
- public void close() throws IOException {
- wrapped.close();
- }
- }
-
- private static class FilterRecordIterator implements RecordReader.RecordIterator<InternalRow> {
-
- private final RecordReader.RecordIterator<InternalRow> wrapped;
- private final Predicate predicate;
-
- private FilterRecordIterator(
- RecordReader.RecordIterator<InternalRow> wrapped, Predicate predicate) {
- this.wrapped = wrapped;
- this.predicate = predicate;
- }
-
- @Nullable
- @Override
- public InternalRow next() throws IOException {
- while (true) {
- InternalRow row = wrapped.next();
- if (row == null) {
- return null;
- }
- if (predicate.test(row)) {
- return row;
- }
- }
- }
-
- @Override
- public void releaseBatch() {
- wrapped.releaseBatch();
- }
- }
-
- private static class MaskingRecordReader implements RecordReader<InternalRow> {
-
- private final RecordReader<InternalRow> wrapped;
- private final MaskingApplier applier;
-
- private MaskingRecordReader(RecordReader<InternalRow> wrapped, MaskingApplier applier) {
- this.wrapped = wrapped;
- this.applier = applier;
- }
-
- @Nullable
- @Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- RecordIterator<InternalRow> batch = wrapped.readBatch();
- if (batch == null) {
- return null;
- }
- return batch.transform(applier::apply);
- }
-
- @Override
- public void close() throws IOException {
- wrapped.close();
- }
- }
-
- private static class MaskingApplier {
-
- private final RowType outputRowType;
- private final Map<Integer, Transform> remapped;
-
- private MaskingApplier(RowType outputRowType, Map<String, Transform> masking) {
- this.outputRowType = outputRowType;
- this.remapped = remapToOutputRow(outputRowType, masking);
- }
-
- private InternalRow apply(InternalRow row) {
- if (remapped.isEmpty()) {
- return row;
- }
- int arity = outputRowType.getFieldCount();
- GenericRow out = new GenericRow(row.getRowKind(), arity);
- for (int i = 0; i < arity; i++) {
- DataType type = outputRowType.getTypeAt(i);
- out.setField(i, get(row, i, type));
- }
- for (Map.Entry<Integer, Transform> e : remapped.entrySet()) {
- int targetIndex = e.getKey();
- Transform transform = e.getValue();
- Object masked = transform.transform(row);
- out.setField(targetIndex, masked);
- }
- return out;
- }
-
- private static Map<Integer, Transform> remapToOutputRow(
- RowType outputRowType, Map<String, Transform> masking) {
- Map<Integer, Transform> out = new HashMap<>();
- if (masking == null || masking.isEmpty()) {
- return out;
- }
-
- for (Map.Entry<String, Transform> e : masking.entrySet()) {
- String targetColumn = e.getKey();
- Transform transform = e.getValue();
- if (targetColumn == null || transform == null) {
- continue;
- }
-
- int targetIndex = outputRowType.getFieldIndex(targetColumn);
- if (targetIndex < 0) {
- continue;
- }
-
- List<Object> newInputs = new ArrayList<>();
- for (Object input : transform.inputs()) {
- if (input instanceof FieldRef) {
- FieldRef ref = (FieldRef) input;
- int newIndex = outputRowType.getFieldIndex(ref.name());
- if (newIndex < 0) {
- throw new IllegalArgumentException(
- "Column masking refers to field '"
- + ref.name()
- + "' which is not present in
output row type "
- + outputRowType);
- }
- DataType type = outputRowType.getTypeAt(newIndex);
- newInputs.add(new FieldRef(newIndex, ref.name(), type));
- } else {
- newInputs.add(input);
- }
- }
- out.put(targetIndex, transform.copyWithNewInputs(newInputs));
- }
- return out;
- }
- }
-
- private static Predicate remapPredicateToOutputRow(RowType outputRowType, Predicate predicate) {
- return predicate.visit(new PredicateRemapper(outputRowType));
- }
-
- private static class PredicateRemapper implements PredicateVisitor<Predicate> {
- private final RowType outputRowType;
-
- private PredicateRemapper(RowType outputRowType) {
- this.outputRowType = outputRowType;
- }
-
- @Override
- public Predicate visit(LeafPredicate predicate) {
- Transform transform = predicate.transform();
- List<Object> newInputs = new ArrayList<>();
- boolean hasUnmappedField = false;
- for (Object input : transform.inputs()) {
- if (input instanceof FieldRef) {
- FieldRef ref = (FieldRef) input;
- String fieldName = ref.name();
- int newIndex = outputRowType.getFieldIndex(fieldName);
- if (newIndex < 0) {
- hasUnmappedField = true;
- break;
- }
- DataType type = outputRowType.getTypeAt(newIndex);
- newInputs.add(new FieldRef(newIndex, fieldName, type));
- } else {
- newInputs.add(input);
- }
- }
- if (hasUnmappedField) {
- return null;
- }
- return predicate.copyWithNewInputs(newInputs);
- }
-
- @Override
- public Predicate visit(CompoundPredicate predicate) {
- List<Predicate> remappedChildren = new ArrayList<>();
- for (Predicate child : predicate.children()) {
- Predicate remapped = child.visit(this);
- if (remapped != null) {
- remappedChildren.add(remapped);
- }
- }
- if (remappedChildren.isEmpty()) {
- return null;
- }
- if (remappedChildren.size() == 1) {
- return remappedChildren.get(0);
- }
- return new CompoundPredicate(predicate.function(), remappedChildren);
- }
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 9fe11c44fd..67c8acbf62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -90,7 +90,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
}
@Override
- public TableScan.Plan plan() {
+ protected TableScan.Plan planWithoutAuth() {
if (startingScanner == null) {
startingScanner = createStartingScanner(false);
}
@@ -99,13 +99,13 @@ public class DataTableBatchScan extends AbstractDataTableScan {
hasNext = false;
Optional<StartingScanner.Result> pushed = applyPushDownLimit();
if (pushed.isPresent()) {
- return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
+ return DataFilePlan.fromResult(pushed.get());
}
pushed = applyPushDownTopN();
if (pushed.isPresent()) {
- return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
+ return DataFilePlan.fromResult(pushed.get());
}
- return applyAuthToSplits(DataFilePlan.fromResult(startingScanner.scan(snapshotReader)));
+ return DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
} else {
throw new EndOfScanException();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 678cf0f4df..8b5031de4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -117,7 +117,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
}
@Override
- public Plan plan() {
+ protected TableScan.Plan planWithoutAuth() {
if (!initialized) {
initScanner();
}
@@ -180,7 +180,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
"Starting snapshot is {}, next snapshot will be {}.",
scannedResult.plan().snapshotId(),
nextSnapshotId);
- return applyAuthToSplits(scannedResult.plan());
+ return scannedResult.plan();
} else if (result instanceof StartingScanner.NextSnapshot) {
nextSnapshotId = ((StartingScanner.NextSnapshot) result).nextSnapshotId();
isFullPhaseEnd =
@@ -221,7 +221,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
if (overwritePlan.splits().isEmpty()) {
continue;
}
- return applyAuthToSplits(overwritePlan);
+ return overwritePlan;
}
}
@@ -233,7 +233,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
if (plan.splits().isEmpty()) {
continue;
}
- return applyAuthToSplits(plan);
+ return plan;
} else {
nextSnapshotId++;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
index 833d6422fb..65793014f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
@@ -55,16 +55,21 @@ public class PushDownUtils {
|| type instanceof DateType;
}
- public static boolean minmaxAvailable(DataSplit split, Set<String> columns) {
+ public static boolean minmaxAvailable(Split split, Set<String> columns) {
+ if (!(split instanceof DataSplit)) {
+ return false;
+ }
+
+ DataSplit dataSplit = (DataSplit) split;
if (isNullOrEmpty(columns)) {
return false;
}
- if (!split.rawConvertible()) {
+ if (!dataSplit.rawConvertible()) {
return false;
}
- return split.dataFiles().stream()
+ return dataSplit.dataFiles().stream()
.map(DataFileMeta::valueStatsCols)
.allMatch(
valueStatsCols ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
index 9daa9fe9ea..75da5bda7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
@@ -19,42 +19,24 @@
package org.apache.paimon.table.source;
import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.DataInputViewStreamWrapper;
-import org.apache.paimon.io.DataOutputView;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.OptionalLong;
-
-/**
- * A wrapper class for {@link DataSplit} that adds query authorization information. This class
- * delegates all Split interface methods to the wrapped DataSplit, while providing additional auth
- * result functionality.
- */
-public class QueryAuthSplit extends DataSplit {
+/** A wrapper class for {@link Split} that adds query authorization information. */
+public class QueryAuthSplit implements Split {
private static final long serialVersionUID = 1L;
- private DataSplit dataSplit;
- @Nullable private TableQueryAuthResult authResult;
+ private final Split split;
+ private final TableQueryAuthResult authResult;
- public QueryAuthSplit(DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
- this.dataSplit = dataSplit;
+ public QueryAuthSplit(Split split, TableQueryAuthResult authResult) {
+ this.split = split;
this.authResult = authResult;
}
- public DataSplit dataSplit() {
- return dataSplit;
+ public Split split() {
+ return split;
}
@Nullable
@@ -62,171 +44,8 @@ public class QueryAuthSplit extends DataSplit {
return authResult;
}
- // Delegate all DataSplit methods to the wrapped instance
-
- public long snapshotId() {
- return dataSplit.snapshotId();
- }
-
- public BinaryRow partition() {
- return dataSplit.partition();
- }
-
- public int bucket() {
- return dataSplit.bucket();
- }
-
- public String bucketPath() {
- return dataSplit.bucketPath();
- }
-
- @Nullable
- public Integer totalBuckets() {
- return dataSplit.totalBuckets();
- }
-
- public List<DataFileMeta> beforeFiles() {
- return dataSplit.beforeFiles();
- }
-
- public Optional<List<DeletionFile>> beforeDeletionFiles() {
- return dataSplit.beforeDeletionFiles();
- }
-
- public List<DataFileMeta> dataFiles() {
- return dataSplit.dataFiles();
- }
-
@Override
- public Optional<List<DeletionFile>> deletionFiles() {
- return dataSplit.deletionFiles();
- }
-
- public boolean isStreaming() {
- return dataSplit.isStreaming();
- }
-
- public boolean rawConvertible() {
- return dataSplit.rawConvertible();
- }
-
- public OptionalLong latestFileCreationEpochMillis() {
- return dataSplit.latestFileCreationEpochMillis();
- }
-
- public OptionalLong earliestFileCreationEpochMillis() {
- return dataSplit.earliestFileCreationEpochMillis();
- }
-
public long rowCount() {
- return dataSplit.rowCount();
- }
-
- public boolean mergedRowCountAvailable() {
- return dataSplit.mergedRowCountAvailable();
- }
-
- public long mergedRowCount() {
- return dataSplit.mergedRowCount();
- }
-
- public Object minValue(
- int fieldIndex,
- org.apache.paimon.types.DataField dataField,
- SimpleStatsEvolutions evolutions) {
- return dataSplit.minValue(fieldIndex, dataField, evolutions);
- }
-
- public Object maxValue(
- int fieldIndex,
- org.apache.paimon.types.DataField dataField,
- SimpleStatsEvolutions evolutions) {
- return dataSplit.maxValue(fieldIndex, dataField, evolutions);
- }
-
- public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
- return dataSplit.nullCount(fieldIndex, evolutions);
- }
-
- public long partialMergedRowCount() {
- return dataSplit.partialMergedRowCount();
- }
-
- @Override
- public Optional<List<RawFile>> convertToRawFiles() {
- return dataSplit.convertToRawFiles();
- }
-
- @Override
- @Nullable
- public Optional<List<IndexFile>> indexFiles() {
- return dataSplit.indexFiles();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- QueryAuthSplit that = (QueryAuthSplit) o;
- return Objects.equals(dataSplit, that.dataSplit)
- && Objects.equals(authResult, that.authResult);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(dataSplit, authResult);
- }
-
- @Override
- public String toString() {
- return "QueryAuthSplit{" + "dataSplit=" + dataSplit + ", authResult="
+ authResult + '}';
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- serialize(new org.apache.paimon.io.DataOutputViewStreamWrapper(out));
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- QueryAuthSplit other = deserialize(new DataInputViewStreamWrapper(in));
- this.dataSplit = other.dataSplit;
- this.authResult = other.authResult;
- }
-
- public void serialize(DataOutputView out) throws IOException {
- // Serialize the wrapped DataSplit
- dataSplit.serialize(out);
-
- // Serialize authResult
- if (authResult != null) {
- out.writeBoolean(true);
- TableQueryAuthResultSerializer.serialize(authResult, out);
- } else {
- out.writeBoolean(false);
- }
- }
-
- public static QueryAuthSplit deserialize(DataInputView in) throws IOException {
- // Deserialize the wrapped DataSplit
- DataSplit dataSplit = DataSplit.deserialize(in);
-
- // Deserialize authResult
- TableQueryAuthResult authResult = null;
- if (in.readBoolean()) {
- authResult = TableQueryAuthResultSerializer.deserialize(in);
- }
-
- return new QueryAuthSplit(dataSplit, authResult);
- }
-
- public static QueryAuthSplit wrap(
- DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
- if (authResult == null || authResult.isEmpty()) {
- return new QueryAuthSplit(dataSplit, null);
- }
- return new QueryAuthSplit(dataSplit, authResult);
+ return split.rowCount();
}
}
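Worth noting on the simplification in this file and the deleted TableQueryAuthResultSerializer below: because TableQueryAuthResult now holds the row filter and masking rules as plain JSON strings, it is Serializable through ordinary Java serialization, so QueryAuthSplit can be a trivial wrapper with no custom writeObject/readObject. A minimal sketch of that pattern, using a simplified stand-in class rather than Paimon's:

    import java.io.Serializable;
    import java.util.List;
    import java.util.Map;

    // Holding the rules as JSON strings keeps the class trivially Serializable;
    // Predicate/Transform objects are only materialized on the read side, via
    // extractPredicate()/extractColumnMasking()-style parsing.
    public class AuthResultSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        private final List<String> filterJson;          // row filters, one JSON string each
        private final Map<String, String> maskingJson;  // column name -> transform JSON

        public AuthResultSketch(List<String> filterJson, Map<String, String> maskingJson) {
            this.filterJson = filterJson;
            this.maskingJson = maskingJson;
        }
    }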
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 8da04edae0..c81dfd8e01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -26,7 +26,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.predicate.VectorSearch;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -257,12 +256,6 @@ public class ReadBuilderImpl implements ReadBuilder {
if (variantAccessInfo != null) {
read.withVariantAccess(variantAccessInfo);
}
- if (table instanceof FileStoreTable) {
- CoreOptions options = new CoreOptions(table.options());
- if (options.queryAuthEnabled()) {
- return new AuthAwareTableRead(read, readType());
- }
- }
return read;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
index 3d45ec2f33..b4e98576ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
@@ -27,5 +27,6 @@ import java.util.List;
/** Table query auth. */
public interface TableQueryAuth {
+ @Nullable
TableQueryAuthResult auth(@Nullable List<String> select);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
deleted file mode 100644
index c557ee08e0..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.DataOutputView;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.Transform;
-import org.apache.paimon.rest.RESTApi;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/** Serializer for {@link TableQueryAuthResult}. */
-public class TableQueryAuthResultSerializer {
- public static void serialize(TableQueryAuthResult authResult, DataOutputView out)
- throws IOException {
- // Serialize row filter
- if (authResult.rowFilter() != null) {
- out.writeBoolean(true);
- String predicateJson = RESTApi.toJson(authResult.rowFilter());
- out.writeUTF(predicateJson);
- } else {
- out.writeBoolean(false);
- }
-
- // Serialize column masking
- Map<String, Transform> columnMasking = authResult.columnMasking();
- out.writeInt(columnMasking.size());
- for (Map.Entry<String, Transform> entry : columnMasking.entrySet()) {
- out.writeUTF(entry.getKey());
- String transformJson = RESTApi.toJson(entry.getValue());
- out.writeUTF(transformJson);
- }
- }
-
- public static TableQueryAuthResult deserialize(DataInputView in) throws IOException {
- // Deserialize row filter
- Predicate rowFilter = null;
- if (in.readBoolean()) {
- String predicateJson = in.readUTF();
- rowFilter = RESTApi.fromJson(predicateJson, Predicate.class);
- }
-
- // Deserialize column masking
- int maskingSize = in.readInt();
- Map<String, Transform> columnMasking = new HashMap<>(maskingSize);
- for (int i = 0; i < maskingSize; i++) {
- String columnName = in.readUTF();
- String transformJson = in.readUTF();
- Transform transform = RESTApi.fromJson(transformJson, Transform.class);
- columnMasking.put(columnName, transform);
- }
-
- return new TableQueryAuthResult(rowFilter, columnMasking);
- }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index f628a68ea0..a5a84b2512 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -296,6 +296,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
Transform transform =
new UpperTransform(
Collections.singletonList(new FieldRef(1, "col2",
DataTypes.STRING())));
+ String transformJson = JsonSerdeUtil.toFlatJson(transform);
// Set up mock response with filter and columnMasking
List<Predicate> rowFilters = Collections.singletonList(predicate);
@@ -305,10 +306,10 @@ class MockRESTCatalogTest extends RESTCatalogTest {
restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
TableQueryAuthResult result = catalog.authTableQuery(identifier, null);
- assertThat(result.rowFilter()).isEqualTo(predicate);
+ assertThat(result.filter()).containsOnly(predicateJson);
assertThat(result.columnMasking()).isNotEmpty();
assertThat(result.columnMasking()).containsKey("col2");
- assertThat(result.columnMasking().get("col2")).isEqualTo(transform);
+ assertThat(result.columnMasking().get("col2")).isEqualTo(transformJson);
catalog.dropTable(identifier, true);
catalog.dropDatabase(identifier.getDatabaseName(), true, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index f955418b6d..33d7344f6a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
/** An {@link InnerTableRead} that reads the data changed before and after compaction. */
public class LookupCompactDiffRead extends AbstractDataTableRead {
+
private final SplitRead<InternalRow> fullPhaseMergeRead;
private final SplitRead<InternalRow> incrementalDiffRead;
@@ -67,7 +68,7 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
if (dataSplit.beforeFiles().isEmpty()) {
return fullPhaseMergeRead.createReader(dataSplit); // full reading phase
} else {
- return incrementalDiffRead.createReader((DataSplit) split);
+ return incrementalDiffRead.createReader(split);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 48fefa8e24..5fa01a6bb7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -58,6 +58,7 @@ import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** ITCase for REST catalog. */
@@ -549,13 +550,23 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
assertThat(combinedResult.get(0).getField(3)).isEqualTo(25); // age not masked
assertThat(combinedResult.get(0).getField(4)).isEqualTo("IT"); // department not masked
+ // Test must read with row filter columns
+ assertThatThrownBy(
+ () ->
+ batchSql(
+ String.format(
+ "SELECT id, name FROM %s.%s
WHERE age > 30 ORDER BY id",
+ DATABASE_NAME, combinedTable)))
+ .rootCause()
+ .hasMessageContaining("Unable to read data without column
department");
+
// Test WHERE clause with both features
assertThat(
batchSql(
String.format(
- "SELECT id, name FROM %s.%s WHERE age
> 30 ORDER BY id",
+ "SELECT id, name, department FROM
%s.%s WHERE age > 30 ORDER BY id",
DATABASE_NAME, combinedTable)))
- .containsExactlyInAnyOrder(Row.of(3, "***"));
+ .containsExactlyInAnyOrder(Row.of(3, "***", "IT"));
// Clear both column masking and row filter
restCatalogServer.setColumnMaskingAuth(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index e07db9515e..df6af4d483 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.aggregate
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.source.{DataSplit, ReadBuilder}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
import org.apache.paimon.table.source.PushDownUtils.minmaxAvailable
import org.apache.paimon.types._
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils.extractV2Colu
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.language.postfixOps
object AggregatePushDownUtils {
@@ -64,18 +65,23 @@ object AggregatePushDownUtils {
case None => return None
}
- if (!splits.forall(_.mergedRowCountAvailable())) {
+ if (!splits.forall(_.isInstanceOf[DataSplit])) {
+ return None
+ }
+ val dataSplits = splits.map(_.asInstanceOf[DataSplit])
+
+ if (!dataSplits.forall(_.mergedRowCountAvailable())) {
return None
}
val aggregator = new LocalAggregator(table)
aggregator.initialize(aggregation)
- splits.foreach(aggregator.update)
+ dataSplits.foreach(aggregator.update)
Option(aggregator)
}
- private def generateSplits(readBuilder: ReadBuilder): mutable.Seq[DataSplit] = {
- readBuilder.newScan().plan().splits().asScala.map(_.asInstanceOf[DataSplit])
+ private def generateSplits(readBuilder: ReadBuilder): mutable.Seq[Split] = {
+ readBuilder.newScan().plan().splits().asScala
}
private def extractMinMaxColumns(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index 8e635d404c..679234bc74 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.spark.PaimonInputPartition
import org.apache.paimon.spark.util.SplitUtils
import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackSplit
import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.table.source.{DataSplit, DeletionFile, QueryAuthSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.PaimonSparkSession
@@ -160,32 +160,19 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
split: DataSplit,
dataFiles: Seq[DataFileMeta],
deletionFiles: Seq[DeletionFile]): DataSplit = {
- val (actualSplit, authResult) = split match {
- case queryAuthSplit: QueryAuthSplit =>
- (queryAuthSplit.dataSplit(), queryAuthSplit.authResult())
- case _ =>
- (split, null)
- }
-
val builder = DataSplit
.builder()
- .withSnapshot(actualSplit.snapshotId())
- .withPartition(actualSplit.partition())
- .withBucket(actualSplit.bucket())
- .withTotalBuckets(actualSplit.totalBuckets())
+ .withSnapshot(split.snapshotId())
+ .withPartition(split.partition())
+ .withBucket(split.bucket())
+ .withTotalBuckets(split.totalBuckets())
.withDataFiles(dataFiles.toList.asJava)
- .rawConvertible(actualSplit.rawConvertible)
- .withBucketPath(actualSplit.bucketPath)
+ .rawConvertible(split.rawConvertible)
+ .withBucketPath(split.bucketPath)
if (deletionVectors) {
builder.withDataDeletionFiles(deletionFiles.toList.asJava)
}
- val newDataSplit = builder.build()
-
- if (authResult != null) {
- new QueryAuthSplit(newDataSplit, authResult)
- } else {
- newDataSplit
- }
+ builder.build()
}
private def withSamePartitionAndBucket(split1: DataSplit, split2: DataSplit): Boolean = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index bbae01af95..cdf6384da3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -71,6 +71,7 @@ import java.util.Map;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for spark read from Rest catalog. */
public class SparkCatalogWithRestTest {
@@ -511,12 +512,21 @@ public class SparkCatalogWithRestTest {
assertThat(combinedResult.get(0).getInt(3)).isEqualTo(25); // age not masked
assertThat(combinedResult.get(0).getString(4)).isEqualTo("IT"); // department not masked
+ // Test must read with row filter columns
+ assertThatThrownBy(
+ () ->
+ spark.sql(
+ "SELECT id, name FROM
t_combined WHERE age > 30 ORDER BY id")
+ .collectAsList())
+ .hasMessageContaining("Unable to read data without column
department");
+
// Test WHERE clause with both features
assertThat(
- spark.sql("SELECT id, name FROM t_combined WHERE age >
30 ORDER BY id")
+ spark.sql(
+ "SELECT id, name, department FROM
t_combined WHERE age > 30 ORDER BY id")
.collectAsList()
.toString())
- .isEqualTo("[[3,***]]");
+ .isEqualTo("[[3,***,IT]]");
// Clear both column masking and row filter
restCatalogServer.setColumnMaskingAuth(