This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2538f69321 [core] DataTableRead should add auth columns to row filters
2538f69321 is described below
commit 2538f69321423bdeb061e1b6e7ccf0ab873fbea4
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jan 29 16:18:05 2026 +0800
[core] DataTableRead should add auth columns to row filters
---
.../apache/paimon/predicate/PredicateVisitor.java | 37 +++++++++++++++
.../table/format/predicate/PredicateUtils.java | 33 +------------
.../paimon/table/source/AbstractDataTableRead.java | 49 +++++++++++++++++---
.../paimon/flink/source/FlinkSourceBuilder.java | 54 +---------------------
.../paimon/spark/PaimonBaseScanBuilder.scala | 26 +----------
5 files changed, 83 insertions(+), 116 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
index c741c05d46..f14cb32886 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java
@@ -18,10 +18,47 @@
package org.apache.paimon.predicate;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
/** A visitor to visit {@link Predicate}. */
public interface PredicateVisitor<T> {
T visit(LeafPredicate predicate);
T visit(CompoundPredicate predicate);
+
+ static Set<String> collectFieldNames(@Nullable Predicate predicate) {
+ if (predicate == null) {
+ return Collections.emptySet();
+ }
+ return predicate.visit(new FieldNameCollector());
+ }
+
+ /** A visitor that collects all field names referenced by a predicate. */
+ class FieldNameCollector implements PredicateVisitor<Set<String>> {
+
+ @Override
+ public Set<String> visit(LeafPredicate predicate) {
+ Set<String> fieldNames = new HashSet<>();
+ for (Object input : predicate.transform().inputs()) {
+ if (input instanceof FieldRef) {
+ fieldNames.add(((FieldRef) input).name());
+ }
+ }
+ return fieldNames;
+ }
+
+ @Override
+ public Set<String> visit(CompoundPredicate predicate) {
+ Set<String> fieldNames = new HashSet<>();
+ for (Predicate child : predicate.children()) {
+ fieldNames.addAll(child.visit(this));
+ }
+ return fieldNames;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
index aac792f196..86ef0a13ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java
@@ -18,12 +18,8 @@
package org.apache.paimon.table.format.predicate;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.types.RowType;
import java.util.Arrays;
@@ -35,6 +31,7 @@ import java.util.Optional;
import java.util.Set;
import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
/** Utility methods for working with {@link Predicate}s. */
public class PredicateUtils {
@@ -69,32 +66,4 @@ public class PredicateUtils {
return result;
}
-
- public static Set<String> collectFieldNames(Predicate predicate) {
- return predicate.visit(new FieldNameCollector());
- }
-
- /** A visitor that collects all field names referenced by a predicate. */
- private static class FieldNameCollector implements PredicateVisitor<Set<String>> {
-
- @Override
- public Set<String> visit(LeafPredicate predicate) {
- Set<String> fieldNames = new HashSet<>();
- for (Object input : predicate.transform().inputs()) {
- if (input instanceof FieldRef) {
- fieldNames.add(((FieldRef) input).name());
- }
- }
- return fieldNames;
- }
-
- @Override
- public Set<String> visit(CompoundPredicate predicate) {
- Set<String> fieldNames = new HashSet<>();
- for (Predicate child : predicate.children()) {
- fieldNames.addAll(child.visit(this));
- }
- return fieldNames;
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index e1dae66b30..691cf0f765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -26,9 +26,16 @@ import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ListUtils;
+import org.apache.paimon.utils.ProjectedRow;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
/** A {@link InnerTableRead} for data table. */
public abstract class AbstractDataTableRead implements InnerTableRead {
@@ -82,16 +89,17 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
@Override
public final RecordReader<InternalRow> createReader(Split split) throws IOException {
- TableQueryAuthResult queryAuthResult = null;
+ TableQueryAuthResult authResult = null;
if (split instanceof QueryAuthSplit) {
QueryAuthSplit authSplit = (QueryAuthSplit) split;
split = authSplit.split();
- queryAuthResult = authSplit.authResult();
+ authResult = authSplit.authResult();
}
- RecordReader<InternalRow> reader = reader(split);
- if (queryAuthResult != null) {
- RowType type = readType == null ? schema.logicalRowType() : readType;
- reader = queryAuthResult.doAuth(reader, type);
+ RecordReader<InternalRow> reader;
+ if (authResult == null) {
+ reader = reader(split);
+ } else {
+ reader = authedReader(split, authResult);
}
if (executeFilter) {
reader = executeFilter(reader);
@@ -100,6 +108,35 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
return reader;
}
+ private RecordReader<InternalRow> authedReader(Split split, TableQueryAuthResult authResult)
+ throws IOException {
+ RecordReader<InternalRow> reader;
+ RowType tableType = schema.logicalRowType();
+ RowType readType = this.readType == null ? tableType : this.readType;
+ Predicate authPredicate = authResult.extractPredicate();
+ ProjectedRow backRow = null;
+ if (authPredicate != null) {
+ Set<String> authFields = collectFieldNames(authPredicate);
+ List<String> readFields = readType.getFieldNames();
+ List<String> authAddNames = new ArrayList<>();
+ for (String field : tableType.getFieldNames()) {
+ if (authFields.contains(field) && !readFields.contains(field)) {
+ authAddNames.add(field);
+ }
+ }
+ if (!authAddNames.isEmpty()) {
+ readType = tableType.project(ListUtils.union(readFields, authAddNames));
+ withReadType(readType);
+ backRow = ProjectedRow.from(readType.projectIndexes(readFields));
+ }
+ }
+ reader = authResult.doAuth(reader(split), readType);
+ if (backRow != null) {
+ reader = reader.transform(backRow::replaceRow);
+ }
+ return reader;
+ }
+
private RecordReader<InternalRow> executeFilter(RecordReader<InternalRow> reader) {
if (predicate == null) {
return reader;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 6497f28184..3e96dec1ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.TableQueryAuthResult;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.Projection;
@@ -34,10 +33,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.format.predicate.PredicateUtils;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.TableQueryAuth;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -61,11 +57,8 @@ import org.apache.flink.types.Row;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
@@ -196,7 +189,7 @@ public class FlinkSourceBuilder {
private ReadBuilder createReadBuilder(@Nullable org.apache.paimon.types.RowType readType) {
ReadBuilder readBuilder = table.newReadBuilder();
if (readType != null) {
- readBuilder.withReadType(readTypeWithAuth(readType));
+ readBuilder.withReadType(readType);
}
if (predicate != null) {
readBuilder.withFilter(predicate);
@@ -397,49 +390,4 @@ public class FlinkSourceBuilder {
"The align mode of paimon source currently only supports
EXACTLY_ONCE checkpoint mode. Please set "
+ "execution.checkpointing.mode to exactly-once");
}
-
- private org.apache.paimon.types.RowType readTypeWithAuth(
- org.apache.paimon.types.RowType readType) {
- if (!(table instanceof FileStoreTable)) {
- return readType;
- }
-
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- TableQueryAuth auth =
- fileStoreTable.catalogEnvironment().tableQueryAuth(new CoreOptions(conf.toMap()));
- if (auth == null) {
- return readType;
- }
-
- List<String> requiredFieldNames = readType.getFieldNames();
- TableQueryAuthResult result = auth.auth(requiredFieldNames);
- if (result == null) {
- return readType;
- }
-
- Predicate authPredicate = result.extractPredicate();
- if (authPredicate == null) {
- return readType;
- }
-
- Set<String> authFieldNames = PredicateUtils.collectFieldNames(authPredicate);
- Set<String> requiredFieldNameSet = new HashSet<>(requiredFieldNames);
-
- List<DataField> additionalFields = new ArrayList<>();
- for (DataField field : table.rowType().getFields()) {
- if (authFieldNames.contains(field.name())
- && !requiredFieldNameSet.contains(field.name())) {
- additionalFields.add(field);
- }
- }
-
- if (additionalFields.isEmpty()) {
- return readType;
- }
-
- // Create new read type with additional fields
- List<DataField> newFields = new ArrayList<>(readType.getFields());
- newFields.addAll(additionalFields);
- return new org.apache.paimon.types.RowType(newFields);
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 4bca5bad11..47723171e4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,8 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch}
-import org.apache.paimon.table.{FileStoreTable, SpecialFields, Table}
-import org.apache.paimon.table.format.predicate.PredicateUtils
+import org.apache.paimon.table.{SpecialFields, Table}
import org.apache.paimon.types.RowType
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
@@ -59,29 +58,6 @@ abstract class PaimonBaseScanBuilder
override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = requiredSchema
- pruneColumnsWithAuth(requiredSchema)
- }
-
- private def pruneColumnsWithAuth(requiredSchema: StructType): Unit = {
- val auth = table match {
- case fileStoreTable: FileStoreTable =>
- fileStoreTable.catalogEnvironment().tableQueryAuth(coreOptions)
- case _ =>
- return
- }
-
- val result = auth.auth(requiredSchema.fieldNames.toList.asJava)
- if (result != null) {
- val predicate = result.extractPredicate()
- if (predicate != null) {
- val names = PredicateUtils.collectFieldNames(predicate)
- val fullType = SparkTypeUtils.fromPaimonRowType(table.rowType())
- val requiredFieldNames = requiredSchema.fieldNames.toSet
- val addFields = fullType.fields.filter(
- field => names.contains(field.name) && !requiredFieldNames.contains(field.name))
- this.requiredSchema = StructType(requiredSchema.fields ++ addFields)
- }
- }
}
override def pushPredicates(predicates: Array[SparkPredicate]): Array[SparkPredicate] = {