This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-presto.git


The following commit(s) were added to refs/heads/main by this push:
     new 80d45e8  (feat) Support partition pushdown with complex predicate (#45)
80d45e8 is described below

commit 80d45e81098f3f4bdc5b692f23c12a23f1cf60fc
Author: WenjunMin <[email protected]>
AuthorDate: Mon Dec 16 10:24:55 2024 +0800

    (feat) Support partition pushdown with complex predicate (#45)
---
 .../paimon/presto/PrestoComputePushdown.java       |  13 +-
 .../paimon/presto/TestPrestoComputePushdown.java   |  68 ++++-
 .../org/apache/paimon/presto/PaimonConfig.java     |  12 +
 .../paimon/presto/PrestoComputePushdown.java       | 331 ++++++++++++++++++++-
 .../paimon/presto/PrestoPlanOptimizerProvider.java |  17 +-
 .../paimon/presto/PrestoSessionProperties.java     |  10 +
 .../apache/paimon/presto/PrestoSplitManager.java   |  29 ++
 .../apache/paimon/presto/PrestoTableHandle.java    |  50 +++-
 .../org/apache/paimon/presto/PrestoTypeUtils.java  | 104 +++++++
 .../paimon/presto/PrestoTableHandleTest.java       |   7 +-
 .../paimon/presto/TestPrestoComputePushdown.java   |  74 ++++-
 .../org/apache/paimon/presto/TestPrestoITCase.java | 131 +++++++-
 12 files changed, 823 insertions(+), 23 deletions(-)

diff --git 
a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
 
b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
index 91955b5..f01c381 100644
--- 
a/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
+++ 
b/paimon-presto-0.236/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -26,6 +26,7 @@ import com.facebook.presto.spi.ConnectorPlanOptimizer;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
 import com.facebook.presto.spi.plan.FilterNode;
 import com.facebook.presto.spi.plan.PlanNode;
@@ -54,14 +55,21 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
 
     private final StandardFunctionResolution functionResolution;
     private final RowExpressionService rowExpressionService;
+    private final FunctionMetadataManager functionMetadataManager;
+    private final PrestoTransactionManager transactionManager;
 
     public PrestoComputePushdown(
             StandardFunctionResolution functionResolution,
-            RowExpressionService rowExpressionService) {
+            RowExpressionService rowExpressionService,
+            FunctionMetadataManager functionMetadataManager,
+            PrestoTransactionManager transactionManager) {
 
         this.functionResolution = requireNonNull(functionResolution, 
"functionResolution is null");
         this.rowExpressionService =
                 requireNonNull(rowExpressionService, "rowExpressionService is 
null");
+        this.functionMetadataManager =
+                requireNonNull(functionMetadataManager, 
"functionMetadataManager is null");
+        this.transactionManager = requireNonNull(transactionManager, 
"transactionManager is null");
     }
 
     @Override
@@ -180,7 +188,8 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
                             oldPrestoTableHandle.getTableName(),
                             oldPrestoTableHandle.getSerializedTable(),
                             entireColumnDomain,
-                            projectedColumns);
+                            projectedColumns,
+                            Optional.empty());
 
             PrestoTableLayoutHandle newLayoutHandle =
                     new PrestoTableLayoutHandle(
diff --git 
a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
 
b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
index 9c2d56e..af5f9de 100644
--- 
a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
+++ 
b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -18,9 +18,22 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
 
+import com.facebook.presto.block.BlockEncodingManager;
 import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.metadata.FunctionManager;
 import com.facebook.presto.metadata.MetadataManager;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorId;
@@ -40,6 +53,7 @@ import com.facebook.presto.spi.relation.RowExpression;
 import com.facebook.presto.spi.relation.RowExpressionService;
 import com.facebook.presto.spi.relation.VariableReferenceExpression;
 import com.facebook.presto.sql.TestingRowExpressionTranslator;
+import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
 import com.facebook.presto.sql.planner.PlanVariableAllocator;
 import com.facebook.presto.sql.planner.TypeProvider;
@@ -52,8 +66,12 @@ import com.facebook.presto.testing.TestingConnectorSession;
 import com.facebook.presto.type.TypeRegistry;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -102,6 +120,33 @@ public class TestPrestoComputePushdown {
                 }
             };
 
+    public byte[] table;
+
+    @BeforeTest
+    public void setUp() throws Exception {
+        String warehouse =
+                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+        Path tablePath3 = new Path(warehouse, "default.db/t3");
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "pt", new VarCharType()),
+                                new DataField(1, "a", new IntType()),
+                                new DataField(2, "b", new BigIntType()),
+                                new DataField(3, "c", new BigIntType()),
+                                new DataField(4, "d", new IntType())));
+        new SchemaManager(LocalFileIO.create(), tablePath3)
+                .createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                new HashMap<>(),
+                                ""));
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
+        this.table = InstantiationUtil.serializeObject(table);
+    }
+
     private TableScanNode createTableScan() {
         PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
         VariableReferenceExpression variableA = 
variableAllocator.newVariable("a", BIGINT);
@@ -114,7 +159,7 @@ public class TestPrestoComputePushdown {
                                         "id", new BigIntType(), new 
TypeRegistry()))
                         .build();
 
-        PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", 
"table".getBytes());
+        PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", 
this.table);
 
         return new TableScanNode(
                 new PlanNodeId(UUID.randomUUID().toString()),
@@ -127,8 +172,9 @@ public class TestPrestoComputePushdown {
                                         new PrestoTableHandle(
                                                 "test",
                                                 "test",
-                                                "table".getBytes(),
+                                                this.table,
                                                 TupleDomain.all(),
+                                                Optional.empty(),
                                                 Optional.empty()),
                                         TupleDomain.all()))),
                 ImmutableList.copyOf(assignments.keySet()),
