This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-presto.git
The following commit(s) were added to refs/heads/main by this push:
new 80d45e8 (feat) Support partition pushdown with complex predicate (#45)
80d45e8 is described below
commit 80d45e81098f3f4bdc5b692f23c12a23f1cf60fc
Author: WenjunMin <[email protected]>
AuthorDate: Mon Dec 16 10:24:55 2024 +0800
(feat) Support partition pushdown with complex predicate (#45)
---
.../paimon/presto/PrestoComputePushdown.java | 13 +-
.../paimon/presto/TestPrestoComputePushdown.java | 68 ++++-
.../org/apache/paimon/presto/PaimonConfig.java | 12 +
.../paimon/presto/PrestoComputePushdown.java | 331 ++++++++++++++++++++-
.../paimon/presto/PrestoPlanOptimizerProvider.java | 17 +-
.../paimon/presto/PrestoSessionProperties.java | 10 +
.../apache/paimon/presto/PrestoSplitManager.java | 29 ++
.../apache/paimon/presto/PrestoTableHandle.java | 50 +++-
.../org/apache/paimon/presto/PrestoTypeUtils.java | 104 +++++++
.../paimon/presto/PrestoTableHandleTest.java | 7 +-
.../paimon/presto/TestPrestoComputePushdown.java | 74 ++++-
.../org/apache/paimon/presto/TestPrestoITCase.java | 131 +++++++-
12 files changed, 823 insertions(+), 23 deletions(-)
diff --git a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
index 91955b5..f01c381 100644
--- a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
+++ b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -26,6 +26,7 @@ import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNode;
@@ -54,14 +55,21 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final PrestoTransactionManager transactionManager;
public PrestoComputePushdown(
StandardFunctionResolution functionResolution,
- RowExpressionService rowExpressionService) {
+ RowExpressionService rowExpressionService,
+ FunctionMetadataManager functionMetadataManager,
+ PrestoTransactionManager transactionManager) {
this.functionResolution = requireNonNull(functionResolution,
"functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is
null");
+ this.functionMetadataManager =
+ requireNonNull(functionMetadataManager,
"functionMetadataManager is null");
+ this.transactionManager = requireNonNull(transactionManager,
"transactionManager is null");
}
@Override
@@ -180,7 +188,8 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
oldPrestoTableHandle.getTableName(),
oldPrestoTableHandle.getSerializedTable(),
entireColumnDomain,
- projectedColumns);
+ projectedColumns,
+ Optional.empty());
PrestoTableLayoutHandle newLayoutHandle =
new PrestoTableLayoutHandle(
diff --git a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
index 9c2d56e..af5f9de 100644
--- a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
+++ b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -18,9 +18,22 @@
package org.apache.paimon.presto;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
+import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.metadata.FunctionManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
@@ -40,6 +53,7 @@ import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.TestingRowExpressionTranslator;
+import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
@@ -52,8 +66,12 @@ import com.facebook.presto.testing.TestingConnectorSession;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -102,6 +120,33 @@ public class TestPrestoComputePushdown {
}
};
+ public byte[] table;
+
+ @BeforeTest
+ public void setUp() throws Exception {
+ String warehouse =
+
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+ Path tablePath3 = new Path(warehouse, "default.db/t3");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "pt", new VarCharType()),
+ new DataField(1, "a", new IntType()),
+ new DataField(2, "b", new BigIntType()),
+ new DataField(3, "c", new BigIntType()),
+ new DataField(4, "d", new IntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath3)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
+ this.table = InstantiationUtil.serializeObject(table);
+ }
+
private TableScanNode createTableScan() {
PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
VariableReferenceExpression variableA =
variableAllocator.newVariable("a", BIGINT);
@@ -114,7 +159,7 @@ public class TestPrestoComputePushdown {
"id", new BigIntType(), new
TypeRegistry()))
.build();
- PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test",
"table".getBytes());
+ PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test",
this.table);
return new TableScanNode(
new PlanNodeId(UUID.randomUUID().toString()),
@@ -127,8 +172,9 @@ public class TestPrestoComputePushdown {
new PrestoTableHandle(
"test",
"test",
- "table".getBytes(),
+ this.table,
TupleDomain.all(),
+ Optional.empty(),
Optional.empty()),
TupleDomain.all()))),
ImmutableList.copyOf(assignments.keySet()),
@@ -171,7 +217,14 @@ public class TestPrestoComputePushdown {
PrestoSessionProperties prestoSessionProperties = new
PrestoSessionProperties(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE);
+ new PrestoComputePushdown(
+ FUNCTION_RESOLUTION,
+ ROW_EXPRESSION_SERVICE,
+ new FunctionManager(
+ new TypeRegistry(),
+ new BlockEncodingManager(new TypeRegistry()),
+ new FeaturesConfig()),
+ new PrestoTransactionManager());
PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
@@ -219,7 +272,14 @@ public class TestPrestoComputePushdown {
PrestoSessionProperties prestoSessionProperties = new
PrestoSessionProperties(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE);
+ new PrestoComputePushdown(
+ FUNCTION_RESOLUTION,
+ ROW_EXPRESSION_SERVICE,
+ new FunctionManager(
+ new TypeRegistry(),
+ new BlockEncodingManager(new TypeRegistry()),
+ new FeaturesConfig()),
+ new PrestoTransactionManager());
PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
index d8d98b3..e749aee 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
@@ -28,6 +28,7 @@ public class PaimonConfig {
private String metastore;
private String uri;
private boolean paimonPushdownEnabled = true;
+ private boolean paimonPartitionPruningEnabled = true;
public String getWarehouse() {
return warehouse;
@@ -69,4 +70,15 @@ public class PaimonConfig {
this.paimonPushdownEnabled = paimonPushdownEnabled;
return this;
}
+
+ public boolean isPaimonPartitionPruningEnabled() {
+ return paimonPartitionPruningEnabled;
+ }
+
+ @Config("paimon.partition-prune-enabled")
+ @ConfigDescription("Enable paimon query partition prune")
+ public PaimonConfig setPaimonPartitionPruningEnabled(boolean
paimonPartitionPruningEnabled) {
+ this.paimonPartitionPruningEnabled = paimonPartitionPruningEnabled;
+ return this;
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
index acd7419..7c413d4 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -18,14 +18,31 @@
package org.apache.paimon.presto;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Preconditions;
+
import com.facebook.presto.common.Subfield;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.predicate.NullableValue;
+import com.facebook.presto.common.predicate.Primitives;
import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeUtils;
+import com.facebook.presto.expressions.DefaultRowExpressionTraversalVisitor;
+import com.facebook.presto.expressions.LogicalRowExpressions;
import com.facebook.presto.hive.SubfieldExtractor;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNode;
@@ -33,19 +50,40 @@ import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.PlanVisitor;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.DomainTranslator;
+import com.facebook.presto.spi.relation.InputReferenceExpression;
+import com.facebook.presto.spi.relation.LambdaDefinitionExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
+import com.facebook.presto.spi.relation.RowExpressionVisitor;
+import com.facebook.presto.spi.relation.SpecialFormExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static
com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
+import static com.facebook.presto.spi.StandardErrorCode.DIVISION_BY_ZERO;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_CAST_ARGUMENT;
+import static
com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static
com.facebook.presto.spi.StandardErrorCode.NUMERIC_VALUE_OUT_OF_RANGE;
+import static
com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Sets.intersection;
import static java.util.Objects.requireNonNull;
import static
org.apache.paimon.presto.PrestoSessionProperties.isPaimonPushdownEnabled;
@@ -54,14 +92,20 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final PrestoTransactionManager transactionManager;
public PrestoComputePushdown(
StandardFunctionResolution functionResolution,
- RowExpressionService rowExpressionService) {
-
+ RowExpressionService rowExpressionService,
+ FunctionMetadataManager functionMetadataManager,
+ PrestoTransactionManager transactionManager) {
this.functionResolution = requireNonNull(functionResolution,
"functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is
null");
+ this.functionMetadataManager =
+ requireNonNull(functionMetadataManager,
"functionMetadataManager is null");
+ this.transactionManager = requireNonNull(transactionManager,
"transactionManager is null");
}
@Override
@@ -171,6 +215,42 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
Optional<List<ColumnHandle>> projectedColumns =
extractColumns(filterPredicate, assignments);
+ Table table = ((PrestoTableHandle)
tableScan.getTable().getConnectorHandle()).table();
+
+ ImmutableSet.Builder<VariableReferenceExpression> builder =
ImmutableSet.builder();
+ decomposedFilter
+ .getRemainingExpression()
+ .accept(new
VariableCollector(tableScan.getOutputVariables()), builder);
+ Set<ColumnHandle> remainingFilterProjects =
+ builder.build().stream()
+ .map(
+ v ->
+ Preconditions.checkNotNull(
+ assignments.get(v),
+ "The variable is"
+ + " not found in
the assignments"))
+ .collect(Collectors.toSet());
+
+ // Prune the partition
+ Set<String> partitionColumns = new
HashSet<>(table.partitionKeys());
+ Optional<List<Map<String, String>>> remainingPartitions =
Optional.empty();
+ // we have predicate on the partition field, then we have to list
the partition and do
+ // the prune.
+ if (PrestoSessionProperties.isPartitionPruneEnabled(session)
+ && !remainingFilterProjects.isEmpty()
+ && remainingFilterProjects.stream()
+ .map(PrestoColumnHandle.class::cast)
+ .anyMatch(c ->
partitionColumns.contains(c.getColumnName()))) {
+ Map<String, ColumnHandle> columns =
+ transactionManager
+ .get(tableScan.getTable().getTransaction())
+ .getColumnHandles(
+ session,
tableScan.getTable().getConnectorHandle());
+ remainingPartitions =
+ getRemainingPartition(
+ table, decomposedFilter, session,
entireColumnDomain, columns);
+ }
+
// Build paimon new presto table handle use pushdown.
PrestoTableHandle oldPrestoTableHandle =
(PrestoTableHandle)
tableScan.getTable().getConnectorHandle();
@@ -180,7 +260,8 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
oldPrestoTableHandle.getTableName(),
oldPrestoTableHandle.getSerializedTable(),
entireColumnDomain,
- projectedColumns);
+ projectedColumns,
+ remainingPartitions);
PrestoTableLayoutHandle newLayoutHandle =
new PrestoTableLayoutHandle(
@@ -203,4 +284,248 @@ public class PrestoComputePushdown implements ConnectorPlanOptimizer {
filter.getSourceLocation(), filter.getId(), newTableScan,
filterPredicate);
}
}
+
+ private Optional<List<Map<String, String>>> getRemainingPartition(
+ Table table,
+ DomainTranslator.ExtractionResult<Subfield> decomposedFilter,
+ ConnectorSession session,
+ TupleDomain<PrestoColumnHandle> entireColumnDomain,
+ Map<String, ColumnHandle> columns) {
+ // Extract deterministic conjuncts that apply to partition columns and
specify these as
+ // Constraint#predicate
+ Predicate<Map<ColumnHandle, NullableValue>> predicate = v -> true;
+
+ RowExpression remainingExpression =
decomposedFilter.getRemainingExpression();
+ if (!TRUE_CONSTANT.equals(remainingExpression)) {
+ LogicalRowExpressions logicalRowExpressions =
+ new LogicalRowExpressions(
+ rowExpressionService.getDeterminismEvaluator(),
+ functionResolution,
+ functionMetadataManager);
+ RowExpression deterministicPredicate =
+
logicalRowExpressions.filterDeterministicConjuncts(remainingExpression);
+ if (!TRUE_CONSTANT.equals(deterministicPredicate)) {
+ ConstraintEvaluator evaluator =
+ new ConstraintEvaluator(
+ rowExpressionService, session, columns,
deterministicPredicate);
+ predicate = evaluator::isCandidate;
+ }
+ }
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+
+ // list partition with filter.
+ new PrestoFilterConverter(table.rowType())
+ .convert(entireColumnDomain)
+ .ifPresent(readBuilder::withFilter);
+
+ List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
+ String partitionDefaultName = new
CoreOptions(table.options()).partitionDefaultName();
+
+ InternalRow.FieldGetter[] getters =
+ table.rowType().project(table.partitionKeys()).fieldGetters();
+ List<String> partitionKeys = table.partitionKeys();
+ List<Map<String, String>> remainingPartitions = new ArrayList<>();
+ for (BinaryRow partition : partitions) {
+ Map<ColumnHandle, NullableValue> partitionPrestoValue = new
HashMap<>();
+ LinkedHashMap<String, String> partitionMap = new LinkedHashMap<>();
+ for (int i = 0; i < getters.length; i++) {
+ Object value = getters[i].getFieldOrNull(partition);
+ PrestoColumnHandle handle = (PrestoColumnHandle)
columns.get(partitionKeys.get(i));
+ Preconditions.checkNotNull(
+ handle, "The %s column handle is not found.",
partitionKeys.get(i));
+ Type type = handle.getPrestoType();
+
+ partitionPrestoValue.put(
+ handle,
+ NullableValue.of(
+ type,
+ value == null
+ ? null
+ : Utils.blockToNativeValue(
+ type,
+
PrestoTypeUtils.singleValueToBlock(type, value))));
+ partitionMap.put(
+ partitionKeys.get(i),
+ value == null ? partitionDefaultName :
value.toString());
+ }
+ if (predicate.test(partitionPrestoValue)) {
+ remainingPartitions.add(partitionMap);
+ }
+ }
+ return Optional.of(remainingPartitions);
+ }
+
+ private static class VariableCollector
+ implements RowExpressionVisitor<
+ Void, ImmutableSet.Builder<VariableReferenceExpression>> {
+
+ List<VariableReferenceExpression> inputVariables;
+
+ public VariableCollector(List<VariableReferenceExpression>
inputVariables) {
+ this.inputVariables = inputVariables;
+ }
+
+ @Override
+ public Void visitCall(
+ CallExpression call,
ImmutableSet.Builder<VariableReferenceExpression> context) {
+ for (RowExpression argument : call.getArguments()) {
+ argument.accept(this, context);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitInputReference(
+ InputReferenceExpression reference,
+ ImmutableSet.Builder<VariableReferenceExpression> context) {
+ int pos = reference.getField();
+ context.add(inputVariables.get(pos));
+ return null;
+ }
+
+ @Override
+ public Void visitConstant(
+ ConstantExpression literal,
+ ImmutableSet.Builder<VariableReferenceExpression> context) {
+ return null;
+ }
+
+ @Override
+ public Void visitLambda(
+ LambdaDefinitionExpression lambda,
+ ImmutableSet.Builder<VariableReferenceExpression> context) {
+ return null;
+ }
+
+ @Override
+ public Void visitVariableReference(
+ VariableReferenceExpression reference,
+ ImmutableSet.Builder<VariableReferenceExpression> context) {
+ context.add(reference);
+ return null;
+ }
+
+ @Override
+ public Void visitSpecialForm(
+ SpecialFormExpression specialForm,
+ ImmutableSet.Builder<VariableReferenceExpression> context) {
+ specialForm.getArguments().forEach(argument ->
argument.accept(this, context));
+ return null;
+ }
+ }
+
+ // copied from presto
+ private static class VariableReferenceBuilderVisitor
+ extends DefaultRowExpressionTraversalVisitor<
+ ImmutableSet.Builder<VariableReferenceExpression>> {
+ @Override
+ public Void visitVariableReference(
+ VariableReferenceExpression variable,
+ ImmutableSet.Builder<VariableReferenceExpression> builder) {
+ builder.add(variable);
+ return null;
+ }
+ }
+
+ private static Set<VariableReferenceExpression> extractAll(RowExpression
expression) {
+ ImmutableSet.Builder<VariableReferenceExpression> builder =
ImmutableSet.builder();
+ expression.accept(new VariableReferenceBuilderVisitor(), builder);
+ return builder.build();
+ }
+
+ private static class ConstraintEvaluator {
+ private final Map<String, ColumnHandle> assignments;
+ private final RowExpressionService evaluator;
+ private final ConnectorSession session;
+ private final RowExpression expression;
+ private final Set<ColumnHandle> arguments;
+
+ public ConstraintEvaluator(
+ RowExpressionService evaluator,
+ ConnectorSession session,
+ Map<String, ColumnHandle> assignments,
+ RowExpression expression) {
+ this.assignments = assignments;
+ this.evaluator = evaluator;
+ this.session = session;
+ this.expression = expression;
+
+ arguments =
+ ImmutableSet.copyOf(extractAll(expression)).stream()
+ .map(VariableReferenceExpression::getName)
+ .map(assignments::get)
+ .collect(toImmutableSet());
+ }
+
+ private boolean isCandidate(Map<ColumnHandle, NullableValue> bindings)
{
+ if (intersection(bindings.keySet(), arguments).isEmpty()) {
+ return true;
+ }
+
+ Function<VariableReferenceExpression, Object> variableResolver =
+ variable -> {
+ ColumnHandle column =
assignments.get(variable.getName());
+ checkArgument(column != null, "Missing column
assignment for %s", variable);
+
+ if (!bindings.containsKey(column)) {
+ return variable;
+ }
+
+ return bindings.get(column).getValue();
+ };
+
+ // Skip pruning if evaluation fails in a recoverable way. Failing
here can cause
+ // spurious query failures for partitions that would otherwise be
filtered out.
+ Object optimized = null;
+ try {
+ optimized =
+ evaluator
+ .getExpressionOptimizer()
+ .optimize(expression, OPTIMIZED, session,
variableResolver);
+ } catch (PrestoException e) {
+ propagateIfUnhandled(e);
+ return true;
+ }
+
+ // If any conjuncts evaluate to FALSE or null, then the whole
predicate will never be
+ // true and so the partition should be pruned
+ return !Boolean.FALSE.equals(optimized)
+ && optimized != null
+ && (!(optimized instanceof ConstantExpression)
+ || !((ConstantExpression) optimized).isNull());
+ }
+
+ private static void propagateIfUnhandled(PrestoException e) throws
PrestoException {
+ int errorCode = e.getErrorCode().getCode();
+ if (errorCode == DIVISION_BY_ZERO.toErrorCode().getCode()
+ || errorCode ==
INVALID_CAST_ARGUMENT.toErrorCode().getCode()
+ || errorCode ==
INVALID_FUNCTION_ARGUMENT.toErrorCode().getCode()
+ || errorCode ==
NUMERIC_VALUE_OUT_OF_RANGE.toErrorCode().getCode()) {
+ return;
+ }
+
+ throw e;
+ }
+ }
+
+ private static final class Utils {
+ private Utils() {}
+
+ public static Block nativeValueToBlock(Type type, Object object) {
+ if (object != null &&
!Primitives.wrap(type.getJavaType()).isInstance(object)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Object '%s' does not match type %s", object,
type.getJavaType()));
+ } else {
+ BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
+ TypeUtils.writeNativeValue(type, blockBuilder, object);
+ return blockBuilder.build();
+ }
+ }
+
+ public static Object blockToNativeValue(Type type, Block block) {
+ return TypeUtils.readNativeValue(type, block, 0);
+ }
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
index 7092753..6df1ba8 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
@@ -20,6 +20,7 @@ package org.apache.paimon.presto;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.collect.ImmutableSet;
@@ -35,19 +36,31 @@ public class PrestoPlanOptimizerProvider implements ConnectorPlanOptimizerProvid
private final StandardFunctionResolution functionResolution;
private final RowExpressionService rowExpressionService;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final PrestoTransactionManager transactionManager;
@Inject
public PrestoPlanOptimizerProvider(
StandardFunctionResolution functionResolution,
- RowExpressionService rowExpressionService) {
+ RowExpressionService rowExpressionService,
+ FunctionMetadataManager functionMetadataManager,
+ PrestoTransactionManager transactionManager) {
this.functionResolution = requireNonNull(functionResolution,
"functionResolution is null");
this.rowExpressionService =
requireNonNull(rowExpressionService, "rowExpressionService is
null");
+ this.functionMetadataManager =
+ requireNonNull(functionMetadataManager,
"functionMetadataManager is null");
+ this.transactionManager = requireNonNull(transactionManager,
"transactionManager is null");
}
@Override
public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers() {
- return ImmutableSet.of(new PrestoComputePushdown(functionResolution,
rowExpressionService));
+ return ImmutableSet.of(
+ new PrestoComputePushdown(
+ functionResolution,
+ rowExpressionService,
+ functionMetadataManager,
+ transactionManager));
}
@Override
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
index 6633c3d..3888cb9 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
@@ -32,6 +32,7 @@ import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
public class PrestoSessionProperties {
public static final String QUERY_PUSHDOWN_ENABLED =
"query_pushdown_enabled";
+ public static final String PARTITION_PRUNE_ENABLED =
"partition_prune_enabled";
private final List<PropertyMetadata<?>> sessionProperties;
@@ -43,6 +44,11 @@ public class PrestoSessionProperties {
QUERY_PUSHDOWN_ENABLED,
"Enable paimon query pushdown",
config.isPaimonPushdownEnabled(),
+ false),
+ booleanProperty(
+ PARTITION_PRUNE_ENABLED,
+ "Enable paimon query partition prune",
+ config.isPaimonPartitionPruningEnabled(),
false));
}
@@ -53,4 +59,8 @@ public class PrestoSessionProperties {
public static boolean isPaimonPushdownEnabled(ConnectorSession session) {
return session.getProperty(QUERY_PUSHDOWN_ENABLED, Boolean.class);
}
+
+ public static boolean isPartitionPruneEnabled(ConnectorSession session) {
+ return session.getProperty(PARTITION_PRUNE_ENABLED, Boolean.class);
+ }
}
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
index 5ea71c0..21a6c67 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
@@ -18,9 +18,14 @@
package org.apache.paimon.presto;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
@@ -28,7 +33,10 @@ import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
/** Presto {@link ConnectorSplitManager}. */
@@ -47,6 +55,27 @@ public class PrestoSplitManager implements ConnectorSplitManager {
new PrestoFilterConverter(table.rowType())
.convert(tableHandle.getFilter())
.ifPresent(readBuilder::withFilter);
+ Optional<List<Map<String, String>>> partitions =
tableHandle.getPartitions();
+ org.apache.paimon.types.RowType partitionType =
+ table.rowType().project(table.partitionKeys());
+ List<Predicate> predicates = new ArrayList<>();
+
+ String partitionDefaultName = new
CoreOptions(table.options()).partitionDefaultName();
+ if (partitions.isPresent()) {
+ for (Map<String, String> row : partitions.get()) {
+ Map<String, Object> partition =
+ InternalRowPartitionComputer.convertSpecToInternal(
+ row, partitionType, partitionDefaultName);
+ predicates.add(
+
PartitionPredicate.createPartitionPredicate(table.rowType(), partition));
+ }
+ if (!predicates.isEmpty()) {
+ readBuilder.withFilter(PredicateBuilder.or(predicates));
+ } else {
+ // empty partition
+ return new PrestoSplitSource(new ArrayList<>());
+ }
+ }
List<Split> splits = readBuilder.newScan().plan().splits();
return new PrestoSplitSource(
splits.stream().map(PrestoSplit::fromSplit).collect(Collectors.toList()));
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
index a706e09..af4ea4a 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -45,12 +46,20 @@ public class PrestoTableHandle implements ConnectorTableHandle {
private final String tableName;
private final byte[] serializedTable;
private final TupleDomain<PrestoColumnHandle> filter;
+
private final Optional<List<ColumnHandle>> projectedColumns;
+ private final Optional<List<Map<String, String>>> partitions;
private Table lazyTable;
public PrestoTableHandle(String schemaName, String tableName, byte[]
serializedTable) {
- this(schemaName, tableName, serializedTable, TupleDomain.all(),
Optional.empty());
+ this(
+ schemaName,
+ tableName,
+ serializedTable,
+ TupleDomain.all(),
+ Optional.empty(),
+ Optional.empty());
}
@JsonCreator
@@ -59,12 +68,14 @@ public class PrestoTableHandle implements
ConnectorTableHandle {
@JsonProperty("tableName") String tableName,
@JsonProperty("serializedTable") byte[] serializedTable,
@JsonProperty("filter") TupleDomain<PrestoColumnHandle> filter,
- @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns) {
+ @JsonProperty("projection") Optional<List<ColumnHandle>>
projectedColumns,
+ @JsonProperty("partitions") Optional<List<Map<String, String>>>
partitions) {
this.schemaName = schemaName;
this.tableName = tableName;
this.serializedTable = serializedTable;
this.filter = filter;
this.projectedColumns = projectedColumns;
+ this.partitions = partitions;
}
@JsonProperty
@@ -87,6 +98,11 @@ public class PrestoTableHandle implements
ConnectorTableHandle {
return filter;
}
+ @JsonProperty
+ public Optional<List<Map<String, String>>> getPartitions() {
+ return partitions;
+ }
+
@JsonProperty
public Optional<List<ColumnHandle>> getProjectedColumns() {
return projectedColumns;
@@ -94,12 +110,12 @@ public class PrestoTableHandle implements
ConnectorTableHandle {
public PrestoTableHandle copy(TupleDomain<PrestoColumnHandle> filter) {
return new PrestoTableHandle(
- schemaName, tableName, serializedTable, filter,
projectedColumns);
+ schemaName, tableName, serializedTable, filter,
projectedColumns, partitions);
}
public PrestoTableHandle copy(Optional<List<ColumnHandle>>
projectedColumns) {
return new PrestoTableHandle(
- schemaName, tableName, serializedTable, filter,
projectedColumns);
+ schemaName, tableName, serializedTable, filter,
projectedColumns, partitions);
}
public Table table() {
@@ -155,12 +171,34 @@ public class PrestoTableHandle implements
ConnectorTableHandle {
&& Objects.equals(schemaName, that.schemaName)
&& Objects.equals(tableName, that.tableName)
&& Objects.equals(filter, that.filter)
- && Objects.equals(projectedColumns, that.projectedColumns);
+ && Objects.equals(projectedColumns, that.projectedColumns)
+ && Objects.equals(partitions, that.partitions);
}
@Override
public int hashCode() {
return Objects.hash(
- schemaName, tableName, filter, projectedColumns,
Arrays.hashCode(serializedTable));
+ schemaName,
+ tableName,
+ filter,
+ projectedColumns,
+ Arrays.hashCode(serializedTable),
+ partitions);
+ }
+
+ @Override
+ public String toString() {
+ return "PrestoTableHandle{"
+ + "schemaName='"
+ + schemaName
+ + '\''
+ + ", tableName='"
+ + tableName
+ + '\''
+ + ", filter="
+ + filter
+ + ", partitions="
+ + partitions
+ + '}';
}
}
diff --git
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
index 662998f..335d32a 100644
---
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
+++
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
@@ -18,6 +18,10 @@
package org.apache.paimon.presto;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.shade.guava30.com.google.common.base.Verify;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -42,7 +46,10 @@ import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.Decimals;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.SmallintType;
@@ -55,12 +62,29 @@ import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.facebook.presto.common.type.VarbinaryType;
import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.PrestoException;
+import io.airlift.slice.Slice;
+import java.math.BigDecimal;
+import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.DateType.DATE;
+import static com.facebook.presto.common.type.Decimals.encodeShortScaledValue;
+import static com.facebook.presto.common.type.Decimals.isLongDecimal;
+import static com.facebook.presto.common.type.Decimals.isShortDecimal;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TimeType.TIME;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TinyintType.TINYINT;
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.airlift.slice.Slices.wrappedBuffer;
import static java.lang.String.format;
/** Presto type from Paimon Type. */
@@ -207,4 +231,84 @@ public class PrestoTypeUtils {
format("Cannot convert from Presto type '%s' to Paimon
type", prestoType));
}
}
+
+ /** Convert a single value into a Presto block. */
+ public static Block singleValueToBlock(Type prestoType, Object value) {
+ if (value == null) {
+ return null;
+ }
+ BlockBuilder output = prestoType.createBlockBuilder(null, 1);
+ Class<?> javaType = prestoType.getJavaType();
+ if (javaType == boolean.class) {
+ prestoType.writeBoolean(output, (Boolean) value);
+ } else if (javaType == long.class) {
+ if (prestoType.equals(BIGINT)
+ || prestoType.equals(INTEGER)
+ || prestoType.equals(TINYINT)
+ || prestoType.equals(SMALLINT)
+ || prestoType.equals(DATE)) {
+ prestoType.writeLong(output, ((Number) value).longValue());
+ } else if (prestoType.equals(REAL)) {
+ prestoType.writeLong(output, Float.floatToIntBits((Float)
value));
+ } else if (prestoType instanceof
com.facebook.presto.common.type.DecimalType) {
+ Verify.verify(isShortDecimal(prestoType), "The type should be
short decimal");
+ com.facebook.presto.common.type.DecimalType decimalType =
+ (com.facebook.presto.common.type.DecimalType)
prestoType;
+ BigDecimal decimal = ((Decimal) value).toBigDecimal();
+ prestoType.writeLong(
+ output, encodeShortScaledValue(decimal,
decimalType.getScale()));
+ } else if (prestoType.equals(TIMESTAMP)) {
+ prestoType.writeLong(
+ output,
+ ((Timestamp) value)
+ .toLocalDateTime()
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli());
+ } else if (prestoType.equals(TIME)) {
+ prestoType.writeLong(output, (int) value * 1_000);
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ format("Unhandled type for %s: %s",
javaType.getSimpleName(), prestoType));
+ }
+ } else if (javaType == double.class) {
+ prestoType.writeDouble(output, ((Number) value).doubleValue());
+ } else if (prestoType instanceof
com.facebook.presto.common.type.DecimalType) {
+ writeObject(output, prestoType, value);
+ } else if (javaType == Slice.class) {
+ writeSlice(output, prestoType, value);
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ format("Unhandled type for %s: %s",
javaType.getSimpleName(), prestoType));
+ }
+ return output.build();
+ }
+
+ private static void writeSlice(BlockBuilder output, Type type, Object
value) {
+ if (type instanceof VarcharType
+ || type instanceof com.facebook.presto.common.type.CharType) {
+ type.writeSlice(output, wrappedBuffer(((BinaryString)
value).toBytes()));
+ } else if (type instanceof VarbinaryType) {
+ type.writeSlice(output, wrappedBuffer((byte[]) value));
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " +
type.getTypeSignature());
+ }
+ }
+
+ private static void writeObject(BlockBuilder output, Type type, Object
value) {
+ if (type instanceof com.facebook.presto.common.type.DecimalType) {
+ Verify.verify(isLongDecimal(type), "The type should be long
decimal");
+ com.facebook.presto.common.type.DecimalType decimalType =
+ (com.facebook.presto.common.type.DecimalType) type;
+ BigDecimal decimal = ((Decimal) value).toBigDecimal();
+ type.writeSlice(output, Decimals.encodeScaledValue(decimal,
decimalType.getScale()));
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ "Unhandled type for Object: " + type.getTypeSignature());
+ }
+ }
}
diff --git
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
index fd226a4..befbdeb 100644
---
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
+++
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
@@ -36,7 +36,12 @@ public class PrestoTableHandleTest {
byte[] serializedTable = TestPrestoUtils.getSerializedTable();
PrestoTableHandle expected =
new PrestoTableHandle(
- "test", "user", serializedTable, TupleDomain.all(),
Optional.empty());
+ "test",
+ "user",
+ serializedTable,
+ TupleDomain.all(),
+ Optional.empty(),
+ Optional.empty());
testRoundTrip(expected);
}
diff --git
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
index 4ae1ce3..e591296 100644
---
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
+++
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -18,10 +18,23 @@
package org.apache.paimon.presto;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
+import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.FunctionAndTypeManager;
+import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
@@ -41,6 +54,7 @@ import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.TestingRowExpressionTranslator;
+import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
import com.facebook.presto.sql.planner.TypeProvider;
@@ -52,8 +66,13 @@ import
com.facebook.presto.sql.relational.RowExpressionOptimizer;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -63,6 +82,7 @@ import java.util.UUID;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static
com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
import static
com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.expression;
+import static
com.facebook.presto.transaction.InMemoryTransactionManager.createTestTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link TestPrestoComputePushdown}. */
@@ -103,6 +123,8 @@ public class TestPrestoComputePushdown {
}
};
+ public byte[] table;
+
private TableScanNode createTableScan() {
PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
VariableReferenceExpression variableA =
variableAllocator.newVariable("a", BIGINT);
@@ -115,7 +137,7 @@ public class TestPrestoComputePushdown {
"id", new BigIntType(),
createTestFunctionAndTypeManager()))
.build();
- PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test",
"table".getBytes());
+ PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test",
this.table);
return new TableScanNode(
Optional.empty(),
@@ -129,8 +151,9 @@ public class TestPrestoComputePushdown {
new PrestoTableHandle(
"test",
"test",
- "table".getBytes(),
+ this.table,
TupleDomain.all(),
+ Optional.empty(),
Optional.empty()),
TupleDomain.all()))),
ImmutableList.copyOf(assignments.keySet()),
@@ -167,6 +190,31 @@ public class TestPrestoComputePushdown {
return sessionConfig;
}
+ @BeforeTest
+ public void setUp() throws Exception {
+ String warehouse =
+
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+ Path tablePath3 = new Path(warehouse, "default.db/t3");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "pt", new VarCharType()),
+ new DataField(1, "a", new IntType()),
+ new DataField(2, "b", new BigIntType()),
+ new DataField(3, "c", new BigIntType()),
+ new DataField(4, "d", new IntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath3)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
+ this.table = InstantiationUtil.serializeObject(table);
+ }
+
@Test
public void testOptimizeFilter() {
// Mock data.
@@ -177,7 +225,16 @@ public class TestPrestoComputePushdown {
Map<String, Object> prestoSessionConfig =
createPrestoSessionConfig(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE);
+ new PrestoComputePushdown(
+ FUNCTION_RESOLUTION,
+ ROW_EXPRESSION_SERVICE,
+ new FunctionAndTypeManager(
+ createTestTransactionManager(),
+ new BlockEncodingManager(),
+ new FeaturesConfig(),
+ new HandleResolver(),
+ ImmutableSet.of()),
+ new PrestoTransactionManager());
PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
@@ -227,7 +284,16 @@ public class TestPrestoComputePushdown {
Map<String, Object> prestoSessionConfig =
createPrestoSessionConfig(config);
PrestoComputePushdown prestoComputePushdown =
- new PrestoComputePushdown(FUNCTION_RESOLUTION,
ROW_EXPRESSION_SERVICE);
+ new PrestoComputePushdown(
+ FUNCTION_RESOLUTION,
+ ROW_EXPRESSION_SERVICE,
+ new FunctionAndTypeManager(
+ createTestTransactionManager(),
+ new BlockEncodingManager(),
+ new FeaturesConfig(),
+ new HandleResolver(),
+ ImmutableSet.of()),
+ new PrestoTransactionManager());
PlanNode mockInputPlan = createFilterNode();
ConnectorSession session =
diff --git
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 04d63bf..03678f2 100644
---
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.presto;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
@@ -26,6 +27,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
@@ -42,6 +44,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
+import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
@@ -63,6 +66,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
+import java.util.stream.Collectors;
import static com.facebook.airlift.testing.Closeables.closeAllSuppress;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
@@ -217,6 +221,58 @@ public class TestPrestoITCase {
commit.commit(0, writer.prepareCommit(true, 0));
}
+ // partitioned table
+ {
+ Path tablePath = new Path(warehouse, "default.db/t5");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "i1",
VarCharType.STRING_TYPE),
+ new DataField(1, "i2", new IntType()),
+ new DataField(2, "i3", new IntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ ImmutableList.of("i1", "i2"),
+ Collections.emptyList(),
+ ImmutableMap.of("bucket", "1"),
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(GenericRow.of(BinaryString.fromString("20241103"), 1,
1));
+ writer.write(GenericRow.of(BinaryString.fromString("20241103"), 2,
2));
+ writer.write(GenericRow.of(BinaryString.fromString("20241104"), 3,
2));
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+
+ // partitioned table with primary keys (i2, i1)
+ {
+ Path tablePath = new Path(warehouse, "default.db/t6");
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "i1", new IntType()),
+ new DataField(1, "i2",
VarCharType.STRING_TYPE),
+ new DataField(2, "i3", new IntType())));
+ new SchemaManager(LocalFileIO.create(), tablePath)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ ImmutableList.of("i2"),
+ ImmutableList.of("i2", "i1"),
+ ImmutableMap.of("bucket", "1"),
+ ""));
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+ InnerTableWrite writer = table.newWrite("user");
+ InnerTableCommit commit = table.newCommit("user");
+ writer.write(GenericRow.of(1, BinaryString.fromString("20241103"),
1));
+ writer.write(GenericRow.of(2, BinaryString.fromString("20241103"),
2));
+ writer.write(GenericRow.of(3, BinaryString.fromString("20241104"),
2));
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+
DistributedQueryRunner queryRunner = null;
try {
queryRunner =
@@ -503,8 +559,81 @@ public class TestPrestoITCase {
.isEqualTo("[[10000000000]]");
}
+ @Test
+ public void testPartitionPushDown1() throws Exception {
+ assertThat(sql("EXPLAIN SELECT * FROM paimon.default.t5 where
upper(i1) = '20241103'"))
+ .contains(
+ "LAYOUT:
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default',
tableName='t5', filter=TupleDomain{ALL}, partitions=Optional[[{i1=20241103,
i2=2}, {i1=20241103, i2=1}]]}, constraintSummary=TupleDomain{ALL}");
+ assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) =
'20241103'"))
+ .isEqualTo("[[20241103, 1, 1], [20241103, 2, 2]]");
+ }
+
+ @Test
+ public void testPartitionPushDown2() throws Exception {
+ assertThat(
+ sql(
+ ("EXPLAIN SELECT * FROM paimon.default.t5
where upper(i1) = '20241103' and i2 =1")))
+ .contains(
+ "LAYOUT:
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default',
tableName='t5', filter=TupleDomain{...}, partitions=Optional[[{i1=20241103,
i2=1}]]}, constraintSummary=TupleDomain{...}");
+ assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) =
'20241103' and i2 =1"))
+ .isEqualTo("[[20241103, 1, 1]]");
+ }
+
+ @Test
+ public void testPartitionPushDown3() throws Exception {
+ assertThat(
+ sql(
+ ("EXPLAIN SELECT * FROM paimon.default.t5
where upper(i1) = '20241105' and i2 =1")))
+ .contains(
+ "LAYOUT:
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', "
+ + "tableName='t5', filter=TupleDomain{...},
partitions=Optional[[]]}, constraintSummary=TupleDomain{...}}");
+ assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) =
'20241105' and i2 =1"))
+ .isEqualTo("[]");
+ }
+
+ @Test
+ public void testPartitionPushDown4() throws Exception {
+ assertThat(
+ sql(
+ "EXPLAIN SELECT * FROM paimon.default.t5 where
upper(i1) = "
+ + "'20241103'and i2 =1",
+ "partition_prune_enabled",
+ "false"))
+ .contains(
+ "LAYOUT:
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', "
+ + "tableName='t5', filter=TupleDomain{...},
partitions=Optional.empty}, constraintSummary=TupleDomain{...}");
+ assertThat(
+ sql(
+ "SELECT * FROM paimon.default.t5 where
upper(i1) = "
+ + "'20241103'and i2 =1",
+ "partition_prune_enabled",
+ "false"))
+ .isEqualTo("[[20241103, 1, 1]]");
+ }
+
+ @Test
+ public void testPartitionPushDown5() throws Exception {
+ assertThat(sql("SELECT * FROM paimon.default.t6 where upper(i2) =
'20241103'"))
+ .isEqualTo("[[1, 20241103, 1], [2, 20241103, 2]]");
+ }
+
+ private String sql(String sql, String key, String value) throws Exception {
+ Session session =
+ testSessionBuilder().setCatalogSessionProperty("paimon", key,
value).build();
+ MaterializedResult result = queryRunner.execute(session, sql);
+ return result.getMaterializedRows().stream()
+ .map(Object::toString)
+ .sorted()
+ .collect(Collectors.toList())
+ .toString();
+ }
+
private String sql(String sql) throws Exception {
MaterializedResult result = queryRunner.execute(sql);
- return result.getMaterializedRows().toString();
+ return result.getMaterializedRows().stream()
+ .map(Object::toString)
+ .sorted()
+ .collect(Collectors.toList())
+ .toString();
}
}