@@ -171,7 +217,14 @@ public class TestPrestoComputePushdown {
         PrestoSessionProperties prestoSessionProperties = new 
PrestoSessionProperties(config);
 
         PrestoComputePushdown prestoComputePushdown =
-                new PrestoComputePushdown(FUNCTION_RESOLUTION, 
ROW_EXPRESSION_SERVICE);
+                new PrestoComputePushdown(
+                        FUNCTION_RESOLUTION,
+                        ROW_EXPRESSION_SERVICE,
+                        new FunctionManager(
+                                new TypeRegistry(),
+                                new BlockEncodingManager(new TypeRegistry()),
+                                new FeaturesConfig()),
+                        new PrestoTransactionManager());
 
         PlanNode mockInputPlan = createFilterNode();
         ConnectorSession session =
@@ -219,7 +272,14 @@ public class TestPrestoComputePushdown {
         PrestoSessionProperties prestoSessionProperties = new 
PrestoSessionProperties(config);
 
         PrestoComputePushdown prestoComputePushdown =
-                new PrestoComputePushdown(FUNCTION_RESOLUTION, 
ROW_EXPRESSION_SERVICE);
+                new PrestoComputePushdown(
+                        FUNCTION_RESOLUTION,
+                        ROW_EXPRESSION_SERVICE,
+                        new FunctionManager(
+                                new TypeRegistry(),
+                                new BlockEncodingManager(new TypeRegistry()),
+                                new FeaturesConfig()),
+                        new PrestoTransactionManager());
 
         PlanNode mockInputPlan = createFilterNode();
         ConnectorSession session =
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
index d8d98b3..e749aee 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PaimonConfig.java
@@ -28,6 +28,7 @@ public class PaimonConfig {
     private String metastore;
     private String uri;
     private boolean paimonPushdownEnabled = true;
+    private boolean paimonPartitionPruningEnabled = true;
 
     public String getWarehouse() {
         return warehouse;
@@ -69,4 +70,15 @@ public class PaimonConfig {
         this.paimonPushdownEnabled = paimonPushdownEnabled;
         return this;
     }
+
+    public boolean isPaimonPartitionPruningEnabled() {
+        return paimonPartitionPruningEnabled;
+    }
+
+    @Config("paimon.partition-prune-enabled")
+    @ConfigDescription("Enable paimon query partition prune")
+    public PaimonConfig setPaimonPartitionPruningEnabled(boolean 
paimonPartitionPruningEnabled) {
+        this.paimonPartitionPruningEnabled = paimonPartitionPruningEnabled;
+        return this;
+    }
 }
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
index acd7419..7c413d4 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java
@@ -18,14 +18,31 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Preconditions;
+
 import com.facebook.presto.common.Subfield;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.predicate.NullableValue;
+import com.facebook.presto.common.predicate.Primitives;
 import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeUtils;
+import com.facebook.presto.expressions.DefaultRowExpressionTraversalVisitor;
+import com.facebook.presto.expressions.LogicalRowExpressions;
 import com.facebook.presto.hive.SubfieldExtractor;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPlanOptimizer;
 import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.TableHandle;
 import com.facebook.presto.spi.VariableAllocator;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
 import com.facebook.presto.spi.plan.FilterNode;
 import com.facebook.presto.spi.plan.PlanNode;
@@ -33,19 +50,40 @@ import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
 import com.facebook.presto.spi.plan.PlanVisitor;
 import com.facebook.presto.spi.plan.TableScanNode;
 import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
 import com.facebook.presto.spi.relation.DomainTranslator;
+import com.facebook.presto.spi.relation.InputReferenceExpression;
+import com.facebook.presto.spi.relation.LambdaDefinitionExpression;
 import com.facebook.presto.spi.relation.RowExpression;
 import com.facebook.presto.spi.relation.RowExpressionService;
+import com.facebook.presto.spi.relation.RowExpressionVisitor;
+import com.facebook.presto.spi.relation.SpecialFormExpression;
 import com.facebook.presto.spi.relation.VariableReferenceExpression;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static 
com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
+import static com.facebook.presto.spi.StandardErrorCode.DIVISION_BY_ZERO;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_CAST_ARGUMENT;
+import static 
com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static 
com.facebook.presto.spi.StandardErrorCode.NUMERIC_VALUE_OUT_OF_RANGE;
+import static 
com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Sets.intersection;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.paimon.presto.PrestoSessionProperties.isPaimonPushdownEnabled;
 
@@ -54,14 +92,20 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
 
     private final StandardFunctionResolution functionResolution;
     private final RowExpressionService rowExpressionService;
+    private final FunctionMetadataManager functionMetadataManager;
+    private final PrestoTransactionManager transactionManager;
 
     public PrestoComputePushdown(
             StandardFunctionResolution functionResolution,
-            RowExpressionService rowExpressionService) {
-
+            RowExpressionService rowExpressionService,
+            FunctionMetadataManager functionMetadataManager,
+            PrestoTransactionManager transactionManager) {
         this.functionResolution = requireNonNull(functionResolution, 
"functionResolution is null");
         this.rowExpressionService =
                 requireNonNull(rowExpressionService, "rowExpressionService is 
null");
+        this.functionMetadataManager =
+                requireNonNull(functionMetadataManager, 
"functionMetadataManager is null");
+        this.transactionManager = requireNonNull(transactionManager, 
"transactionManager is null");
     }
 
     @Override
@@ -171,6 +215,42 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
             Optional<List<ColumnHandle>> projectedColumns =
                     extractColumns(filterPredicate, assignments);
 
+            Table table = ((PrestoTableHandle) 
tableScan.getTable().getConnectorHandle()).table();
+
+            ImmutableSet.Builder<VariableReferenceExpression> builder = 
ImmutableSet.builder();
+            decomposedFilter
+                    .getRemainingExpression()
+                    .accept(new 
VariableCollector(tableScan.getOutputVariables()), builder);
+            Set<ColumnHandle> remainingFilterProjects =
+                    builder.build().stream()
+                            .map(
+                                    v ->
+                                            Preconditions.checkNotNull(
+                                                    assignments.get(v),
+                                                    "The variable is"
+                                                            + " not found in 
the assignments"))
+                            .collect(Collectors.toSet());
+
+            // Prune the partition
+            Set<String> partitionColumns = new 
HashSet<>(table.partitionKeys());
+            Optional<List<Map<String, String>>> remainingPartitions = 
Optional.empty();
+            // we have predicate on the partition field, then we have to list 
the partition and do
+            // the prune.
+            if (PrestoSessionProperties.isPartitionPruneEnabled(session)
+                    && !remainingFilterProjects.isEmpty()
+                    && remainingFilterProjects.stream()
+                            .map(PrestoColumnHandle.class::cast)
+                            .anyMatch(c -> 
partitionColumns.contains(c.getColumnName()))) {
+                Map<String, ColumnHandle> columns =
+                        transactionManager
+                                .get(tableScan.getTable().getTransaction())
+                                .getColumnHandles(
+                                        session, 
tableScan.getTable().getConnectorHandle());
+                remainingPartitions =
+                        getRemainingPartition(
+                                table, decomposedFilter, session, 
entireColumnDomain, columns);
+            }
+
             // Build paimon new presto table handle use pushdown.
             PrestoTableHandle oldPrestoTableHandle =
                     (PrestoTableHandle) 
tableScan.getTable().getConnectorHandle();
@@ -180,7 +260,8 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
                             oldPrestoTableHandle.getTableName(),
                             oldPrestoTableHandle.getSerializedTable(),
                             entireColumnDomain,
-                            projectedColumns);
+                            projectedColumns,
+                            remainingPartitions);
 
             PrestoTableLayoutHandle newLayoutHandle =
                     new PrestoTableLayoutHandle(
@@ -203,4 +284,248 @@ public class PrestoComputePushdown implements 
ConnectorPlanOptimizer {
                     filter.getSourceLocation(), filter.getId(), newTableScan, 
filterPredicate);
         }
     }
+
+    private Optional<List<Map<String, String>>> getRemainingPartition(
+            Table table,
+            DomainTranslator.ExtractionResult<Subfield> decomposedFilter,
+            ConnectorSession session,
+            TupleDomain<PrestoColumnHandle> entireColumnDomain,
+            Map<String, ColumnHandle> columns) {
+        // Extract deterministic conjuncts that apply to partition columns and 
specify these as
+        // Constraint#predicate
+        Predicate<Map<ColumnHandle, NullableValue>> predicate = v -> true;
+
+        RowExpression remainingExpression = 
decomposedFilter.getRemainingExpression();
+        if (!TRUE_CONSTANT.equals(remainingExpression)) {
+            LogicalRowExpressions logicalRowExpressions =
+                    new LogicalRowExpressions(
+                            rowExpressionService.getDeterminismEvaluator(),
+                            functionResolution,
+                            functionMetadataManager);
+            RowExpression deterministicPredicate =
+                    
logicalRowExpressions.filterDeterministicConjuncts(remainingExpression);
+            if (!TRUE_CONSTANT.equals(deterministicPredicate)) {
+                ConstraintEvaluator evaluator =
+                        new ConstraintEvaluator(
+                                rowExpressionService, session, columns, 
deterministicPredicate);
+                predicate = evaluator::isCandidate;
+            }
+        }
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+
+        // list partition with filter.
+        new PrestoFilterConverter(table.rowType())
+                .convert(entireColumnDomain)
+                .ifPresent(readBuilder::withFilter);
+
+        List<BinaryRow> partitions = readBuilder.newScan().listPartitions();
+        String partitionDefaultName = new 
CoreOptions(table.options()).partitionDefaultName();
+
+        InternalRow.FieldGetter[] getters =
+                table.rowType().project(table.partitionKeys()).fieldGetters();
+        List<String> partitionKeys = table.partitionKeys();
+        List<Map<String, String>> remainingPartitions = new ArrayList<>();
+        for (BinaryRow partition : partitions) {
+            Map<ColumnHandle, NullableValue> partitionPrestoValue = new 
HashMap<>();
+            LinkedHashMap<String, String> partitionMap = new LinkedHashMap<>();
+            for (int i = 0; i < getters.length; i++) {
+                Object value = getters[i].getFieldOrNull(partition);
+                PrestoColumnHandle handle = (PrestoColumnHandle) 
columns.get(partitionKeys.get(i));
+                Preconditions.checkNotNull(
+                        handle, "The %s column handle is not found.", 
partitionKeys.get(i));
+                Type type = handle.getPrestoType();
+
+                partitionPrestoValue.put(
+                        handle,
+                        NullableValue.of(
+                                type,
+                                value == null
+                                        ? null
+                                        : Utils.blockToNativeValue(
+                                                type,
+                                                
PrestoTypeUtils.singleValueToBlock(type, value))));
+                partitionMap.put(
+                        partitionKeys.get(i),
+                        value == null ? partitionDefaultName : 
value.toString());
+            }
+            if (predicate.test(partitionPrestoValue)) {
+                remainingPartitions.add(partitionMap);
+            }
+        }
+        return Optional.of(remainingPartitions);
+    }
+
+    private static class VariableCollector
+            implements RowExpressionVisitor<
+                    Void, ImmutableSet.Builder<VariableReferenceExpression>> {
+
+        List<VariableReferenceExpression> inputVariables;
+
+        public VariableCollector(List<VariableReferenceExpression> 
inputVariables) {
+            this.inputVariables = inputVariables;
+        }
+
+        @Override
+        public Void visitCall(
+                CallExpression call, 
ImmutableSet.Builder<VariableReferenceExpression> context) {
+            for (RowExpression argument : call.getArguments()) {
+                argument.accept(this, context);
+            }
+            return null;
+        }
+
+        @Override
+        public Void visitInputReference(
+                InputReferenceExpression reference,
+                ImmutableSet.Builder<VariableReferenceExpression> context) {
+            int pos = reference.getField();
+            context.add(inputVariables.get(pos));
+            return null;
+        }
+
+        @Override
+        public Void visitConstant(
+                ConstantExpression literal,
+                ImmutableSet.Builder<VariableReferenceExpression> context) {
+            return null;
+        }
+
+        @Override
+        public Void visitLambda(
+                LambdaDefinitionExpression lambda,
+                ImmutableSet.Builder<VariableReferenceExpression> context) {
+            return null;
+        }
+
+        @Override
+        public Void visitVariableReference(
+                VariableReferenceExpression reference,
+                ImmutableSet.Builder<VariableReferenceExpression> context) {
+            context.add(reference);
+            return null;
+        }
+
+        @Override
+        public Void visitSpecialForm(
+                SpecialFormExpression specialForm,
+                ImmutableSet.Builder<VariableReferenceExpression> context) {
+            specialForm.getArguments().forEach(argument -> 
argument.accept(this, context));
+            return null;
+        }
+    }
+
+    // copied from presto
+    private static class VariableReferenceBuilderVisitor
+            extends DefaultRowExpressionTraversalVisitor<
+                    ImmutableSet.Builder<VariableReferenceExpression>> {
+        @Override
+        public Void visitVariableReference(
+                VariableReferenceExpression variable,
+                ImmutableSet.Builder<VariableReferenceExpression> builder) {
+            builder.add(variable);
+            return null;
+        }
+    }
+
+    private static Set<VariableReferenceExpression> extractAll(RowExpression 
expression) {
+        ImmutableSet.Builder<VariableReferenceExpression> builder = 
ImmutableSet.builder();
+        expression.accept(new VariableReferenceBuilderVisitor(), builder);
+        return builder.build();
+    }
+
+    private static class ConstraintEvaluator {
+        private final Map<String, ColumnHandle> assignments;
+        private final RowExpressionService evaluator;
+        private final ConnectorSession session;
+        private final RowExpression expression;
+        private final Set<ColumnHandle> arguments;
+
+        public ConstraintEvaluator(
+                RowExpressionService evaluator,
+                ConnectorSession session,
+                Map<String, ColumnHandle> assignments,
+                RowExpression expression) {
+            this.assignments = assignments;
+            this.evaluator = evaluator;
+            this.session = session;
+            this.expression = expression;
+
+            arguments =
+                    ImmutableSet.copyOf(extractAll(expression)).stream()
+                            .map(VariableReferenceExpression::getName)
+                            .map(assignments::get)
+                            .collect(toImmutableSet());
+        }
+
+        private boolean isCandidate(Map<ColumnHandle, NullableValue> bindings) 
{
+            if (intersection(bindings.keySet(), arguments).isEmpty()) {
+                return true;
+            }
+
+            Function<VariableReferenceExpression, Object> variableResolver =
+                    variable -> {
+                        ColumnHandle column = 
assignments.get(variable.getName());
+                        checkArgument(column != null, "Missing column 
assignment for %s", variable);
+
+                        if (!bindings.containsKey(column)) {
+                            return variable;
+                        }
+
+                        return bindings.get(column).getValue();
+                    };
+
+            // Skip pruning if evaluation fails in a recoverable way. Failing 
here can cause
+            // spurious query failures for partitions that would otherwise be 
filtered out.
+            Object optimized = null;
+            try {
+                optimized =
+                        evaluator
+                                .getExpressionOptimizer()
+                                .optimize(expression, OPTIMIZED, session, 
variableResolver);
+            } catch (PrestoException e) {
+                propagateIfUnhandled(e);
+                return true;
+            }
+
+            // If any conjuncts evaluate to FALSE or null, then the whole 
predicate will never be
+            // true and so the partition should be pruned
+            return !Boolean.FALSE.equals(optimized)
+                    && optimized != null
+                    && (!(optimized instanceof ConstantExpression)
+                            || !((ConstantExpression) optimized).isNull());
+        }
+
+        private static void propagateIfUnhandled(PrestoException e) throws 
PrestoException {
+            int errorCode = e.getErrorCode().getCode();
+            if (errorCode == DIVISION_BY_ZERO.toErrorCode().getCode()
+                    || errorCode == 
INVALID_CAST_ARGUMENT.toErrorCode().getCode()
+                    || errorCode == 
INVALID_FUNCTION_ARGUMENT.toErrorCode().getCode()
+                    || errorCode == 
NUMERIC_VALUE_OUT_OF_RANGE.toErrorCode().getCode()) {
+                return;
+            }
+
+            throw e;
+        }
+    }
+
+    private static final class Utils {
+        private Utils() {}
+
+        public static Block nativeValueToBlock(Type type, Object object) {
+            if (object != null && 
!Primitives.wrap(type.getJavaType()).isInstance(object)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Object '%s' does not match type %s", object, 
type.getJavaType()));
+            } else {
+                BlockBuilder blockBuilder = type.createBlockBuilder(null, 1);
+                TypeUtils.writeNativeValue(type, blockBuilder, object);
+                return blockBuilder.build();
+            }
+        }
+
+        public static Object blockToNativeValue(Type type, Block block) {
+            return TypeUtils.readNativeValue(type, block, 0);
+        }
+    }
 }
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
index 7092753..6df1ba8 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPlanOptimizerProvider.java
@@ -20,6 +20,7 @@ package org.apache.paimon.presto;
 
 import com.facebook.presto.spi.ConnectorPlanOptimizer;
 import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
 import com.facebook.presto.spi.function.StandardFunctionResolution;
 import com.facebook.presto.spi.relation.RowExpressionService;
 import com.google.common.collect.ImmutableSet;
@@ -35,19 +36,31 @@ public class PrestoPlanOptimizerProvider implements 
ConnectorPlanOptimizerProvid
 
     private final StandardFunctionResolution functionResolution;
     private final RowExpressionService rowExpressionService;
+    private final FunctionMetadataManager functionMetadataManager;
+    private final PrestoTransactionManager transactionManager;
 
     @Inject
     public PrestoPlanOptimizerProvider(
             StandardFunctionResolution functionResolution,
-            RowExpressionService rowExpressionService) {
+            RowExpressionService rowExpressionService,
+            FunctionMetadataManager functionMetadataManager,
+            PrestoTransactionManager transactionManager) {
         this.functionResolution = requireNonNull(functionResolution, 
"functionResolution is null");
         this.rowExpressionService =
                 requireNonNull(rowExpressionService, "rowExpressionService is 
null");
+        this.functionMetadataManager =
+                requireNonNull(functionMetadataManager, 
"functionMetadataManager is null");
+        this.transactionManager = requireNonNull(transactionManager, 
"transactionManager is null");
     }
 
     @Override
     public Set<ConnectorPlanOptimizer> getLogicalPlanOptimizers() {
-        return ImmutableSet.of(new PrestoComputePushdown(functionResolution, 
rowExpressionService));
+        return ImmutableSet.of(
+                new PrestoComputePushdown(
+                        functionResolution,
+                        rowExpressionService,
+                        functionMetadataManager,
+                        transactionManager));
     }
 
     @Override
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
index 6633c3d..3888cb9 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
@@ -32,6 +32,7 @@ import static 
com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
 public class PrestoSessionProperties {
 
     public static final String QUERY_PUSHDOWN_ENABLED = 
"query_pushdown_enabled";
+    public static final String PARTITION_PRUNE_ENABLED = 
"partition_prune_enabled";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -43,6 +44,11 @@ public class PrestoSessionProperties {
                                 QUERY_PUSHDOWN_ENABLED,
                                 "Enable paimon query pushdown",
                                 config.isPaimonPushdownEnabled(),
+                                false),
+                        booleanProperty(
+                                PARTITION_PRUNE_ENABLED,
+                                "Enable paimon query partition prune",
+                                config.isPaimonPartitionPruningEnabled(),
                                 false));
     }
 
@@ -53,4 +59,8 @@ public class PrestoSessionProperties {
     public static boolean isPaimonPushdownEnabled(ConnectorSession session) {
         return session.getProperty(QUERY_PUSHDOWN_ENABLED, Boolean.class);
     }
+
+    public static boolean isPartitionPruneEnabled(ConnectorSession session) {
+        return session.getProperty(PARTITION_PRUNE_ENABLED, Boolean.class);
+    }
 }
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
index 5ea71c0..21a6c67 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSplitManager.java
@@ -18,9 +18,14 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplitSource;
@@ -28,7 +33,10 @@ import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.connector.ConnectorSplitManager;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /** Presto {@link ConnectorSplitManager}. */
@@ -47,6 +55,27 @@ public class PrestoSplitManager implements 
ConnectorSplitManager {
         new PrestoFilterConverter(table.rowType())
                 .convert(tableHandle.getFilter())
                 .ifPresent(readBuilder::withFilter);
+        Optional<List<Map<String, String>>> partitions = 
tableHandle.getPartitions();
+        org.apache.paimon.types.RowType partitionType =
+                table.rowType().project(table.partitionKeys());
+        List<Predicate> predicates = new ArrayList<>();
+
+        String partitionDefaultName = new 
CoreOptions(table.options()).partitionDefaultName();
+        if (partitions.isPresent()) {
+            for (Map<String, String> row : partitions.get()) {
+                Map<String, Object> partition =
+                        InternalRowPartitionComputer.convertSpecToInternal(
+                                row, partitionType, partitionDefaultName);
+                predicates.add(
+                        
PartitionPredicate.createPartitionPredicate(table.rowType(), partition));
+            }
+            if (!predicates.isEmpty()) {
+                readBuilder.withFilter(PredicateBuilder.or(predicates));
+            } else {
+                // empty partition
+                return new PrestoSplitSource(new ArrayList<>());
+            }
+        }
         List<Split> splits = readBuilder.newScan().plan().splits();
         return new PrestoSplitSource(
                 
splits.stream().map(PrestoSplit::fromSplit).collect(Collectors.toList()));
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
index a706e09..af4ea4a 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTableHandle.java
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -45,12 +46,20 @@ public class PrestoTableHandle implements 
ConnectorTableHandle {
     private final String tableName;
     private final byte[] serializedTable;
     private final TupleDomain<PrestoColumnHandle> filter;
+
     private final Optional<List<ColumnHandle>> projectedColumns;
+    private final Optional<List<Map<String, String>>> partitions;
 
     private Table lazyTable;
 
     public PrestoTableHandle(String schemaName, String tableName, byte[] 
serializedTable) {
-        this(schemaName, tableName, serializedTable, TupleDomain.all(), 
Optional.empty());
+        this(
+                schemaName,
+                tableName,
+                serializedTable,
+                TupleDomain.all(),
+                Optional.empty(),
+                Optional.empty());
     }
 
     @JsonCreator
@@ -59,12 +68,14 @@ public class PrestoTableHandle implements 
ConnectorTableHandle {
             @JsonProperty("tableName") String tableName,
             @JsonProperty("serializedTable") byte[] serializedTable,
             @JsonProperty("filter") TupleDomain<PrestoColumnHandle> filter,
-            @JsonProperty("projection") Optional<List<ColumnHandle>> 
projectedColumns) {
+            @JsonProperty("projection") Optional<List<ColumnHandle>> 
projectedColumns,
+            @JsonProperty("partitions") Optional<List<Map<String, String>>> 
partitions) {
         this.schemaName = schemaName;
         this.tableName = tableName;
         this.serializedTable = serializedTable;
         this.filter = filter;
         this.projectedColumns = projectedColumns;
+        this.partitions = partitions;
     }
 
     @JsonProperty
@@ -87,6 +98,11 @@ public class PrestoTableHandle implements 
ConnectorTableHandle {
         return filter;
     }
 
+    @JsonProperty
+    public Optional<List<Map<String, String>>> getPartitions() {
+        return partitions;
+    }
+
     @JsonProperty
     public Optional<List<ColumnHandle>> getProjectedColumns() {
         return projectedColumns;
@@ -94,12 +110,12 @@ public class PrestoTableHandle implements 
ConnectorTableHandle {
 
     public PrestoTableHandle copy(TupleDomain<PrestoColumnHandle> filter) {
         return new PrestoTableHandle(
-                schemaName, tableName, serializedTable, filter, 
projectedColumns);
+                schemaName, tableName, serializedTable, filter, 
projectedColumns, partitions);
     }
 
     public PrestoTableHandle copy(Optional<List<ColumnHandle>> 
projectedColumns) {
         return new PrestoTableHandle(
-                schemaName, tableName, serializedTable, filter, 
projectedColumns);
+                schemaName, tableName, serializedTable, filter, 
projectedColumns, partitions);
     }
 
     public Table table() {
@@ -155,12 +171,34 @@ public class PrestoTableHandle implements 
ConnectorTableHandle {
                 && Objects.equals(schemaName, that.schemaName)
                 && Objects.equals(tableName, that.tableName)
                 && Objects.equals(filter, that.filter)
-                && Objects.equals(projectedColumns, that.projectedColumns);
+                && Objects.equals(projectedColumns, that.projectedColumns)
+                && Objects.equals(partitions, that.partitions);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(
-                schemaName, tableName, filter, projectedColumns, 
Arrays.hashCode(serializedTable));
+                schemaName,
+                tableName,
+                filter,
+                projectedColumns,
+                Arrays.hashCode(serializedTable),
+                partitions);
+    }
+
+    @Override
+    public String toString() {
+        return "PrestoTableHandle{"
+                + "schemaName='"
+                + schemaName
+                + '\''
+                + ", tableName='"
+                + tableName
+                + '\''
+                + ", filter="
+                + filter
+                + ", partitions="
+                + partitions
+                + '}';
     }
 }
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
index 662998f..335d32a 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoTypeUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.shade.guava30.com.google.common.base.Verify;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -42,7 +46,10 @@ import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
 import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.Decimals;
 import com.facebook.presto.common.type.IntegerType;
 import com.facebook.presto.common.type.RealType;
 import com.facebook.presto.common.type.SmallintType;
@@ -55,12 +62,29 @@ import com.facebook.presto.common.type.TypeSignature;
 import com.facebook.presto.common.type.TypeSignatureParameter;
 import com.facebook.presto.common.type.VarbinaryType;
 import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.PrestoException;
+import io.airlift.slice.Slice;
 
+import java.math.BigDecimal;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.DateType.DATE;
+import static com.facebook.presto.common.type.Decimals.encodeShortScaledValue;
+import static com.facebook.presto.common.type.Decimals.isLongDecimal;
+import static com.facebook.presto.common.type.Decimals.isShortDecimal;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TimeType.TIME;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TinyintType.TINYINT;
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.airlift.slice.Slices.wrappedBuffer;
 import static java.lang.String.format;
 
 /** Presto type from Paimon Type. */
@@ -207,4 +231,84 @@ public class PrestoTypeUtils {
                     format("Cannot convert from Presto type '%s' to Paimon 
type", prestoType));
         }
     }
+
+    /** Covert a presto block from a value. */
+    public static Block singleValueToBlock(Type prestoType, Object value) {
+        if (value == null) {
+            return null;
+        }
+        BlockBuilder output = prestoType.createBlockBuilder(null, 1);
+        Class<?> javaType = prestoType.getJavaType();
+        if (javaType == boolean.class) {
+            prestoType.writeBoolean(output, (Boolean) value);
+        } else if (javaType == long.class) {
+            if (prestoType.equals(BIGINT)
+                    || prestoType.equals(INTEGER)
+                    || prestoType.equals(TINYINT)
+                    || prestoType.equals(SMALLINT)
+                    || prestoType.equals(DATE)) {
+                prestoType.writeLong(output, ((Number) value).longValue());
+            } else if (prestoType.equals(REAL)) {
+                prestoType.writeLong(output, Float.floatToIntBits((Float) 
value));
+            } else if (prestoType instanceof 
com.facebook.presto.common.type.DecimalType) {
+                Verify.verify(isShortDecimal(prestoType), "The type should be 
short decimal");
+                com.facebook.presto.common.type.DecimalType decimalType =
+                        (com.facebook.presto.common.type.DecimalType) 
prestoType;
+                BigDecimal decimal = ((Decimal) value).toBigDecimal();
+                prestoType.writeLong(
+                        output, encodeShortScaledValue(decimal, 
decimalType.getScale()));
+            } else if (prestoType.equals(TIMESTAMP)) {
+                prestoType.writeLong(
+                        output,
+                        ((Timestamp) value)
+                                .toLocalDateTime()
+                                .atZone(ZoneId.systemDefault())
+                                .toInstant()
+                                .toEpochMilli());
+            } else if (prestoType.equals(TIME)) {
+                prestoType.writeLong(output, (int) value * 1_000);
+            } else {
+                throw new PrestoException(
+                        GENERIC_INTERNAL_ERROR,
+                        format("Unhandled type for %s: %s", 
javaType.getSimpleName(), prestoType));
+            }
+        } else if (javaType == double.class) {
+            prestoType.writeDouble(output, ((Number) value).doubleValue());
+        } else if (prestoType instanceof 
com.facebook.presto.common.type.DecimalType) {
+            writeObject(output, prestoType, value);
+        } else if (javaType == Slice.class) {
+            writeSlice(output, prestoType, value);
+        } else {
+            throw new PrestoException(
+                    GENERIC_INTERNAL_ERROR,
+                    format("Unhandled type for %s: %s", 
javaType.getSimpleName(), prestoType));
+        }
+        return output.build();
+    }
+
+    private static void writeSlice(BlockBuilder output, Type type, Object 
value) {
+        if (type instanceof VarcharType
+                || type instanceof com.facebook.presto.common.type.CharType) {
+            type.writeSlice(output, wrappedBuffer(((BinaryString) 
value).toBytes()));
+        } else if (type instanceof VarbinaryType) {
+            type.writeSlice(output, wrappedBuffer((byte[]) value));
+        } else {
+            throw new PrestoException(
+                    GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + 
type.getTypeSignature());
+        }
+    }
+
+    private static void writeObject(BlockBuilder output, Type type, Object 
value) {
+        if (type instanceof com.facebook.presto.common.type.DecimalType) {
+            Verify.verify(isLongDecimal(type), "The type should be long 
decimal");
+            com.facebook.presto.common.type.DecimalType decimalType =
+                    (com.facebook.presto.common.type.DecimalType) type;
+            BigDecimal decimal = ((Decimal) value).toBigDecimal();
+            type.writeSlice(output, Decimals.encodeScaledValue(decimal, 
decimalType.getScale()));
+        } else {
+            throw new PrestoException(
+                    GENERIC_INTERNAL_ERROR,
+                    "Unhandled type for Object: " + type.getTypeSignature());
+        }
+    }
 }
diff --git 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
index fd226a4..befbdeb 100644
--- 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
+++ 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/PrestoTableHandleTest.java
@@ -36,7 +36,12 @@ public class PrestoTableHandleTest {
         byte[] serializedTable = TestPrestoUtils.getSerializedTable();
         PrestoTableHandle expected =
                 new PrestoTableHandle(
-                        "test", "user", serializedTable, TupleDomain.all(), 
Optional.empty());
+                        "test",
+                        "user",
+                        serializedTable,
+                        TupleDomain.all(),
+                        Optional.empty(),
+                        Optional.empty());
         testRoundTrip(expected);
     }
 
diff --git 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
index 4ae1ce3..e591296 100644
--- 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
+++ 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoComputePushdown.java
@@ -18,10 +18,23 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
 
+import com.facebook.presto.common.block.BlockEncodingManager;
 import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.metadata.FunctionAndTypeManager;
+import com.facebook.presto.metadata.HandleResolver;
 import com.facebook.presto.metadata.MetadataManager;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorId;
@@ -41,6 +54,7 @@ import com.facebook.presto.spi.relation.RowExpression;
 import com.facebook.presto.spi.relation.RowExpressionService;
 import com.facebook.presto.spi.relation.VariableReferenceExpression;
 import com.facebook.presto.sql.TestingRowExpressionTranslator;
+import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
 import com.facebook.presto.sql.planner.PlanVariableAllocator;
 import com.facebook.presto.sql.planner.TypeProvider;
@@ -52,8 +66,13 @@ import 
com.facebook.presto.sql.relational.RowExpressionOptimizer;
 import com.facebook.presto.testing.TestingConnectorSession;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,6 +82,7 @@ import java.util.UUID;
 import static com.facebook.presto.common.type.BigintType.BIGINT;
 import static 
com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
 import static 
com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder.expression;
+import static 
com.facebook.presto.transaction.InMemoryTransactionManager.createTestTransactionManager;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link TestPrestoComputePushdown}. */
@@ -103,6 +123,8 @@ public class TestPrestoComputePushdown {
                 }
             };
 
+    public byte[] table;
+
     private TableScanNode createTableScan() {
         PlanVariableAllocator variableAllocator = new PlanVariableAllocator();
         VariableReferenceExpression variableA = 
variableAllocator.newVariable("a", BIGINT);
@@ -115,7 +137,7 @@ public class TestPrestoComputePushdown {
                                         "id", new BigIntType(), 
createTestFunctionAndTypeManager()))
                         .build();
 
-        PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", 
"table".getBytes());
+        PrestoTableHandle tableHandle = new PrestoTableHandle("test", "test", 
this.table);
 
         return new TableScanNode(
                 Optional.empty(),
@@ -129,8 +151,9 @@ public class TestPrestoComputePushdown {
                                         new PrestoTableHandle(
                                                 "test",
                                                 "test",
-                                                "table".getBytes(),
+                                                this.table,
                                                 TupleDomain.all(),
+                                                Optional.empty(),
                                                 Optional.empty()),
                                         TupleDomain.all()))),
                 ImmutableList.copyOf(assignments.keySet()),
@@ -167,6 +190,31 @@ public class TestPrestoComputePushdown {
         return sessionConfig;
     }
 
+    @BeforeTest
+    public void setUp() throws Exception {
+        String warehouse =
+                
Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+        Path tablePath3 = new Path(warehouse, "default.db/t3");
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "pt", new VarCharType()),
+                                new DataField(1, "a", new IntType()),
+                                new DataField(2, "b", new BigIntType()),
+                                new DataField(3, "c", new BigIntType()),
+                                new DataField(4, "d", new IntType())));
+        new SchemaManager(LocalFileIO.create(), tablePath3)
+                .createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                new HashMap<>(),
+                                ""));
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath3);
+        this.table = InstantiationUtil.serializeObject(table);
+    }
+
     @Test
     public void testOptimizeFilter() {
         // Mock data.
@@ -177,7 +225,16 @@ public class TestPrestoComputePushdown {
         Map<String, Object> prestoSessionConfig = 
createPrestoSessionConfig(config);
 
         PrestoComputePushdown prestoComputePushdown =
-                new PrestoComputePushdown(FUNCTION_RESOLUTION, 
ROW_EXPRESSION_SERVICE);
+                new PrestoComputePushdown(
+                        FUNCTION_RESOLUTION,
+                        ROW_EXPRESSION_SERVICE,
+                        new FunctionAndTypeManager(
+                                createTestTransactionManager(),
+                                new BlockEncodingManager(),
+                                new FeaturesConfig(),
+                                new HandleResolver(),
+                                ImmutableSet.of()),
+                        new PrestoTransactionManager());
 
         PlanNode mockInputPlan = createFilterNode();
         ConnectorSession session =
@@ -227,7 +284,16 @@ public class TestPrestoComputePushdown {
         Map<String, Object> prestoSessionConfig = 
createPrestoSessionConfig(config);
 
         PrestoComputePushdown prestoComputePushdown =
-                new PrestoComputePushdown(FUNCTION_RESOLUTION, 
ROW_EXPRESSION_SERVICE);
+                new PrestoComputePushdown(
+                        FUNCTION_RESOLUTION,
+                        ROW_EXPRESSION_SERVICE,
+                        new FunctionAndTypeManager(
+                                createTestTransactionManager(),
+                                new BlockEncodingManager(),
+                                new FeaturesConfig(),
+                                new HandleResolver(),
+                                ImmutableSet.of()),
+                        new PrestoTransactionManager());
 
         PlanNode mockInputPlan = createFilterNode();
         ConnectorSession session =
diff --git 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 04d63bf..03678f2 100644
--- 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.presto;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
@@ -26,6 +27,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
@@ -42,6 +44,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.VarCharType;
 
+import com.facebook.presto.Session;
 import com.facebook.presto.common.type.TimeZoneKey;
 import com.facebook.presto.testing.MaterializedResult;
 import com.facebook.presto.testing.QueryRunner;
@@ -63,6 +66,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static com.facebook.airlift.testing.Closeables.closeAllSuppress;
 import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
@@ -217,6 +221,58 @@ public class TestPrestoITCase {
             commit.commit(0, writer.prepareCommit(true, 0));
         }
 
+        // partitioned table
+        {
+            Path tablePath = new Path(warehouse, "default.db/t5");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "i1", 
VarCharType.STRING_TYPE),
+                                    new DataField(1, "i2", new IntType()),
+                                    new DataField(2, "i3", new IntType())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    ImmutableList.of("i1", "i2"),
+                                    Collections.emptyList(),
+                                    ImmutableMap.of("bucket", "1"),
+                                    ""));
+            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+            InnerTableWrite writer = table.newWrite("user");
+            InnerTableCommit commit = table.newCommit("user");
+            writer.write(GenericRow.of(BinaryString.fromString("20241103"), 1, 
1));
+            writer.write(GenericRow.of(BinaryString.fromString("20241103"), 2, 
2));
+            writer.write(GenericRow.of(BinaryString.fromString("20241104"), 3, 
2));
+            commit.commit(0, writer.prepareCommit(true, 0));
+        }
+
+        // partitioned table
+        {
+            Path tablePath = new Path(warehouse, "default.db/t6");
+            RowType rowType =
+                    new RowType(
+                            Arrays.asList(
+                                    new DataField(0, "i1", new IntType()),
+                                    new DataField(1, "i2", 
VarCharType.STRING_TYPE),
+                                    new DataField(2, "i3", new IntType())));
+            new SchemaManager(LocalFileIO.create(), tablePath)
+                    .createTable(
+                            new Schema(
+                                    rowType.getFields(),
+                                    ImmutableList.of("i2"),
+                                    ImmutableList.of("i2", "i1"),
+                                    ImmutableMap.of("bucket", "1"),
+                                    ""));
+            FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+            InnerTableWrite writer = table.newWrite("user");
+            InnerTableCommit commit = table.newCommit("user");
+            writer.write(GenericRow.of(1, BinaryString.fromString("20241103"), 
1));
+            writer.write(GenericRow.of(2, BinaryString.fromString("20241103"), 
2));
+            writer.write(GenericRow.of(3, BinaryString.fromString("20241104"), 
2));
+            commit.commit(0, writer.prepareCommit(true, 0));
+        }
+
         DistributedQueryRunner queryRunner = null;
         try {
             queryRunner =
@@ -503,8 +559,81 @@ public class TestPrestoITCase {
                 .isEqualTo("[[10000000000]]");
     }
 
+    @Test
+    public void testPartitionPushDown1() throws Exception {
+        assertThat(sql("EXPLAIN SELECT * FROM paimon.default.t5 where 
upper(i1) = '20241103'"))
+                .contains(
+                        "LAYOUT: 
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', 
tableName='t5', filter=TupleDomain{ALL}, partitions=Optional[[{i1=20241103, 
i2=2}, {i1=20241103, i2=1}]]}, constraintSummary=TupleDomain{ALL}");
+        assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) = 
'20241103'"))
+                .isEqualTo("[[20241103, 1, 1], [20241103, 2, 2]]");
+    }
+
+    @Test
+    public void testPartitionPushDown2() throws Exception {
+        assertThat(
+                        sql(
+                                ("EXPLAIN SELECT * FROM paimon.default.t5 
where upper(i1) = '20241103' and i2 =1")))
+                .contains(
+                        "LAYOUT: 
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', 
tableName='t5', filter=TupleDomain{...}, partitions=Optional[[{i1=20241103, 
i2=1}]]}, constraintSummary=TupleDomain{...}");
+        assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) = 
'20241103' and i2 =1"))
+                .isEqualTo("[[20241103, 1, 1]]");
+    }
+
+    @Test
+    public void testPartitionPushDown3() throws Exception {
+        assertThat(
+                        sql(
+                                ("EXPLAIN SELECT * FROM paimon.default.t5 
where upper(i1) = '20241105' and i2 =1")))
+                .contains(
+                        "LAYOUT: 
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', "
+                                + "tableName='t5', filter=TupleDomain{...}, 
partitions=Optional[[]]}, constraintSummary=TupleDomain{...}}");
+        assertThat(sql("SELECT * FROM paimon.default.t5 where upper(i1) = 
'20241105' and i2 =1"))
+                .isEqualTo("[]");
+    }
+
+    @Test
+    public void testPartitionPushDown4() throws Exception {
+        assertThat(
+                        sql(
+                                "EXPLAIN SELECT * FROM paimon.default.t5 where 
upper(i1) = "
+                                        + "'20241103'and i2 =1",
+                                "partition_prune_enabled",
+                                "false"))
+                .contains(
+                        "LAYOUT: 
PrestoTableLayoutHandle{tableHandle=PrestoTableHandle{schemaName='default', "
+                                + "tableName='t5', filter=TupleDomain{...}, 
partitions=Optional.empty}, constraintSummary=TupleDomain{...}");
+        assertThat(
+                        sql(
+                                "SELECT * FROM paimon.default.t5 where 
upper(i1) = "
+                                        + "'20241103'and i2 =1",
+                                "partition_prune_enabled",
+                                "false"))
+                .isEqualTo("[[20241103, 1, 1]]");
+    }
+
+    @Test
+    public void testPartitionPushDown5() throws Exception {
+        assertThat(sql("SELECT * FROM paimon.default.t6 where upper(i2) = 
'20241103'"))
+                .isEqualTo("[[1, 20241103, 1], [2, 20241103, 2]]");
+    }
+
+    private String sql(String sql, String key, String value) throws Exception {
+        Session session =
+                testSessionBuilder().setCatalogSessionProperty("paimon", key, 
value).build();
+        MaterializedResult result = queryRunner.execute(session, sql);
+        return result.getMaterializedRows().stream()
+                .map(Object::toString)
+                .sorted()
+                .collect(Collectors.toList())
+                .toString();
+    }
+
     private String sql(String sql) throws Exception {
         MaterializedResult result = queryRunner.execute(sql);
-        return result.getMaterializedRows().toString();
+        return result.getMaterializedRows().stream()
+                .map(Object::toString)
+                .sorted()
+                .collect(Collectors.toList())
+                .toString();
     }
 }

Reply via email to