This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 306f2bb03e6 [HUDI-6077] Add more partition push down filters (#8452)
306f2bb03e6 is described below

commit 306f2bb03e6a42e2bca5f4e0db0379a87ad58a66
Author: Rex(Hui) An <bonean...@gmail.com>
AuthorDate: Mon Jul 24 18:43:10 2023 +0800

    [HUDI-6077] Add more partition push down filters (#8452)
---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   7 +-
 .../functional/TestHoodieBackedMetadata.java       |   2 +-
 .../io/storage/row/TestHoodieRowCreateHandle.java  |  11 +-
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  |  22 ++
 .../java/org/apache/hudi/expression/ArrayData.java |  21 +-
 .../apache/hudi/expression/BinaryExpression.java   |  37 +-
 .../org/apache/hudi/expression/BindVisitor.java    | 187 ++++++++++
 .../org/apache/hudi/expression/BoundReference.java |  32 +-
 .../org/apache/hudi/expression/Comparators.java    |  52 +++
 .../org/apache/hudi}/expression/Expression.java    |  32 +-
 .../apache/hudi}/expression/ExpressionVisitor.java |  16 +-
 .../apache/hudi}/expression/LeafExpression.java    |  11 +-
 .../java/org/apache/hudi/expression/Literal.java   | 121 ++++++
 .../org/apache/hudi/expression/NameReference.java  |  20 +-
 .../apache/hudi/expression/PartialBindVisitor.java | 153 ++++++++
 .../java/org/apache/hudi/expression/Predicate.java |  29 +-
 .../org/apache/hudi/expression/Predicates.java     | 413 +++++++++++++++++++++
 .../org/apache/hudi/expression/StructLike.java     |  13 +-
 .../java/org/apache/hudi/internal/schema/Type.java |  80 +++-
 .../org/apache/hudi/internal/schema/Types.java     |  37 +-
 .../hudi/internal/schema/utils/Conversions.java    |  49 +++
 .../hudi/metadata/AbstractHoodieTableMetadata.java |  96 +++++
 .../apache/hudi/metadata/BaseTableMetadata.java    |  24 +-
 .../metadata/FileSystemBackedTableMetadata.java    | 111 +++++-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  23 ++
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  10 +
 .../hudi/expression/TestPartialBindVisitor.java    |  83 +++++
 .../schema/utils/TestAvroSchemaEvolutionUtils.java |   2 +-
 .../TestFileSystemBackedTableMetadata.java         |  12 +-
 .../scala/org/apache/hudi/SparkFilterHelper.scala  | 122 ++++++
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  92 +++--
 .../org/apache/hudi/TestHoodieFileIndex.scala      |  72 ++++
 .../org/apache/hudi/TestSparkFilterHelper.scala    | 185 +++++++++
 .../sql/hudi/TestHoodieInternalRowUtils.scala      |   2 +-
 .../sql/hudi/TestLazyPartitionPathFetching.scala   |  30 ++
 .../TestPartitionPushDownWhenListingPaths.scala    | 109 ++++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  15 +-
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   7 +-
 .../apache/hudi/hive/util/FilterGenVisitor.java    |  84 +++--
 .../hudi/hive/util/PartitionFilterGenerator.java   |  64 ++--
 .../hudi/utilities/HoodieDataTableValidator.java   |   2 +-
 .../apache/hudi/utilities/HoodieRepairTool.java    |   2 +-
 42 files changed, 2260 insertions(+), 232 deletions(-)
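
For orientation before the per-file hunks: the new org.apache.hudi.expression package in hudi-common lets partition predicates be built, bound, and evaluated engine-agnostically, with the Spark and Hive layers translating their native filters into it. A minimal sketch of the evaluation side (the driver class is hypothetical; class shapes are taken from the hunks below):

    import org.apache.hudi.expression.ArrayData;
    import org.apache.hudi.expression.BoundReference;
    import org.apache.hudi.expression.Expression;
    import org.apache.hudi.expression.Literal;
    import org.apache.hudi.expression.Predicates;
    import org.apache.hudi.internal.schema.Types;

    import java.util.Arrays;

    public class PartitionFilterSketch {
      public static void main(String[] args) {
        // (year > 2020) AND (region = 'us'), evaluated against partition row (2023, "us")
        Expression year = new BoundReference(0, Types.IntType.get());
        Expression region = new BoundReference(1, Types.StringType.get());
        Expression filter = Predicates.and(
            Predicates.gt(year, Literal.from(2020)),
            Predicates.eq(region, Literal.from("us")));
        System.out.println(filter.eval(new ArrayData(Arrays.asList(2023, "us")))); // true
      }
    }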

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cc72cf23a6b..782a49ac189 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
 
@@ -203,4 +203,9 @@ trait SparkAdapter extends Serializable {
    * Converts instance of [[StorageLevel]] to a corresponding string
    */
   def convertStorageLevelToString(level: StorageLevel): String
+
+  /**
+   * Tries to translate a Catalyst Expression into a data source Filter.
+   */
+  def translateFilter(predicate: Expression, supportNestedPredicatePushdown: Boolean = false): Option[Filter]
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 56fabb362b6..5288947bb19 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -3204,7 +3204,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     // Partitions should match
-    FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext,
+    FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext, metaClient.getTableConfig(),
         new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning());
     List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths();
     List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
index 6dc90af78ae..42e84ebe7fd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -190,16 +189,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
       HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
       new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
       fail("Should have thrown exception");
-    } catch (HoodieInsertException ioe) {
-      // expected without metadata table
-      if (enableMetadataTable) {
-        fail("Should have thrown TableNotFoundException");
-      }
     } catch (TableNotFoundException e) {
-      // expected with metadata table
-      if (!enableMetadataTable) {
-        fail("Should have thrown HoodieInsertException");
-      }
+      // expected failure
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index c5adafa38e2..3a24ef4dd2f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -36,7 +36,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.expression.Expression;
 import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 
@@ -270,6 +272,26 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
         ));
   }
 
+  protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths,
+                                                   Types.RecordType partitionFields,
+                                                   Expression partitionColumnPredicates) {
+    List<String> matchedPartitionPaths;
+    try {
+      matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixUsingFilterExpression(relativePartitionPaths,
+          partitionFields, partitionColumnPredicates);
+    } catch (IOException e) {
+      throw new HoodieIOException("Error fetching partition paths", e);
+    }
+
+    // Convert partition's path into partition descriptor
+    return matchedPartitionPaths.stream()
+        .map(partitionPath -> {
+          Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
+          return new PartitionPath(partitionPath, partitionColumnValues);
+        })
+        .collect(Collectors.toList());
+  }
+
   protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) {
     List<String> matchedPartitionPaths;
     try {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
similarity index 69%
copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
index 3be19330a19..3f7830e996c 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java
@@ -16,22 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
-public class AttributeReferenceExpression extends LeafExpression {
+import java.util.List;
 
-  private final String name;
+public class ArrayData implements StructLike {
 
-  public AttributeReferenceExpression(String name) {
-    this.name = name;
+  private final List<Object> data;
+
+  public ArrayData(List<Object> data) {
+    this.data = data;
   }
 
-  public String getName() {
-    return name;
+  @Override
+  public int numFields() {
+    return data.size();
   }
 
   @Override
-  public <T> T accept(ExpressionVisitor<T> exprVisitor) {
-    return exprVisitor.visitAttribute(this);
+  public <T> T get(int pos, Class<T> classTag) {
+    return classTag.cast(data.get(pos));
   }
 }
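
ArrayData is the list-backed StructLike that carries parsed partition values into expression evaluation. A small usage sketch (variable names hypothetical):

    StructLike row = new ArrayData(Arrays.asList(2023, "us"));
    Integer year = row.get(0, Integer.class);  // classTag.cast(...) -> 2023
    String region = row.get(1, String.class);  // -> "us"
    int width = row.numFields();               // -> 2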
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
similarity index 56%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
index d2bc83e9452..a0afdc18928 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java
@@ -16,46 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
 import java.util.Arrays;
+import java.util.List;
 
 /**
 * The expression that accepts two child expressions.
  */
-public class BinaryOperator extends Expression {
+public abstract class BinaryExpression implements Expression {
 
   private final Operator operator;
   private final Expression left;
   private final Expression right;
 
-  private BinaryOperator(Expression left, Operator operator, Expression right) {
-    super(Arrays.asList(left, right));
+  BinaryExpression(Expression left, Operator operator, Expression right) {
     this.left = left;
     this.operator = operator;
     this.right = right;
   }
 
-  public static BinaryOperator and(Expression left, Expression right) {
-    return new BinaryOperator(left, Operator.AND, right);
-  }
-
-  public static BinaryOperator or(Expression left, Expression right) {
-    return new BinaryOperator(left, Operator.OR, right);
-  }
-
-  public static BinaryOperator eq(Expression left, Expression right) {
-    return new BinaryOperator(left, Operator.EQ, right);
-  }
-
-  public static BinaryOperator gteq(Expression left, Expression right) {
-    return new BinaryOperator(left, Operator.GT_EQ, right);
-  }
-
-  public static BinaryOperator lteq(Expression left, Expression right) {
-    return new BinaryOperator(left, Operator.LT_EQ, right);
-  }
-
   public Operator getOperator() {
     return operator;
   }
@@ -69,7 +49,12 @@ public class BinaryOperator extends Expression {
   }
 
   @Override
-  public <T> T accept(ExpressionVisitor<T> exprVisitor) {
-    return exprVisitor.visitBinaryOperator(this);
+  public List<Expression> getChildren() {
+    return Arrays.asList(left, right);
+  }
+
+  @Override
+  public String toString() {
+    return left.toString() + " " + operator.symbol + " " + right.toString();
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java
new file mode 100644
index 00000000000..2b7e589af21
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BindVisitor implements ExpressionVisitor<Expression>  {
+
+  protected final Types.RecordType recordType;
+  protected final boolean caseSensitive;
+
+  public BindVisitor(Types.RecordType recordType, boolean caseSensitive) {
+    this.recordType = recordType;
+    this.caseSensitive = caseSensitive;
+  }
+
+  @Override
+  public Expression alwaysTrue() {
+    return Predicates.TrueExpression.get();
+  }
+
+  @Override
+  public Expression alwaysFalse() {
+    return Predicates.FalseExpression.get();
+  }
+
+  @Override
+  public Expression visitAnd(Predicates.And and) {
+    if (and.getLeft() instanceof Predicates.FalseExpression
+        || and.getRight() instanceof Predicates.FalseExpression) {
+      return alwaysFalse();
+    }
+
+    Expression left = and.getLeft().accept(this);
+    Expression right = and.getRight().accept(this);
+    if (left instanceof Predicates.FalseExpression
+        || right instanceof Predicates.FalseExpression) {
+      return alwaysFalse();
+    }
+
+    if (left instanceof Predicates.TrueExpression
+        && right instanceof Predicates.TrueExpression) {
+      return alwaysTrue();
+    }
+
+    if (left instanceof Predicates.TrueExpression) {
+      return right;
+    }
+
+    if (right instanceof Predicates.TrueExpression) {
+      return left;
+    }
+
+    return Predicates.and(left, right);
+  }
+
+  @Override
+  public Expression visitOr(Predicates.Or or) {
+    if (or.getLeft() instanceof Predicates.TrueExpression
+        || or.getRight() instanceof Predicates.TrueExpression) {
+      return alwaysTrue();
+    }
+
+    Expression left = or.getLeft().accept(this);
+    Expression right = or.getRight().accept(this);
+    if (left instanceof Predicates.TrueExpression
+        || right instanceof Predicates.TrueExpression) {
+      return alwaysTrue();
+    }
+
+    if (left instanceof Predicates.FalseExpression
+        && right instanceof Predicates.FalseExpression) {
+      return alwaysFalse();
+    }
+
+    if (left instanceof Predicates.FalseExpression) {
+      return right;
+    }
+
+    if (right instanceof Predicates.FalseExpression) {
+      return left;
+    }
+
+    return Predicates.or(left, right);
+  }
+
+  @Override
+  public Expression visitLiteral(Literal literal) {
+    return literal;
+  }
+
+  @Override
+  public Expression visitNameReference(NameReference attribute) {
+    Types.Field field = caseSensitive
+        ? recordType.fieldByName(attribute.getName())
+        : recordType.fieldByNameCaseInsensitive(attribute.getName());
+
+    if (field == null) {
+      throw new IllegalArgumentException("The attribute " + attribute
+          + " cannot be bound from schema " + recordType);
+    }
+
+    return new BoundReference(field.fieldId(), field.type());
+  }
+
+  @Override
+  public Expression visitBoundReference(BoundReference boundReference) {
+    return boundReference;
+  }
+
+  @Override
+  public Expression visitPredicate(Predicate predicate) {
+    if (predicate instanceof Predicates.Not) {
+      Expression expr = ((Predicates.Not) predicate).child.accept(this);
+      if (expr instanceof Predicates.TrueExpression) {
+        return alwaysFalse();
+      }
+      if (expr instanceof Predicates.FalseExpression) {
+        return alwaysTrue();
+      }
+
+      return Predicates.not(expr);
+    }
+
+    if (predicate instanceof Predicates.BinaryComparison) {
+      Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison) predicate;
+      Expression left = binaryExp.getLeft().accept(this);
+      Expression right = binaryExp.getRight().accept(this);
+      return new Predicates.BinaryComparison(left, binaryExp.getOperator(), right);
+    }
+
+    if (predicate instanceof Predicates.In) {
+      Predicates.In in = ((Predicates.In) predicate);
+      Expression valueExpression = in.value.accept(this);
+      List<Expression> validValues = in.validValues.stream()
+          .map(validValue -> validValue.accept(this))
+          .collect(Collectors.toList());
+
+      return Predicates.in(valueExpression, validValues);
+    }
+
+    if (predicate instanceof Predicates.IsNull) {
+      Predicates.IsNull isNull = (Predicates.IsNull) predicate;
+      return Predicates.isNull(isNull.child.accept(this));
+    }
+
+    if (predicate instanceof Predicates.IsNotNull) {
+      Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
+      return Predicates.isNotNull(isNotNull.child.accept(this));
+    }
+
+    if (predicate instanceof Predicates.StringStartsWith) {
+      Predicates.StringStartsWith startsWith = (Predicates.StringStartsWith) predicate;
+      Expression left = startsWith.getLeft().accept(this);
+      Expression right = startsWith.getRight().accept(this);
+      return Predicates.startsWith(left, right);
+    }
+
+    if (predicate instanceof Predicates.StringContains) {
+      Predicates.StringContains contains = (Predicates.StringContains) predicate;
+      Expression left = contains.getLeft().accept(this);
+      Expression right = contains.getRight().accept(this);
+      return Predicates.contains(left, right);
+    }
+
+    throw new IllegalArgumentException("The expression " + predicate + " cannot be visited as a predicate");
+  }
+}
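
To make the binding step concrete, a sketch of resolving a NameReference to a BoundReference and evaluating the result. The Types.RecordType/Types.Field factory methods used here are assumptions (only fieldByName, fieldId() and type() appear in this patch), so treat the construction as illustrative:

    // Assumed factories: RecordType.get(...) / Field.get(id, name, type).
    Types.RecordType schema = Types.RecordType.get(Arrays.asList(
        Types.Field.get(0, "year", Types.IntType.get()),
        Types.Field.get(1, "region", Types.StringType.get())));

    Expression unbound = Predicates.eq(new NameReference("year"), Literal.from(2023));
    Expression bound = unbound.accept(new BindVisitor(schema, false));
    // "year" is now BoundReference(ordinal=0, IntType), so the predicate can run
    // against a partition row:
    bound.eval(new ArrayData(Arrays.asList(2023, "us")));  // -> true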
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
similarity index 60%
copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
index f84756a6a85..f3a7e0fac51 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java
@@ -16,28 +16,38 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
-public class Literal extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
 
-  private final String value;
-  private final String type;
+public class BoundReference extends LeafExpression {
 
-  public Literal(String value, String type) {
-    this.value = value;
+  private final int ordinal;
+  private final Type type;
+
+  public BoundReference(int ordinal, Type type) {
+    this.ordinal = ordinal;
     this.type = type;
   }
 
-  public String getValue() {
-    return value;
+  @Override
+  public Type getDataType() {
+    return type;
   }
 
-  public String getType() {
-    return type;
+  @Override
+  public Object eval(StructLike data) {
+    return data.get(ordinal, this.type.typeId().getClassTag());
   }
 
   @Override
   public <T> T accept(ExpressionVisitor<T> exprVisitor) {
-    return exprVisitor.visitLiteral(this);
+    return exprVisitor.visitBoundReference(this);
+  }
+
+  @Override
+  public String toString() {
+    return "boundReference[ordinal: " + ordinal + ", type: "
+        + type.typeId().getName() + "]";
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java
new file mode 100644
index 00000000000..14e208197f1
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Comparators {
+
+  private static final Map<Type, Comparator<?>> COMPARATORS = Collections.unmodifiableMap(
+      new HashMap<Type, Comparator<?>>() {
+        {
+          put(Types.BooleanType.get(), Comparator.naturalOrder());
+          put(Types.IntType.get(), Comparator.naturalOrder());
+          put(Types.LongType.get(), Comparator.naturalOrder());
+          put(Types.FloatType.get(), Comparator.naturalOrder());
+          put(Types.DoubleType.get(), Comparator.naturalOrder());
+          put(Types.DateType.get(), Comparator.naturalOrder());
+          put(Types.TimeType.get(), Comparator.naturalOrder());
+          put(Types.TimestampType.get(), Comparator.naturalOrder());
+          put(Types.StringType.get(), Comparator.naturalOrder());
+          put(Types.UUIDType.get(), Comparator.naturalOrder());
+        }
+      });
+
+  public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+    return (Comparator<T>) Option.ofNullable(COMPARATORS.get(type))
+        .orElseThrow(() -> new UnsupportedOperationException("The desired type " + type + " doesn't support comparator yet"));
+  }
+}
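
Note Comparators only covers the primitive types registered above; a quick sketch of the lookup:

    Comparator<Object> cmp = Comparators.forType(Types.StringType.get());
    int c = cmp.compare("2023-07-01", "2023-07-24");  // c < 0: natural String ordering
    // Types not in the map (e.g. Types.DecimalType.get(10, 2)) throw
    // UnsupportedOperationException.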
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
similarity index 64%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
index 8a64237ae1b..5052a35b852 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java
@@ -16,20 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
+import org.apache.hudi.internal.schema.Type;
+
+import java.io.Serializable;
 import java.util.List;
 
-public abstract class Expression {
+public interface Expression extends Serializable {
 
-  public enum Operator {
+  enum Operator {
+    TRUE("TRUE", "TRUE"),
+    FALSE("FALSE", "FALSE"),
     AND("AND", "&&"),
     OR("OR", "||"),
     GT(">", ">"),
     LT("<", "<"),
     EQ("=", "="),
     GT_EQ(">=", ">="),
-    LT_EQ("<=", "<=");
+    LT_EQ("<=", "<="),
+    STARTS_WITH(null, null),
+    CONTAINS(null, null),
+    IS_NULL(null, null),
+    IS_NOT_NULL(null, null),
+    IN("IN", "IN"),
+    NOT("NOT", "NOT");
 
     public final String sqlOperator;
     public final String symbol;
@@ -40,14 +51,19 @@ public abstract class Expression {
     }
   }
 
-  private final List<Expression> children;
+  List<Expression> getChildren();
+
+  Type getDataType();
 
-  public Expression(List<Expression> children) {
-    this.children = children;
+  default Object eval(StructLike data) {
+    throw new UnsupportedOperationException("Cannot evaluate expression " + this);
   }
 
   /**
    * Traverses the expression with the provided {@link ExpressionVisitor}
    */
-  public abstract <T> T accept(ExpressionVisitor<T> exprVisitor);
+  <T> T accept(ExpressionVisitor<T> exprVisitor);
+
+  @Override
+  String toString();
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
similarity index 76%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
index 9f6ea7b1597..10a0362c786 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java
@@ -16,16 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
 /**
 * Visitor used to traverse the expression.
  */
 public interface ExpressionVisitor<T> {
 
-  T visitBinaryOperator(BinaryOperator binaryOperator);
+  T alwaysTrue();
+
+  T alwaysFalse();
 
   T visitLiteral(Literal literal);
 
-  T visitAttribute(AttributeReferenceExpression attribute);
+  T visitNameReference(NameReference attribute);
+
+  T visitBoundReference(BoundReference boundReference);
+
+  T visitAnd(Predicates.And and);
+
+  T visitOr(Predicates.Or or);
+
+  T visitPredicate(Predicate predicate);
 }
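
The visitor surface above is enough for small analyses over an expression tree; for instance, a hypothetical visitor that collects the unbound column names referenced by a filter:

    import org.apache.hudi.expression.*;

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ColumnCollector implements ExpressionVisitor<Set<String>> {
      @Override public Set<String> alwaysTrue() { return Collections.emptySet(); }
      @Override public Set<String> alwaysFalse() { return Collections.emptySet(); }
      @Override public Set<String> visitLiteral(Literal literal) { return Collections.emptySet(); }
      @Override public Set<String> visitBoundReference(BoundReference ref) { return Collections.emptySet(); }
      @Override public Set<String> visitNameReference(NameReference attr) {
        return Collections.singleton(attr.getName());
      }
      @Override public Set<String> visitAnd(Predicates.And and) { return fromChildren(and); }
      @Override public Set<String> visitOr(Predicates.Or or) { return fromChildren(or); }
      @Override public Set<String> visitPredicate(Predicate predicate) { return fromChildren(predicate); }

      // TRUE/FALSE leaves dispatch to alwaysTrue/alwaysFalse, so every
      // expression reaching here exposes a non-null child list.
      private Set<String> fromChildren(Expression expr) {
        Set<String> names = new HashSet<>();
        for (Expression child : expr.getChildren()) {
          names.addAll(child.accept(this));
        }
        return names;
      }
    }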
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
similarity index 81%
copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
copy to hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
index 318a79116ca..c85f2095a4f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java
@@ -16,14 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
+
+import java.util.List;
 
 /**
 * Expression without any child expressions.
  */
-public abstract class LeafExpression extends Expression {
+public abstract class LeafExpression implements Expression {
 
-  public LeafExpression() {
-    super(null);
+  @Override
+  public List<Expression> getChildren() {
+    return null;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java
new file mode 100644
index 00000000000..01fbdb1a1c8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import javax.xml.bind.DatatypeConverter;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+public class Literal<T> extends LeafExpression {
+
+  public static <V> Literal from(V value) {
+    if (value instanceof Integer || value instanceof Short) {
+      return new Literal<>(value, Types.IntType.get());
+    }
+
+    if (value instanceof Byte) {
+      return new Literal<>(((Byte)value).intValue(), Types.IntType.get());
+    }
+
+    if (value instanceof Long) {
+      return new Literal<>(value, Types.LongType.get());
+    }
+
+    if (value instanceof Boolean) {
+      return new Literal<>(value, Types.BooleanType.get());
+    }
+
+    if (value instanceof Double) {
+      return new Literal<>(value, Types.DoubleType.get());
+    }
+
+    if (value instanceof Float) {
+      return new Literal<>(value, Types.FloatType.get());
+    }
+
+    if (value instanceof BigDecimal) {
+      BigDecimal decimal = (BigDecimal) value;
+      return new Literal<>(value, Types.DecimalType.get(decimal.precision(), decimal.scale()));
+    }
+
+    if (value instanceof CharSequence) {
+      return new Literal<>(value, Types.StringType.get());
+    }
+
+    if (value instanceof byte[]) {
+      byte[] bytes = (byte[]) value;
+      return new Literal<>(ByteBuffer.wrap(bytes), Types.FixedType.getFixed(bytes.length));
+    }
+
+    if (value instanceof ByteBuffer) {
+      return new Literal<>(value, Types.BinaryType.get());
+    }
+
+    if (value instanceof UUID) {
+      return new Literal<>(value, Types.UUIDType.get());
+    }
+
+    throw new IllegalArgumentException("Cannot convert value from class "
+        + value.getClass().getName() + " to Literal");
+  }
+
+  private final T value;
+  private final Type type;
+
+  public Literal(T value, Type type) {
+    this.value = value;
+    this.type = type;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  @Override
+  public Type getDataType() {
+    return type;
+  }
+
+  @Override
+  public Object eval(StructLike data) {
+    return value;
+  }
+
+  @Override
+  public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+    return exprVisitor.visitLiteral(this);
+  }
+
+  @Override
+  public String toString() {
+    if (value == null) {
+      return "null";
+    }
+
+    if (value instanceof ByteBuffer) {
+      return DatatypeConverter.printHexBinary(((ByteBuffer)value).array());
+    }
+
+    return value.toString();
+  }
+}
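
Literal.from infers the internal-schema type from the Java runtime class, per the chain above; a few illustrative cases:

    Literal year = Literal.from(2023);             // Integer -> Types.IntType
    Literal name = Literal.from("hudi");           // CharSequence -> Types.StringType
    Literal bytes = Literal.from(new byte[] {1});  // wrapped to ByteBuffer, FixedType(length=1)
    Object v = year.eval(null);                    // literals ignore the row: 2023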
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
similarity index 69%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
index 3be19330a19..7d41a61791f 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
-public class AttributeReferenceExpression extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
+
+public class NameReference extends LeafExpression {
 
   private final String name;
 
-  public AttributeReferenceExpression(String name) {
+  public NameReference(String name) {
     this.name = name;
   }
 
@@ -30,8 +32,18 @@ public class AttributeReferenceExpression extends LeafExpression {
     return name;
   }
 
+  @Override
+  public Type getDataType() {
+    throw new UnsupportedOperationException("NameReference is not bound yet");
+  }
+
   @Override
   public <T> T accept(ExpressionVisitor<T> exprVisitor) {
-    return exprVisitor.visitAttribute(this);
+    return exprVisitor.visitNameReference(this);
+  }
+
+  @Override
+  public String toString() {
+    return "NameReference(name=" + name + ")";
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
new file mode 100644
index 00000000000..cece36291df
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Tries to bind all references, converting unresolved references to AlwaysTrue.
+ *
+ * e.g. for `year=2023 AND day=12`: if both year and day are provided by `recordType`,
+ * the expression is unchanged; if day is not provided, the expression is
+ * transformed to `year=2023 AND True`, which is then optimized to `year=2023`.
+ */
+public class PartialBindVisitor extends BindVisitor {
+
+  public PartialBindVisitor(Types.RecordType recordType, boolean caseSensitive) {
+    super(recordType, caseSensitive);
+  }
+
+  /**
+   * If the attribute cannot be found in the schema, return null directly;
+   * visitPredicate will handle it.
+   */
+  @Override
+  public Expression visitNameReference(NameReference attribute) {
+    Types.Field field = caseSensitive
+        ? recordType.fieldByName(attribute.getName())
+        : recordType.fieldByNameCaseInsensitive(attribute.getName());
+
+    if (field == null) {
+      return null;
+    }
+
+    return new BoundReference(field.fieldId(), field.type());
+  }
+
+  /**
+   * If an expression is null after the accept call, meaning it cannot be bound
+   * to the schema, we directly return {@link Predicates.TrueExpression}.
+   */
+  @Override
+  public Expression visitPredicate(Predicate predicate) {
+
+    if (predicate instanceof Predicates.BinaryComparison) {
+      Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison) predicate;
+      Expression left = binaryExp.getLeft().accept(this);
+      if (left == null) {
+        return alwaysTrue();
+      } else {
+        Expression right = binaryExp.getRight().accept(this);
+        if (right == null) {
+          return alwaysTrue();
+        }
+
+        return new Predicates.BinaryComparison(left, binaryExp.getOperator(), right);
+      }
+    }
+
+    if (predicate instanceof Predicates.Not) {
+      Expression expr = ((Predicates.Not) predicate).child.accept(this);
+      if (expr instanceof Predicates.TrueExpression) {
+        return alwaysFalse();
+      }
+      if (expr instanceof Predicates.FalseExpression) {
+        return alwaysTrue();
+      }
+
+      return Predicates.not(expr);
+    }
+
+    if (predicate instanceof Predicates.In) {
+      Predicates.In in = ((Predicates.In) predicate);
+      Expression valueExpression = in.value.accept(this);
+      if (valueExpression == null) {
+        return alwaysTrue();
+      }
+      List<Expression> validValues = in.validValues.stream()
+          .map(validValue -> validValue.accept(this))
+          .collect(Collectors.toList());
+      if (validValues.stream().anyMatch(Objects::isNull)) {
+        return alwaysTrue();
+      }
+      return Predicates.in(valueExpression, validValues);
+    }
+
+    if (predicate instanceof Predicates.IsNull) {
+      Predicates.IsNull isNull = (Predicates.IsNull) predicate;
+      return Option.ofNullable(isNull.child.accept(this))
+          .map(expr -> (Expression)Predicates.isNull(expr))
+          .orElse(alwaysTrue());
+    }
+
+    if (predicate instanceof Predicates.IsNotNull) {
+      Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate;
+      return Option.ofNullable(isNotNull.child.accept(this))
+          .map(expr -> (Expression)Predicates.isNotNull(expr))
+          .orElse(alwaysTrue());
+    }
+
+    if (predicate instanceof Predicates.StringStartsWith) {
+      Predicates.StringStartsWith startsWith = (Predicates.StringStartsWith) predicate;
+      Expression left = startsWith.getLeft().accept(this);
+      if (left == null) {
+        return alwaysTrue();
+      } else {
+        Expression right = startsWith.getRight().accept(this);
+        if (right == null) {
+          return alwaysTrue();
+        }
+
+        return Predicates.startsWith(left, right);
+      }
+    }
+
+    if (predicate instanceof Predicates.StringContains) {
+      Predicates.StringContains contains = (Predicates.StringContains) predicate;
+      Expression left = contains.getLeft().accept(this);
+      if (left == null) {
+        return alwaysTrue();
+      } else {
+        Expression right = contains.getRight().accept(this);
+        if (right == null) {
+          return alwaysTrue();
+        }
+
+        return Predicates.contains(left, right);
+      }
+    }
+
+    throw new IllegalArgumentException("The expression " + predicate + " cannot be visited as a predicate");
+  }
+}
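
The class javadoc's year/day example, spelled out (same assumed RecordType/Field factories as in the BindVisitor sketch earlier):

    // Schema exposes only "year", so the "day" conjunct cannot bind.
    Types.RecordType schema = Types.RecordType.get(Arrays.asList(
        Types.Field.get(0, "year", Types.IntType.get())));

    Expression pruned = Predicates.and(
            Predicates.eq(new NameReference("year"), Literal.from(2023)),
            Predicates.eq(new NameReference("day"), Literal.from(12)))
        .accept(new PartialBindVisitor(schema, false));
    // visitNameReference("day") returns null -> that side becomes TRUE, and
    // visitAnd simplifies the result to (year = 2023).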
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
similarity index 65%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
index f84756a6a85..220267870dc 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java
@@ -16,28 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
-public class Literal extends LeafExpression {
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
 
-  private final String value;
-  private final String type;
-
-  public Literal(String value, String type) {
-    this.value = value;
-    this.type = type;
-  }
+/**
+ * An expression that returns a Boolean value.
+ */
+public interface Predicate extends Expression {
 
-  public String getValue() {
-    return value;
+  @Override
+  default Type getDataType() {
+    return Types.BooleanType.get();
   }
 
-  public String getType() {
-    return type;
-  }
+  Operator getOperator();
 
   @Override
-  public <T> T accept(ExpressionVisitor<T> exprVisitor) {
-    return exprVisitor.visitLiteral(this);
+  default <T> T accept(ExpressionVisitor<T> exprVisitor) {
+    return exprVisitor.visitPredicate(this);
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java
new file mode 100644
index 00000000000..11c4f39507f
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Type;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class Predicates {
+
+  public static TrueExpression alwaysTrue() {
+    return TrueExpression.get();
+  }
+
+  public static FalseExpression alwaysFalse() {
+    return FalseExpression.get();
+  }
+
+  public static And and(Expression left, Expression right) {
+    return new And(left, right);
+  }
+
+  public static Or or(Expression left, Expression right) {
+    return new Or(left, right);
+  }
+
+  public static BinaryComparison gt(Expression left, Expression right) {
+    return new BinaryComparison(left, Expression.Operator.GT, right);
+  }
+
+  public static BinaryComparison lt(Expression left, Expression right) {
+    return new BinaryComparison(left, Expression.Operator.LT, right);
+  }
+
+  public static BinaryComparison eq(Expression left, Expression right) {
+    return new BinaryComparison(left, Expression.Operator.EQ, right);
+  }
+
+  public static BinaryComparison gteq(Expression left, Expression right) {
+    return new BinaryComparison(left, Expression.Operator.GT_EQ, right);
+  }
+
+  public static BinaryComparison lteq(Expression left, Expression right) {
+    return new BinaryComparison(left, Expression.Operator.LT_EQ, right);
+  }
+
+  public static StringStartsWith startsWith(Expression left, Expression right) {
+    return new StringStartsWith(left, right);
+  }
+
+  public static StringContains contains(Expression left, Expression right) {
+    return new StringContains(left, right);
+  }
+
+  public static In in(Expression left, List<Expression> validExpressions) {
+    return new In(left, validExpressions);
+  }
+
+  public static IsNull isNull(Expression child) {
+    return new IsNull(child);
+  }
+
+  public static IsNotNull isNotNull(Expression child) {
+    return new IsNotNull(child);
+  }
+
+  public static Not not(Expression expr) {
+    return new Not(expr);
+  }
+
+  public static class TrueExpression extends LeafExpression implements Predicate {
+
+    private static final TrueExpression INSTANCE = new TrueExpression();
+
+    public static TrueExpression get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      return true;
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.TRUE;
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+      return exprVisitor.alwaysTrue();
+    }
+
+    @Override
+    public String toString() {
+      return "TRUE";
+    }
+  }
+
+  public static class FalseExpression extends LeafExpression implements Predicate {
+
+    private static final FalseExpression INSTANCE = new FalseExpression();
+
+    public static FalseExpression get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      return false;
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.FALSE;
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+      return exprVisitor.alwaysFalse();
+    }
+
+    @Override
+    public String toString() {
+      return "FALSE";
+    }
+  }
+
+  public static class And extends BinaryExpression implements Predicate {
+
+    public And(Expression left, Expression right) {
+      super(left, Operator.AND, right);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      if (getLeft() instanceof FalseExpression || getRight() instanceof FalseExpression) {
+        return false;
+      }
+      Object left = getLeft().eval(data);
+      if (left != null && !(Boolean) left) {
+        return false;
+      } else {
+        Object right = getRight().eval(data);
+        if (right != null && !(Boolean) right) {
+          return false;
+        } else {
+          if (left != null && right != null) {
+            return true;
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+      return exprVisitor.visitAnd(this);
+    }
+
+    @Override
+    public String toString() {
+      return "(" + getLeft() + " " + getOperator().symbol + " " + getRight() + 
")";
+    }
+  }
+
+  public static class Or extends BinaryExpression implements Predicate {
+
+    public Or(Expression left, Expression right) {
+      super(left, Operator.OR, right);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      if (getLeft() instanceof TrueExpression || getRight() instanceof TrueExpression) {
+        return true;
+      }
+
+      Object left = getLeft().eval(data);
+
+      if (left == null) {
+        return false;
+      }
+
+      if ((Boolean) left) {
+        return true;
+      } else {
+        Object right = getRight().eval(data);
+        return right != null && (Boolean) right;
+      }
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> exprVisitor) {
+      return exprVisitor.visitOr(this);
+    }
+
+    @Override
+    public String toString() {
+      return "(" + getLeft() + " " + getOperator().symbol + " " + getRight() + 
")";
+    }
+  }
+
+  public static class StringStartsWith extends BinaryExpression implements Predicate {
+
+    StringStartsWith(Expression left, Expression right) {
+      super(left, Operator.STARTS_WITH, right);
+    }
+
+    @Override
+    public String toString() {
+      return getLeft().toString() + ".startsWith(" + getRight().toString() + ")";
+    }
+
+    @Override
+    public Object eval(StructLike data) {
+      return getLeft().eval(data).toString().startsWith(getRight().eval(data).toString());
+    }
+  }
+
+  public static class StringContains extends BinaryExpression implements Predicate {
+
+    StringContains(Expression left, Expression right) {
+      super(left, Operator.CONTAINS, right);
+    }
+
+    @Override
+    public String toString() {
+      return getLeft().toString() + ".contains(" + getRight().toString() + ")";
+    }
+
+    @Override
+    public Object eval(StructLike data) {
+      return getLeft().eval(data).toString().contains(getRight().eval(data).toString());
+    }
+  }
+
+  public static class In implements Predicate {
+
+    protected final Expression value;
+    protected final List<Expression> validValues;
+
+    public In(Expression value, List<Expression> validValues) {
+      this.value = value;
+      this.validValues = validValues;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+      ArrayList<Expression> children = new ArrayList<>(validValues.size() + 1);
+      children.add(value);
+      children.addAll(validValues);
+      return children;
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      Set<Object> values = validValues.stream()
+          .map(validValue -> validValue.eval(data))
+          .collect(Collectors.toSet());
+      return values.contains(value.eval(data));
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.IN;
+    }
+
+    @Override
+    public String toString() {
+      return value.toString() + " " + getOperator().symbol + " "
+          + validValues.stream().map(Expression::toString).collect(Collectors.joining(",", "(", ")"));
+    }
+  }
+
+  public static class IsNull implements Predicate {
+
+    protected final Expression child;
+
+    public IsNull(Expression child) {
+      this.child = child;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+      return Collections.singletonList(child);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      return child.eval(data) == null;
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.IS_NULL;
+    }
+
+    @Override
+    public String toString() {
+      return child.toString() + " IS NULL";
+    }
+  }
+
+  public static class IsNotNull implements Predicate {
+
+    protected final Expression child;
+
+    public IsNotNull(Expression child) {
+      this.child = child;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+      return Collections.singletonList(child);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      return child.eval(data) != null;
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.IS_NOT_NULL;
+    }
+
+    @Override
+    public String toString() {
+      return child.toString() + " IS NOT NULL";
+    }
+  }
+
+  public static class Not implements Predicate {
+
+    Expression child;
+
+    public Not(Expression child) {
+      this.child = child;
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+      return Collections.singletonList(child);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      return ! (Boolean) child.eval(data);
+    }
+
+    @Override
+    public Operator getOperator() {
+      return Operator.NOT;
+    }
+
+    @Override
+    public String toString() {
+      return "NOT " + child;
+    }
+  }
+
+  public static class BinaryComparison extends BinaryExpression implements Predicate {
+
+    public BinaryComparison(Expression left, Operator operator, Expression right) {
+      super(left, operator, right);
+    }
+
+    @Override
+    public Boolean eval(StructLike data) {
+      if (getLeft().getDataType().isNestedType()) {
+        throw new IllegalArgumentException("The nested type doesn't support 
binary comparison");
+      }
+      Comparator<Object> comparator = Comparators.forType((Type.PrimitiveType) getLeft().getDataType());
+      switch (getOperator()) {
+        case EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) == 0;
+        case GT:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) > 0;
+        case GT_EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) >= 0;
+        case LT:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) < 0;
+        case LT_EQ:
+          return comparator.compare(getLeft().eval(data), getRight().eval(data)) <= 0;
+        default:
+          throw new IllegalArgumentException("The operation " + getOperator() + " doesn't support binary comparison");
+      }
+    }
+  }
+}
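
A few illustrative evaluations showing how these predicates compose over a StructLike row (values hypothetical):

    StructLike row = new ArrayData(Arrays.asList("2023-07-24", null));
    Expression day = new BoundReference(0, Types.StringType.get());

    boolean inSet = Predicates.in(day, Arrays.asList(
        Literal.from("2023-07-23"), Literal.from("2023-07-24"))).eval(row);        // true
    Object starts = Predicates.startsWith(day, Literal.from("2023-07")).eval(row); // true
    boolean missing = Predicates.isNull(
        new BoundReference(1, Types.StringType.get())).eval(row);                  // true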
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
similarity index 79%
rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
rename to hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
index 318a79116ca..d6346b8c657 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java
+++ b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.hive.expression;
+package org.apache.hudi.expression;
 
-/**
- * Expression that without any child expressions.
- */
-public abstract class LeafExpression extends Expression {
+public interface StructLike {
+
+  int numFields();
 
-  public LeafExpression() {
-    super(null);
-  }
+  <T> T get(int pos, Class<T> classTag);
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
index d51ab37719a..bc8b89004d6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java
@@ -18,10 +18,22 @@
 
 package org.apache.hudi.internal.schema;
 
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 /**
  * The type of a schema, reference avro schema.
@@ -29,21 +41,45 @@ import java.util.Objects;
  * to do add support for localTime if avro version is updated
  */
 public interface Type extends Serializable {
+
+  OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
   /**
    * Enums for type names.
    */
   enum TypeID {
-    RECORD, ARRAY, MAP, FIXED, STRING, BINARY,
-    INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID;
-    private String name;
-
-    TypeID() {
+    RECORD(Types.RecordType.class),
+    ARRAY(List.class),
+    MAP(Map.class),
+    FIXED(ByteBuffer.class),
+    STRING(String.class),
+    BINARY(ByteBuffer.class),
+    INT(Integer.class),
+    LONG(Long.class),
+    FLOAT(Float.class),
+    DOUBLE(Double.class),
+    DATE(Integer.class),
+    BOOLEAN(Boolean.class),
+    TIME(Long.class),
+    TIMESTAMP(Long.class),
+    DECIMAL(BigDecimal.class),
+    UUID(UUID.class);
+    private final String name;
+    private final Class<?> classTag;
+
+    TypeID(Class<?> classTag) {
       this.name = this.name().toLowerCase(Locale.ROOT);
+      this.classTag = classTag;
     }
 
     public String getName() {
       return name;
     }
+
+    public Class<?> getClassTag() {
+      return classTag;
+    }
   }
 
   static TypeID fromValue(String value) {
@@ -54,6 +90,40 @@ public interface Type extends Serializable {
     }
   }
 
+  static Object fromPartitionString(String partitionValue, Type type) {
+    if (partitionValue == null
+        || PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(partitionValue)
+        || PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH.equals(partitionValue)) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case INT:
+        return Integer.parseInt(partitionValue);
+      case LONG:
+        return Long.parseLong(partitionValue);
+      case BOOLEAN:
+        return Boolean.parseBoolean(partitionValue);
+      case FLOAT:
+        return Float.parseFloat(partitionValue);
+      case DECIMAL:
+        return new BigDecimal(partitionValue);
+      case DOUBLE:
+        return Double.parseDouble(partitionValue);
+      case UUID:
+        return UUID.fromString(partitionValue);
+      case DATE:
+        // TODO Support different date format
+        return Math.toIntExact(ChronoUnit.DAYS.between(
+            EPOCH_DAY, LocalDate.parse(partitionValue, 
DateTimeFormatter.ISO_LOCAL_DATE)));
+      case STRING:
+        return partitionValue;
+      default:
+        throw new UnsupportedOperationException("Cast value " + partitionValue
+            + " to type " + type + " is not supported yet");
+    }
+  }
+
   TypeID typeId();
 
   default boolean isNestedType() {
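
To illustrate `fromPartitionString` above, a short sketch with hypothetical partition values (the DATE case assumes the ISO_LOCAL_DATE format noted in the TODO):

    import org.apache.hudi.internal.schema.{Type, Types}

    Type.fromPartitionString("42", Types.IntType.get())          // Integer 42
    Type.fromPartitionString("2022-01-01", Types.DateType.get()) // 18993 days since epoch,
                                                                 // matching TypeID.DATE's Integer class tag
    Type.fromPartitionString("__HIVE_DEFAULT_PARTITION__",
      Types.StringType.get())                                    // null (default-partition sentinel)
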
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
index 3d2774bc19b..ed03a7349cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java
@@ -23,7 +23,6 @@ import org.apache.hudi.internal.schema.Type.PrimitiveType;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -511,6 +510,7 @@ public class Types {
     private final Field[] fields;
 
     private transient Map<String, Field> nameToFields = null;
+    private transient Map<String, Field> lowercaseNameToFields = null;
     private transient Map<Integer, Field> idToFields = null;
 
     private RecordType(List<Field> fields, String name) {
@@ -523,30 +523,43 @@ public class Types {
       return Arrays.asList(fields);
     }
 
-    public Field field(String name) {
+    /**
+     * Case-sensitive lookup of a field by name.
+     */
+    public Field fieldByName(String name) {
       if (nameToFields == null) {
-        nameToFields = new HashMap<>();
-        for (Field field : fields) {
-          nameToFields.put(field.name().toLowerCase(Locale.ROOT), field);
-        }
+        nameToFields = Arrays.stream(fields)
+            .collect(Collectors.toMap(
+                Field::name,
+                field -> field));
+      }
+      return nameToFields.get(name);
+    }
+
+    public Field fieldByNameCaseInsensitive(String name) {
+      if (lowercaseNameToFields == null) {
+        lowercaseNameToFields = Arrays.stream(fields)
+            .collect(Collectors.toMap(
+                field -> field.name.toLowerCase(Locale.ROOT),
+                field -> field));
       }
-      return nameToFields.get(name.toLowerCase(Locale.ROOT));
+      return lowercaseNameToFields.get(name.toLowerCase(Locale.ROOT));
     }
 
     @Override
     public Field field(int id) {
       if (idToFields == null) {
-        idToFields = new HashMap<>();
-        for (Field field : fields) {
-          idToFields.put(field.fieldId(), field);
-        }
+        idToFields = Arrays.stream(fields)
+            .collect(Collectors.toMap(
+                Field::fieldId,
+                field -> field));
       }
       return idToFields.get(id);
     }
 
     @Override
     public Type fieldType(String name) {
-      Field field = field(name);
+      Field field = fieldByNameCaseInsensitive(name);
       if (field != null) {
         return field.type();
       }
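
The difference between the two lookups introduced above, sketched with a single-field record:

    import java.util.Arrays
    import org.apache.hudi.internal.schema.Types

    val rec = Types.RecordType.get(Arrays.asList(
      Types.Field.get(0, true, "Country", Types.StringType.get())))

    rec.fieldByName("country")                // null: exact case required
    rec.fieldByNameCaseInsensitive("country") // the "Country" field
    rec.fieldType("COUNTRY")                  // StringType, via the case-insensitive lookup
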
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
new file mode 100644
index 00000000000..67adff88ac7
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class Conversions {
+
+  private static final HashSet<Type.TypeID> SUPPORTED_PARTITION_TYPES = new 
HashSet<>(
+      Arrays.asList(Type.TypeID.INT,
+          Type.TypeID.LONG,
+          Type.TypeID.BOOLEAN,
+          Type.TypeID.FLOAT,
+          Type.TypeID.DECIMAL,
+          Type.TypeID.DOUBLE,
+          Type.TypeID.UUID,
+          Type.TypeID.DATE,
+          Type.TypeID.STRING));
+
+  public static boolean isPartitionSchemaSupportedConversion(Types.RecordType 
schema) {
+    for (Types.Field field: schema.fields()) {
+      if (!SUPPORTED_PARTITION_TYPES.contains(field.type().typeId())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
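
For example, a typical <region: string, dt: date> partition schema passes the check, while a timestamp-typed partition column does not (a sketch):

    import java.util.Arrays
    import org.apache.hudi.internal.schema.Types
    import org.apache.hudi.internal.schema.utils.Conversions

    val ok = Types.RecordType.get(Arrays.asList(
      Types.Field.get(0, false, "region", Types.StringType.get()),
      Types.Field.get(1, false, "dt", Types.DateType.get())))
    Conversions.isPartitionSchemaSupportedConversion(ok)     // true

    val notOk = Types.RecordType.get(Arrays.asList(
      Types.Field.get(0, false, "ts", Types.TimestampType.get())))
    Conversions.isPartitionSchemaSupportedConversion(notOk)  // false: TIMESTAMP is not in the set
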
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
new file mode 100644
index 00000000000..f62786e9517
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.expression.ArrayData;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.hadoop.SerializablePath;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public abstract class AbstractHoodieTableMetadata implements 
HoodieTableMetadata {
+
+  protected transient HoodieEngineContext engineContext;
+
+  protected final SerializableConfiguration hadoopConf;
+  protected final SerializablePath dataBasePath;
+
+  // TODO get this from HoodieConfig
+  protected final boolean caseSensitive = false;
+
+  public AbstractHoodieTableMetadata(HoodieEngineContext engineContext, 
SerializableConfiguration conf, String dataBasePath) {
+    this.engineContext = engineContext;
+    this.hadoopConf = conf;
+    this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
+  }
+
+  protected static int getPathPartitionLevel(Types.RecordType partitionFields, 
String path) {
+    if (StringUtils.isNullOrEmpty(path) || partitionFields == null) {
+      return 0;
+    }
+
+    int level = 1;
+    for (int i = 1; i < path.length() - 1; i++) {
+      if (path.charAt(i) == Path.SEPARATOR_CHAR) {
+        level++;
+      }
+    }
+    if (path.startsWith(Path.SEPARATOR)) {
+      level--;
+    }
+    if (path.endsWith(Path.SEPARATOR)) {
+      level--;
+    }
+    return level;
+  }
+
+  protected static ArrayData extractPartitionValues(Types.RecordType 
partitionFields,
+                                                    String 
relativePartitionPath,
+                                                    boolean 
urlEncodePartitioningEnabled) {
+    if (partitionFields.fields().size() == 1) {
+      // Single partition column; its value might contain slashes.
+      String partitionValue = relativePartitionPath.split("=")[1];
+      return new ArrayData(Collections.singletonList(Type.fromPartitionString(
+          urlEncodePartitioningEnabled ? 
PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue,
+          partitionFields.field(0).type())));
+    }
+
+    List<Object> partitionValues;
+    String[] partitionFragments = relativePartitionPath.split("/");
+    partitionValues = IntStream.range(0, partitionFragments.length)
+        .mapToObj(idx -> {
+          String partitionValue = partitionFragments[idx].split("=")[1];
+          return Type.fromPartitionString(
+              urlEncodePartitioningEnabled ? 
PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue,
+              partitionFields.field(idx).type());
+        }).collect(Collectors.toList());
+
+    return new ArrayData(partitionValues);
+  }
+}
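
For a hive-style, URL-encoded layout, `extractPartitionValues` recovers typed values from the relative partition path; a sketch with hypothetical values (both helpers are protected, shown as if invoked from a subclass):

    // Partition schema: <country: string, dt: date>
    val fields = Types.RecordType.get(java.util.Arrays.asList(
      Types.Field.get(0, false, "country", Types.StringType.get()),
      Types.Field.get(1, false, "dt", Types.DateType.get())))

    // "country=us/dt=2022-01-01" -> ArrayData("us", 18993)
    extractPartitionValues(fields, "country=us/dt=2022-01-01", true)
    // and the matching level check:
    getPathPartitionLevel(fields, "country=us/dt=2022-01-01")  // 2
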
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 076eb4bf1dc..7e1acf3a87c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
@@ -41,8 +41,6 @@ import org.apache.hudi.common.util.hash.FileIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.hadoop.CachingPath;
-import org.apache.hudi.hadoop.SerializablePath;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -68,25 +66,33 @@ import java.util.stream.Collectors;
 /**
  * Abstract class for implementing common table metadata operations.
  */
-public abstract class BaseTableMetadata implements HoodieTableMetadata {
+public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseTableMetadata.class);
 
-  protected transient HoodieEngineContext engineContext;
-  protected final SerializablePath dataBasePath;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: The buffer size is deliberately set pretty low, since the metadata table
+  //       internally relies on HFile (serving as a persisted binary key-value mapping)
+  //       to do the caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10 KB
+
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
   protected final HoodieMetadataConfig metadataConfig;
 
   protected boolean isMetadataTableInitialized;
+  protected final boolean hiveStylePartitioningEnabled;
+  protected final boolean urlEncodePartitioningEnabled;
 
   protected BaseTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig, String dataBasePath) {
-    this.engineContext = engineContext;
-    this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
+    super(engineContext, engineContext.getHadoopConf(), dataBasePath);
+
     this.dataMetaClient = HoodieTableMetaClient.builder()
-        .setConf(engineContext.getHadoopConf().get())
+        .setConf(hadoopConf.get())
         .setBasePath(dataBasePath)
         .build();
+
+    this.hiveStylePartitioningEnabled = 
Boolean.parseBoolean(dataMetaClient.getTableConfig().getHiveStylePartitioningEnable());
+    this.urlEncodePartitioningEnabled = 
Boolean.parseBoolean(dataMetaClient.getTableConfig().getUrlEncodePartitioning());
     this.metadataConfig = metadataConfig;
     this.isMetadataTableInitialized = 
dataMetaClient.getTableConfig().isMetadataTableAvailable();
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 9ce83a7d953..10ed196714d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
@@ -33,6 +34,12 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.expression.BindVisitor;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.PartialBindVisitor;
+import org.apache.hudi.expression.Predicates;
+import org.apache.hudi.internal.schema.Types;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,20 +59,36 @@ import java.util.stream.Stream;
 /**
  * Implementation of {@link HoodieTableMetadata} based file-system-backed 
table metadata.
  */
-public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
+public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata 
{
 
   private static final int DEFAULT_LISTING_PARALLELISM = 1500;
 
-  private final transient HoodieEngineContext engineContext;
-  private final SerializableConfiguration hadoopConf;
-  private final String datasetBasePath;
   private final boolean assumeDatePartitioning;
 
-  public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, 
SerializableConfiguration conf, String datasetBasePath,
+  private final boolean hiveStylePartitioningEnabled;
+  private final boolean urlEncodePartitioningEnabled;
+
+  public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, 
HoodieTableConfig tableConfig,
+                                       SerializableConfiguration conf, String 
datasetBasePath,
                                        boolean assumeDatePartitioning) {
-    this.engineContext = engineContext;
-    this.hadoopConf = conf;
-    this.datasetBasePath = datasetBasePath;
+    super(engineContext, conf, datasetBasePath);
+
+    this.hiveStylePartitioningEnabled = 
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+    this.urlEncodePartitioningEnabled = 
Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning());
+    this.assumeDatePartitioning = assumeDatePartitioning;
+  }
+
+  public FileSystemBackedTableMetadata(HoodieEngineContext engineContext,
+                                       SerializableConfiguration conf, String 
datasetBasePath,
+                                       boolean assumeDatePartitioning) {
+    super(engineContext, conf, datasetBasePath);
+
+    FileSystem fs = FSUtils.getFs(dataBasePath.get(), conf.get());
+    Path metaPath = new Path(dataBasePath.get(), 
HoodieTableMetaClient.METAFOLDER_NAME);
+    TableNotFoundException.checkTableValidity(fs, this.dataBasePath.get(), 
metaPath);
+    HoodieTableConfig tableConfig = new HoodieTableConfig(fs, 
metaPath.toString(), null, null);
+    this.hiveStylePartitioningEnabled = 
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+    this.urlEncodePartitioningEnabled = 
Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning());
     this.assumeDatePartitioning = assumeDatePartitioning;
   }
 
@@ -77,15 +100,29 @@ public class FileSystemBackedTableMetadata implements 
HoodieTableMetadata {
 
   @Override
   public List<String> getAllPartitionPaths() throws IOException {
-    Path basePath = new Path(datasetBasePath);
+    Path basePath = dataBasePath.get();
     if (assumeDatePartitioning) {
       FileSystem fs = basePath.getFileSystem(hadoopConf.get());
-      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, 
datasetBasePath);
+      return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, 
dataBasePath.toString());
     }
 
     return getPartitionPathWithPathPrefixes(Collections.singletonList(""));
   }
 
+  @Override
+  public List<String> 
getPartitionPathWithPathPrefixUsingFilterExpression(List<String> 
relativePathPrefixes,
+                                                                          
Types.RecordType partitionFields,
+                                                                          
Expression expression) throws IOException {
+    return relativePathPrefixes.stream().flatMap(relativePathPrefix -> {
+      try {
+        return 
getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix,
+            partitionFields, expression).stream();
+      } catch (IOException e) {
+        throw new HoodieIOException("Error fetching partition paths with 
relative path: " + relativePathPrefix, e);
+      }
+    }).collect(Collectors.toList());
+  }
+
   @Override
   public List<String> getPartitionPathWithPathPrefixes(List<String> 
relativePathPrefixes) {
     return relativePathPrefixes.stream().flatMap(relativePathPrefix -> {
@@ -98,11 +135,35 @@ public class FileSystemBackedTableMetadata implements 
HoodieTableMetadata {
   }
 
   private List<String> getPartitionPathWithPathPrefix(String 
relativePathPrefix) throws IOException {
+    return 
getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix, null, 
null);
+  }
+
+  private List<String> 
getPartitionPathWithPathPrefixUsingFilterExpression(String relativePathPrefix,
+                                                                           
Types.RecordType partitionFields,
+                                                                           
Expression pushedExpr) throws IOException {
     List<Path> pathsToList = new CopyOnWriteArrayList<>();
     pathsToList.add(StringUtils.isNullOrEmpty(relativePathPrefix)
-        ? new Path(datasetBasePath) : new Path(datasetBasePath, 
relativePathPrefix));
+        ? dataBasePath.get() : new Path(dataBasePath.get(), 
relativePathPrefix));
     List<String> partitionPaths = new CopyOnWriteArrayList<>();
 
+    int currentPartitionLevel = -1;
+    boolean needPushDownExpressions;
+    Expression fullBoundExpr;
+    // Unlike `HoodieBackedTableMetadata`, we don't know the exact partition levels here.
+    // Since partition values may contain `/`, which would make it impossible to recover
+    // the right `partitionValue` when listing paths, we have to be stricter and require
+    // `urlEncodePartitioningEnabled` to be enabled as well.
+    // TODO: consider enabling urlEncodePartitioningEnabled automatically whenever
+    //       hiveStylePartitioningEnabled is enabled?
+    if (hiveStylePartitioningEnabled && urlEncodePartitioningEnabled
+        && pushedExpr != null && partitionFields != null) {
+      currentPartitionLevel = getPathPartitionLevel(partitionFields, 
relativePathPrefix);
+      needPushDownExpressions = true;
+      fullBoundExpr = pushedExpr.accept(new BindVisitor(partitionFields, 
caseSensitive));
+    } else {
+      fullBoundExpr = Predicates.alwaysTrue();
+      needPushDownExpressions = false;
+    }
+
     while (!pathsToList.isEmpty()) {
       // TODO: Get the parallelism from HoodieWriteConfig
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, 
pathsToList.size());
@@ -116,7 +177,7 @@ public class FileSystemBackedTableMetadata implements 
HoodieTableMetadata {
       List<Pair<Option<String>, Option<Path>>> result = 
engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
         if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) {
-          return 
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), path)), Option.empty()));
+          return 
Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(),
 path)), Option.empty()));
         }
         return Arrays.stream(fileSystem.listStatus(path, p -> {
           try {
@@ -129,10 +190,34 @@ public class FileSystemBackedTableMetadata implements 
HoodieTableMetadata {
       }, listingParallelism);
       pathsToList.clear();
 
-      partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+      partitionPaths.addAll(result.stream().filter(entry -> 
entry.getKey().isPresent())
+          .map(entry -> entry.getKey().get())
+          .filter(relativePartitionPath -> fullBoundExpr instanceof 
Predicates.TrueExpression
+              || (Boolean) fullBoundExpr.eval(
+                      extractPartitionValues(partitionFields, 
relativePartitionPath, urlEncodePartitioningEnabled)))
           .collect(Collectors.toList()));
 
+      Expression partialBoundExpr;
+      // If partitionPaths is non-empty, we're already at the last path level,
+      // and all paths are filtered already.
+      if (needPushDownExpressions && partitionPaths.isEmpty()) {
+        // Here we assume the path level matches the number of partition columns,
+        // so we rebuild a new schema based on the current path level.
+        // E.g. if partition columns are <region, date, hh> and we're listing the
+        // second level, then currentSchema would be <region, date>.
+        // `PartialBindVisitor` binds a reference if it can be found in `currentSchema`;
+        // otherwise it rewrites the expression to `alwaysTrue`. See `PartialBindVisitor`
+        // for details.
+        Types.RecordType currentSchema = 
Types.RecordType.get(partitionFields.fields().subList(0, 
++currentPartitionLevel));
+        PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(currentSchema, caseSensitive);
+        partialBoundExpr = pushedExpr.accept(partialBindVisitor);
+      } else {
+        partialBoundExpr = Predicates.alwaysTrue();
+      }
+
       pathsToList.addAll(result.stream().filter(entry -> 
entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+              .filter(path -> partialBoundExpr instanceof 
Predicates.TrueExpression
+                  || (Boolean) partialBoundExpr.eval(
+                      extractPartitionValues(partitionFields, 
FSUtils.getRelativePartitionPath(dataBasePath.get(), path), 
urlEncodePartitioningEnabled)))
           .collect(Collectors.toList()));
     }
     return partitionPaths;
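
Putting the pieces together, the new listing API can be exercised roughly as follows (an illustrative sketch, not part of this patch; `metadata` stands for any `HoodieTableMetadata` over a hive-style, URL-encoded table, and the dt literal is epoch days):

    import java.util.{Arrays, Collections}
    import org.apache.hudi.expression.{Literal, NameReference, Predicates}
    import org.apache.hudi.internal.schema.Types
    import org.apache.hudi.metadata.HoodieTableMetadata

    val metadata: HoodieTableMetadata = ??? // e.g. a FileSystemBackedTableMetadata

    val partitionFields = Types.RecordType.get(Arrays.asList(
      Types.Field.get(0, false, "country", Types.StringType.get()),
      Types.Field.get(1, false, "dt", Types.DateType.get())))

    // country = 'us' AND dt >= '2022-01-01'
    val filter = Predicates.and(
      Predicates.eq(new NameReference("country"), Literal.from("us")),
      Predicates.gteq(new NameReference("dt"), Literal.from(18993)))

    metadata.getPartitionPathWithPathPrefixUsingFilterExpression(
      Collections.singletonList(""), partitionFields, filter)
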
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index b0e9bcaaab2..9a27d2cfbca 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -46,6 +46,9 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.expression.BindVisitor;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 import org.apache.hudi.util.Transient;
@@ -143,6 +146,26 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return Option.ofNullable(recordsByKeys.get(key));
   }
 
+  @Override
+  public List<String> 
getPartitionPathWithPathPrefixUsingFilterExpression(List<String> 
relativePathPrefixes,
+                                                                          
Types.RecordType partitionFields,
+                                                                          
Expression expression) throws IOException {
+    Expression boundExpr = expression.accept(new BindVisitor(partitionFields, caseSensitive));
+    List<String> selectedPartitionPaths = 
getPartitionPathWithPathPrefixes(relativePathPrefixes);
+
+    // We can only prune partitions if the number of partition levels matches the
+    // number of partition fields. Here we check the first selected partition to
+    // see whether the numbers match.
+    if (hiveStylePartitioningEnabled && !selectedPartitionPaths.isEmpty()
+        && getPathPartitionLevel(partitionFields, selectedPartitionPaths.get(0)) == partitionFields.fields().size()) {
+      return selectedPartitionPaths.stream()
+          .filter(p ->
+              (boolean) boundExpr.eval(extractPartitionValues(partitionFields, p, urlEncodePartitioningEnabled)))
+          .collect(Collectors.toList());
+    }
+
+    return selectedPartitionPaths;
+  }
+
   @Override
   public List<String> getPartitionPathWithPathPrefixes(List<String> 
relativePathPrefixes) throws IOException {
     // TODO: consider skipping this method for non-partitioned table and 
simplify the checks
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index ac17eed001b..0ba197a5c68 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -33,6 +33,8 @@ import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.internal.schema.Types;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -146,6 +148,14 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
    */
   FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
 
+  /**
+   * Retrieve the paths of partitions under the provided sub-directories,
+   * and try to filter these partitions using the provided {@link Expression}.
+   */
+  List<String> 
getPartitionPathWithPathPrefixUsingFilterExpression(List<String> 
relativePathPrefixes,
+                                                                   
Types.RecordType partitionFields,
+                                                                   Expression 
expression) throws IOException;
+
   /**
    * Fetches all partition paths that are the sub-directories of the list of 
provided (relative) paths.
    * <p>
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
 
b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
new file mode 100644
index 00000000000..c7e75711823
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.expression;
+
+import org.apache.hudi.internal.schema.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+public class TestPartialBindVisitor {
+
+  private static Types.RecordType schema;
+  @BeforeAll
+  public static void init() {
+    ArrayList<Types.Field> fields = new ArrayList<>(5);
+    fields.add(Types.Field.get(0, true, "a", Types.StringType.get()));
+    fields.add(Types.Field.get(1, true, "b", Types.DateType.get()));
+    fields.add(Types.Field.get(2, true, "c", Types.IntType.get()));
+    fields.add(Types.Field.get(3, true, "d", Types.LongType.get()));
+    fields.add(Types.Field.get(4, false, "f", Types.BooleanType.get()));
+    schema = Types.RecordType.get(fields, "schema");
+  }
+
+  @Test
+  public void testPartialBindIfAllExisting() {
+    PartialBindVisitor partialBindVisitor = new PartialBindVisitor(schema, 
false);
+
+    Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"),
+        Literal.from("Jane"));
+    Predicates.BinaryComparison gt = Predicates.gt(new NameReference("c"),
+        Literal.from(10));
+    Predicates.In in = Predicates.in(new NameReference("d"),
+        Arrays.asList(Literal.from(10L), Literal.from(13L)));
+
+    Predicates.And expr = Predicates.and(eq, Predicates.or(gt, in));
+    Expression bound = expr.accept(partialBindVisitor);
+
+    Assertions.assertTrue((Boolean) bound.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false))));
+    Assertions.assertTrue((Boolean) bound.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 5, 10L, false))));
+    Assertions.assertFalse((Boolean) bound.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false))));
+    Assertions.assertFalse((Boolean) bound.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false))));
+  }
+
+  @Test
+  public void testPartialBindIfFieldMissing() {
+    PartialBindVisitor partialBindVisitor = new PartialBindVisitor(schema, 
false);
+
+    Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"),
+        Literal.from("Jane"));
+    Predicates.BinaryComparison lt = Predicates.lt(new NameReference("m"),
+        Literal.from(10));
+    Predicates.BinaryComparison gteq = Predicates.gteq(new NameReference("d"),
+        Literal.from(10L));
+
+    Predicates.And expr = Predicates.and(eq, Predicates.or(lt, gteq));
+    // Since attribute m does not exist in the schema, the OR expression is always true,
+    // and the whole expression is optimized to only consider the EQ expression.
+    Expression bound = expr.accept(partialBindVisitor);
+
+    Assertions.assertTrue((Boolean) bound.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false))));
+    Assertions.assertFalse((Boolean) bound.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false))));
+    Assertions.assertFalse((Boolean) bound.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false))));
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index 338e892f48c..6c5fcb7049c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -331,7 +331,7 @@ public class TestAvroSchemaEvolutionUtils {
     avroRecord.put("preferences", preferencesRecord);
     // fill mapType
     Map<String, GenericData.Record> locations = new HashMap<>();
-    Schema mapSchema = 
AvroInternalSchemaConverter.convert(((Types.MapType)record.field("locations").type()).valueType(),
 "test1.locations");
+    Schema mapSchema = 
AvroInternalSchemaConverter.convert(((Types.MapType)record.fieldByNameCaseInsensitive("locations").type()).valueType(),
 "test1.locations");
     GenericData.Record locationsValue = new GenericData.Record(mapSchema);
     locationsValue.put("lat", 1.2f);
     locationsValue.put("long", 1.4f);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
index db7608cea96..799ff7e7d23 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java
@@ -71,7 +71,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     
hoodieTestTable.addCommit("100").withBaseFilesInPartition(DEFAULT_PARTITION, 
IntStream.range(0, 10).toArray());
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
     Assertions.assertEquals(0, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
     Assertions.assertEquals(10, 
fileSystemBackedTableMetadata.getAllFilesInPartition(new 
Path(basePath)).length);
     Assertions.assertEquals(10, 
fileSystemBackedTableMetadata.getAllFilesInPartitions(
@@ -96,7 +96,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     });
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, true);
     Assertions.assertEquals(3, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
     Assertions.assertEquals(10, 
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + 
DATE_PARTITIONS.get(0))).length);
 
@@ -127,7 +127,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     });
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
     Assertions.assertEquals(3, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
 
     List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> 
basePath + "/" + p).collect(Collectors.toList());
@@ -152,7 +152,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     });
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
     Assertions.assertEquals(3, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
     Assertions.assertEquals(10, 
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + 
ONE_LEVEL_PARTITIONS.get(0))).length);
 
@@ -178,7 +178,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     });
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
     Assertions.assertEquals(3, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
     Assertions.assertEquals(10, 
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + 
MULTI_LEVEL_PARTITIONS.get(0))).length);
 
@@ -203,7 +203,7 @@ public class TestFileSystemBackedTableMetadata extends 
HoodieCommonTestHarness {
     });
     HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
     FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
-        new FileSystemBackedTableMetadata(localEngineContext, new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
+        new FileSystemBackedTableMetadata(localEngineContext, 
metaClient.getTableConfig(), new 
SerializableConfiguration(metaClient.getHadoopConf()), basePath, false);
     Assertions.assertEquals(3, 
fileSystemBackedTableMetadata.getAllPartitionPaths().size());
     Assertions.assertEquals(0, 
fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + 
MULTI_LEVEL_PARTITIONS.get(0))).length);
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
new file mode 100644
index 00000000000..5a9bc29089e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.expression.{Predicates, Expression, Literal, 
NameReference}
+import org.apache.hudi.internal.schema.{Type, Types}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
+
+object SparkFilterHelper {
+
+  def convertFilters(filters: Seq[Filter]): Expression = {
+    filters.flatMap(convertFilter)
+      .reduceLeftOption(Predicates.and)
+      .getOrElse(Predicates.alwaysTrue())
+  }
+
+  def convertFilter(filter: Filter): Option[Expression] = filter match {
+    case EqualTo(attribute, value) =>
+      Some(Predicates.eq(new NameReference(attribute), toLiteral(value)))
+    case EqualNullSafe(attribute, value) =>
+      Some(Predicates.eq(new NameReference(attribute), toLiteral(value)))
+    case LessThan(attribute, value) =>
+      Some(Predicates.lt(new NameReference(attribute), toLiteral(value)))
+    case LessThanOrEqual(attribute, value) =>
+      Some(Predicates.lteq(new NameReference(attribute), toLiteral(value)))
+    case GreaterThan(attribute, value) =>
+      Some(Predicates.gt(new NameReference(attribute), toLiteral(value)))
+    case GreaterThanOrEqual(attribute, value) =>
+      Some(Predicates.gteq(new NameReference(attribute), toLiteral(value)))
+    case In(attribute, values) =>
+      Some(Predicates.in(new NameReference(attribute), 
values.map(toLiteral(_).asInstanceOf[Expression]).toList.asJava))
+    case And(left, right) =>
+      for {
+        convertedLeft <- convertFilter(left)
+        convertedRight <- convertFilter(right)
+      } yield Predicates.and(convertedLeft, convertedRight)
+    case Or(left, right) =>
+      for {
+        convertedLeft <- convertFilter(left)
+        convertedRight <- convertFilter(right)
+      } yield Predicates.or(convertedLeft, convertedRight)
+    case StringStartsWith(attribute, value) =>
+      Some(Predicates.startsWith(new NameReference(attribute), 
toLiteral(value)))
+    case StringContains(attribute, value) =>
+      Some(Predicates.contains(new NameReference(attribute), toLiteral(value)))
+    case Not(child) =>
+      convertFilter(child).map(Predicates.not)
+    case IsNull(attribute) =>
+      Some(Predicates.isNull(new NameReference(attribute)))
+    case IsNotNull(attribute) =>
+      Some(Predicates.isNotNull(new NameReference(attribute)))
+    case _ =>
+      None
+  }
+
+  def toLiteral(value: Any): Literal[_] = {
+    value match {
+      case timestamp : Timestamp =>
+        new Literal(DateTimeUtils.fromJavaTimestamp(timestamp), 
Types.TimestampType.get())
+      case date: Date =>
+        new Literal(DateTimeUtils.fromJavaDate(date), Types.DateType.get())
+      case instant: Instant =>
+        new Literal(DateTimeUtils.instantToMicros(instant), 
Types.TimestampType.get())
+      case localDate: LocalDate =>
+        new Literal(Math.toIntExact(localDate.toEpochDay), Types.DateType.get())
+      case _ =>
+        Literal.from(value)
+    }
+  }
+
+  def convertDataType(sparkType: DataType): Type = sparkType match {
+    case StructType(fields) =>
+      val convertedFields = fields.zipWithIndex.map {
+        case (field, idx) =>
+          Types.Field.get(idx, field.nullable, field.name, 
convertDataType(field.dataType), field.getComment().orNull)
+      }.toList.asJava
+      Types.RecordType.get(convertedFields)
+    case BooleanType =>
+      Types.BooleanType.get()
+    case IntegerType | ShortType | ByteType =>
+      Types.IntType.get()
+    case LongType =>
+      Types.LongType.get()
+    case FloatType =>
+      Types.FloatType.get()
+    case DoubleType =>
+      Types.DoubleType.get()
+    case StringType | CharType(_) | VarcharType(_) =>
+      Types.StringType.get()
+    case DateType =>
+      Types.DateType.get()
+    case TimestampType =>
+      Types.TimestampType.get()
+    case _type: DecimalType =>
+      Types.DecimalType.get(_type.precision, _type.scale)
+    case _ =>
+      throw new UnsupportedOperationException(s"Cannot convert spark type 
$sparkType to the relate HUDI type")
+  }
+}
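
On the Spark side, the helper turns catalyst source filters into the predicates above; a sketch using the standard `org.apache.spark.sql.sources` filter classes with hypothetical column names:

    import org.apache.hudi.SparkFilterHelper
    import org.apache.spark.sql.sources.{EqualTo, GreaterThan, StringEndsWith}

    // country_code = 'us' AND hour > 5
    val pushed = SparkFilterHelper.convertFilters(Seq(
      EqualTo("country_code", "us"),
      GreaterThan("hour", 5)))

    // Filters with no Hudi counterpart convert to None; convertFilters simply drops
    // them and falls back to Predicates.alwaysTrue() when nothing survives.
    SparkFilterHelper.convertFilter(StringEndsWith("country_code", "s"))  // None
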
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index c76af7b39ce..35ef3e9f066 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -31,6 +31,8 @@ import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
+import org.apache.hudi.internal.schema.Types.RecordType
+import org.apache.hudi.internal.schema.utils.Conversions
 import org.apache.hudi.keygen.{StringPartitionPathFormatter, 
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
@@ -44,9 +46,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.util.Collections
 import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
+import scala.util.{Failure, Success, Try}
 
 /**
  * Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -225,7 +229,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         logInfo("Partition path prefix analysis is disabled; falling back to 
fetching all partitions")
         getAllQueryPartitionPaths.asScala
       } else {
-        tryListByPartitionPathPrefix(partitionColumnNames, 
partitionPruningPredicates)
+        tryPushDownPartitionPredicates(partitionColumnNames, 
partitionPruningPredicates)
       }
 
       // NOTE: In some cases, like for ex, when non-encoded slash '/' is used 
w/in the partition column's value,
@@ -260,7 +264,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
  // NOTE: Here we try to achieve efficiency by avoiding the necessity to recursively
  //       list deep folder structures of partitioned tables w/ multiple partition columns,
  //       by carefully analyzing provided partition predicates:
   //
-  //       In cases when partition-predicates have
+  // 1. When the partition-predicates have
   //         - The form of equality predicates w/ static literals (for ex, 
like `date = '2022-01-01'`)
   //         - Fully specified proper prefix of the partition schema (ie fully 
binding first N columns
   //           of the partition schema adhering to hereby described rules)
@@ -274,11 +278,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   //
   //    country_code: string (for ex, 'us')
   //    date: string (for ex, '2022-01-01')
+  //    hour: string (for ex, '08')
   //
   // Table's folder structure:
   //    us/
-  //     |- 2022-01-01/
-  //     |- 2022-01-02/
+  //     |- 2022-01-01/06
+  //     |- 2022-01-02/07
   //     ...
   //
   // In case we have incoming query specifies following predicates:
@@ -286,7 +291,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   //    `... WHERE country_code = 'us' AND date = '2022-01-01'`
   //
   // We can deduce full partition-path w/o doing a single listing: 
`us/2022-01-01`
-  private def tryListByPartitionPathPrefix(partitionColumnNames: Seq[String], 
partitionColumnPredicates: Seq[Expression]) = {
+  //
+  // 2. Try to push down all partition predicates when listing the sub-folder.
+  // In case the incoming query specifies the following predicates:
+  //
+  //    `... WHERE country_code = 'us' AND date = '2022-01-01' AND hour = '06'`
+  //
+  // We can deduce full partition-path w/o doing a single listing: 
`us/2022-01-01`, and then push down
+  // these filters when listing `us/2022-01-01` to get the directory 
'us/2022-01-01/06'
+  private def tryPushDownPartitionPredicates(partitionColumnNames: 
Seq[String], partitionColumnPredicates: Seq[Expression]): Seq[PartitionPath] = {
     // Static partition-path prefix is defined as a prefix of the full 
partition-path where only
     // first N partition columns (in-order) have proper (static) values bound 
in equality predicates,
     // allowing in turn to build such prefix to be used in subsequent filtering
@@ -301,24 +314,59 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         .map(colName => (colName, (staticPartitionColumnValuesMap(colName)._1, 
staticPartitionColumnValuesMap(colName)._2.get)))
     }
 
-    if (staticPartitionColumnNameValuePairs.isEmpty) {
-      logDebug("Unable to compose relative partition path prefix from the 
predicates; falling back to fetching all partitions")
-      getAllQueryPartitionPaths.asScala
-    } else {
-      // Based on the static partition-column name-value pairs, we'll try to 
compose static partition-path
-      // prefix to try to reduce the scope of the required file-listing
-      val relativePartitionPathPrefix = 
composeRelativePartitionPath(staticPartitionColumnNameValuePairs)
-
-      if (!metaClient.getFs.exists(new Path(getBasePath, 
relativePartitionPathPrefix))) {
-        Seq()
-      } else if (staticPartitionColumnNameValuePairs.length == 
partitionColumnNames.length) {
-        // In case composed partition path is complete, we can return it 
directly avoiding extra listing operation
-        Seq(new PartitionPath(relativePartitionPathPrefix, 
staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
-      } else {
-        // Otherwise, compile extracted partition values (from query 
predicates) into a sub-path which is a prefix
-        // of the complete partition path, do listing for this prefix-path only
-        
listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala
+    val hiveStylePartitioning = 
metaClient.getTableConfig.getHiveStylePartitioningEnable.toBoolean
+    val urlEncodePartitioning = 
metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+    val partitionTypesOption =if (hiveStylePartitioning && 
urlEncodePartitioning) {
+      Try {
+        
SparkFilterHelper.convertDataType(partitionSchema).asInstanceOf[RecordType]
+      } match {
+        case Success(partitionRecordType)
+          if partitionRecordType.fields().size() == 
_partitionSchemaFromProperties.size
+            && 
Conversions.isPartitionSchemaSupportedConversion(partitionRecordType) =>
+          Some(partitionRecordType)
+        case _ =>
+          None
       }
+    } else {
+      // Avoid convert partition schemas if hivestylePartitioning & 
urlEncodePartitioning is not enabled.
+      None
+    }
+
+    (staticPartitionColumnNameValuePairs.isEmpty, partitionTypesOption) match {
+      case (true, Some(partitionTypes)) =>
+        // Push down partition filters without pathPrefix
+        val convertedFilters = SparkFilterHelper.convertFilters(
+          partitionColumnPredicates.flatMap {
+            expr => sparkAdapter.translateFilter(expr)
+          })
+        listPartitionPaths(Collections.singletonList(""), partitionTypes, 
convertedFilters).asScala
+      case (true, None) =>
+        logDebug("Unable to compose relative partition path prefix from the 
predicates; falling back to fetching all partitions")
+        getAllQueryPartitionPaths.asScala
+      case (false, _) =>
+        // Based on the static partition-column name-value pairs, we'll try to 
compose static partition-path
+        // prefix to try to reduce the scope of the required file-listing
+        val relativePartitionPathPrefix = 
composeRelativePartitionPath(staticPartitionColumnNameValuePairs)
+
+        if (!metaClient.getFs.exists(new Path(getBasePath, 
relativePartitionPathPrefix))) {
+          Seq()
+        } else if (staticPartitionColumnNameValuePairs.length == 
partitionColumnNames.length) {
+          // In case composed partition path is complete, we can return it 
directly avoiding extra listing operation
+          Seq(new PartitionPath(relativePartitionPathPrefix, 
staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray))
+        } else {
+          partitionTypesOption.map { partitionTypes =>
+            // Try to combine the path prefix and the filters to gain better performance
+            val convertedFilters = SparkFilterHelper.convertFilters(
+              partitionColumnPredicates.flatMap {
+                expr => sparkAdapter.translateFilter(expr)
+              })
+            listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava, 
partitionTypes, convertedFilters).asScala
+          }.getOrElse {
+            log.warn("Met incompatible issue when converting to hudi data 
type, rollback to list by prefix directly")
+            
listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala
+          }
+        }
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index f8f082489ce..ba5c2edb2d1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -383,6 +383,66 @@ class TestHoodieFileIndex extends 
HoodieSparkClientTestBase with ScalaAssertionS
     }
   }
 
+  /**
+   * This test mainly ensures all non-partition-prefix filters can be pushed down successfully.
+   */
+  @ParameterizedTest
+  @CsvSource(value = Array("true, false", "false, false", "true, true", 
"false, true"))
+  def 
testPartitionPruneWithMultiplePartitionColumnsWithComplexExpression(useMetadataTable:
 Boolean,
+                                                                          
complexExpressionPushDown: Boolean): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val partitionNames = Seq("prefix", "dt", "hh", "country")
+    val writerOpts: Map[String, String] = commonOpts ++ Map(
+      DataSourceWriteOptions.OPERATION.key -> 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      RECORDKEY_FIELD.key -> "id",
+      PRECOMBINE_FIELD.key -> "version",
+      PARTITIONPATH_FIELD.key -> partitionNames.mkString(","),
+      HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString
+    )
+
+    val readerOpts: Map[String, String] = queryOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString,
+      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> "lazy",
+      
DataSourceReadOptions.FILE_INDEX_LISTING_PARTITION_PATH_PREFIX_ANALYSIS_ENABLED.key
 -> "true"
+    )
+
+    // Add a prefix "default" to ensure `PushDownByPartitionPrefix` not work
+    val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000,
+      "default", s"2021-03-0${i % 2 + 1}", i % 6 + 1, if (i % 2 == 0) "CN" 
else "SG"))
+      .toDF("id", "name", "price", "version", "prefix", "dt", "hh", "country")
+
+    inputDF1.write.format("hudi")
+      .options(writerOpts)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, 
complexExpressionPushDown.toString)
+      .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key, 
complexExpressionPushDown.toString)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // NOTE: We're initializing the file index in advance to additionally test the refreshing capability
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, readerOpts)
+
+    val partitionFilters = EqualTo(attribute("hh"), Literal.create(5))
+
+    val partitionAndFilesAfterPrune = 
fileIndex.listFiles(Seq(partitionFilters), Seq.empty)
+    assertEquals(1, partitionAndFilesAfterPrune.size)
+
+    assertEquals(fileIndex.areAllPartitionPathsCached(), 
!complexExpressionPushDown)
+
+    val PartitionDirectory(partitionActualValues, filesAfterPrune) = partitionAndFilesAfterPrune.head
+    val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN")
+    assertEquals(partitionExpectValues.mkString(","), partitionActualValues.toSeq(Seq(StringType)).mkString(","))
+    assertEquals(getFileCountInPartitionPath(makePartitionPath(partitionNames, partitionExpectValues, complexExpressionPushDown)),
+      filesAfterPrune.size)
+
+    val readDF = spark.read.format("hudi").options(readerOpts).load()
+
+    assertEquals(10, readDF.count())
+    assertEquals(1, readDF.filter("hh = 5").count())
+  }
+
   @ParameterizedTest
   @CsvSource(value = Array("true", "false"))
  def testFileListingPartitionPrefixAnalysis(enablePartitionPathPrefixAnalysis: Boolean): Unit = {
@@ -686,6 +746,18 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS
   private def getFileCountInPartitionPaths(partitionPaths: String*): Int = {
     partitionPaths.map(getFileCountInPartitionPath).sum
   }
+
+  private def makePartitionPath(partitionNames: Seq[String],
+                                partitionValues: Seq[String],
+                                hiveStylePartitioning: Boolean): String = {
+    if (hiveStylePartitioning) {
+      partitionNames.zip(partitionValues).map {
+        case (name, value) => s"$name=$value"
+      }.mkString(Path.SEPARATOR)
+    } else {
+      partitionValues.mkString(Path.SEPARATOR)
+    }
+  }
 }
 
 object TestHoodieFileIndex {
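
As a quick illustration of what the makePartitionPath helper above produces: it
switches between hive-style name=value segments and bare value segments. A
standalone re-implementation with "/" hardcoded in place of Path.SEPARATOR
(expected outputs shown in comments):

    object PartitionPathSketch {
      // Same logic as the test helper above, minus the Hadoop Path dependency.
      def makePartitionPath(names: Seq[String], values: Seq[String], hiveStyle: Boolean): String =
        if (hiveStyle) names.zip(values).map { case (n, v) => s"$n=$v" }.mkString("/")
        else values.mkString("/")

      def main(args: Array[String]): Unit = {
        val names  = Seq("prefix", "dt", "hh", "country")
        val values = Seq("default", "2021-03-01", "5", "CN")
        println(makePartitionPath(names, values, hiveStyle = true))  // prefix=default/dt=2021-03-01/hh=5/country=CN
        println(makePartitionPath(names, values, hiveStyle = false)) // default/2021-03-01/5/CN
      }
    }
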
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala
new file mode 100644
index 00000000000..7d53596ac10
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.SparkFilterHelper.convertFilter
+import org.apache.hudi.expression.{Expression, NameReference, Predicates, Literal => HLiteral}
+import org.apache.hudi.testutils.HoodieClientTestHarness
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.{Assertions, Test}
+
+import scala.jdk.CollectionConverters.seqAsJavaListConverter
+
+class TestSparkFilterHelper extends HoodieClientTestHarness with SparkAdapterSupport {
+
+  @Test
+  def testConvertInExpression(): Unit = {
+    val filterExpr = sparkAdapter.translateFilter(
+      expr("col1 IN (1, 2, 3)").expr.transformUp {
+        case UnresolvedAttribute(nameParts) => AttributeReference(nameParts.mkString("."), IntegerType)()
+      })
+
+    val result = SparkFilterHelper.convertFilter(filterExpr.get).get
+
+    val expected = Predicates.in(
+      new NameReference("col1"),
+      Seq(1, 2, 3).map(v => HLiteral.from(v).asInstanceOf[Expression]).asJava)
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertInSetExpression(): Unit = {
+    val filterExpr = sparkAdapter.translateFilter(
+      InSet(AttributeReference("col1", StringType)(), Set("value1", "value2", 
"value3").map(UTF8String.fromString)))
+
+    val result = SparkFilterHelper.convertFilter(filterExpr.get).get
+
+    val expected = Predicates.in(
+      new NameReference("col1"),
+      Seq("value1", "value2", "value3").map(v => 
HLiteral.from(v).asInstanceOf[Expression]).asJava)
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertEqualToExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(EqualTo(AttributeReference("col1", LongType)(), Literal(1L)))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.eq(
+      new NameReference("col1"),
+      HLiteral.from(1L).asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertGreaterThanExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(GreaterThan(AttributeReference("col3", DoubleType)(), Literal(3.0D)))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.gt(
+      new NameReference("col3"),
+      HLiteral.from(3.0D).asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertGreaterThanOrEqualExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(GreaterThanOrEqual(AttributeReference("col4", FloatType)(), Literal(4.0f)))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.gteq(
+      new NameReference("col4"),
+      HLiteral.from(4.0f).asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertLessThanExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(LessThan(AttributeReference("col5", StringType)(), Literal("abc")))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.lt(
+      new NameReference("col5"),
+      HLiteral.from("abc").asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertLessThanOrEqualExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(LessThanOrEqual(AttributeReference("col6", BooleanType)(), Literal(true)))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.lteq(
+      new NameReference("col6"),
+      HLiteral.from(true).asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertStartsWithExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(StartsWith(AttributeReference("col2", StringType)(), Literal("prefix")))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.startsWith(
+      new NameReference("col2"),
+      HLiteral.from("prefix").asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertContainsExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(Contains(AttributeReference("col2", StringType)(), Literal("prefix")))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.contains(
+      new NameReference("col2"),
+      HLiteral.from("prefix").asInstanceOf[Expression])
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertAndExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(And(
+      EqualTo(AttributeReference("col1", IntegerType)(), Literal(1)),
+      GreaterThan(AttributeReference("col2", FloatType)(), Literal(2.0F))))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.and(
+      Predicates.eq(
+        new NameReference("col1"),
+        HLiteral.from(1).asInstanceOf[Expression]),
+      Predicates.gt(
+        new NameReference("col2"),
+        HLiteral.from(2.0F).asInstanceOf[Expression]))
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+
+  @Test
+  def testConvertOrEqualExpression(): Unit = {
+    val filter = sparkAdapter.translateFilter(Or(
+      LessThan(AttributeReference("col1", ShortType)(), Literal(10.toShort)),
+      GreaterThanOrEqual(AttributeReference("col2", DoubleType)(), 
Literal(100.0D))))
+    val result = convertFilter(filter.get).get
+
+    val expected = Predicates.or(
+      Predicates.lt(
+        new NameReference("col1"),
+        HLiteral.from(10.toShort).asInstanceOf[Expression]),
+      Predicates.gteq(
+        new NameReference("col2"),
+        HLiteral.from(100.0D).asInstanceOf[Expression]))
+
+    Assertions.assertEquals(result.toString, expected.toString)
+  }
+}
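
All of the tests above exercise the same two-hop pipeline: a Catalyst expression
is translated into an org.apache.spark.sql.sources.Filter via
sparkAdapter.translateFilter, and that Filter is converted into a Hudi
expression via SparkFilterHelper.convertFilter. A self-contained toy of the
second hop, with tiny hypothetical ADTs in place of the real Spark and Hudi
classes:

    sealed trait SFilter // stand-in for Spark's source Filter
    case class SEq(col: String, value: Any) extends SFilter
    case class SAnd(left: SFilter, right: SFilter) extends SFilter

    sealed trait HExpr // stand-in for Hudi's Expression
    case class HEq(col: String, value: Any) extends HExpr
    case class HAnd(left: HExpr, right: HExpr) extends HExpr

    object ConvertFilterSketch {
      // Option mirrors convertFilter: None would mean "cannot push this shape down".
      def convert(filter: SFilter): Option[HExpr] = filter match {
        case SEq(c, v)  => Some(HEq(c, v))
        case SAnd(l, r) => for { hl <- convert(l); hr <- convert(r) } yield HAnd(hl, hr)
      }

      def main(args: Array[String]): Unit =
        println(convert(SAnd(SEq("col1", 1), SEq("col2", "CN"))))
    }
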
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
index be1dae8cc27..35afff918b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala
@@ -220,7 +220,7 @@ class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAf
     avroRecord.put("preferences", preferencesRecord)
     // fill mapType
     val locations = new HashMap[String, GenericData.Record]
-    val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations")
+    val mapSchema = AvroInternalSchemaConverter.convert(record.fieldByNameCaseInsensitive("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations")
     val locationsValue: GenericData.Record = new GenericData.Record(mapSchema)
     locationsValue.put("lat", 1.2f)
     locationsValue.put("long", 1.4f)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
index 0467b6664c6..e2635c0cba8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala
@@ -55,6 +55,36 @@ class TestLazyPartitionPathFetching extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test querying with date column + partition pruning") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  grass_date date
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | PARTITIONED BY (grass_date)
+         """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 
date('2023-02-27'))")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 
date('2023-02-28'))")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 
date('2023-03-01'))")
+
+      checkAnswer(s"select id, name, price, ts from $tableName where 
grass_date = date'2023-03-01' order by id")(
+        Seq(3, "a3", 10.0, 1000)
+      )
+    }
+  }
+
   test("Test querying with date column + partition pruning (multi-level 
partitioning)") {
     withTempDir { tmp =>
       val tableName = generateTableName
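
The date-typed pruning exercised above relies on partition path strings being
convertible back into typed values before comparison. A toy illustration with
java.time (this is not Hudi's actual conversion utility):

    import java.time.LocalDate

    object DatePartitionPruneSketch {
      // Partition values travel as ISO yyyy-MM-dd strings in the path.
      def prune(partitionValues: Seq[String], predicate: LocalDate => Boolean): Seq[String] =
        partitionValues.filter(v => predicate(LocalDate.parse(v)))

      def main(args: Array[String]): Unit = {
        val partitions = Seq("2023-02-27", "2023-02-28", "2023-03-01")
        println(prune(partitions, _.isEqual(LocalDate.of(2023, 3, 1)))) // List(2023-03-01)
      }
    }
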
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala
new file mode 100644
index 00000000000..1b5e590913f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+
+class TestPartitionPushDownWhenListingPaths extends HoodieSparkSqlTestBase {
+
+  test("Test push down different partitions") {
+    Seq("true", "false").foreach { enableMetadata =>
+      withSQLConf(HoodieMetadataConfig.ENABLE.key -> enableMetadata) {
+        Seq("cow", "mor").foreach { tableType =>
+          withTempDir { tmp =>
+            val tableName = generateTableName
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  name string,
+                 |  price double,
+                 |  ts long,
+                 |  date_par date,
+                 |  country string,
+                 |  hour int,
+                 |  longValue long
+                 |) using hudi
+                 | location '${tmp.getCanonicalPath}'
+                 | tblproperties (
+                 |  primaryKey ='id',
+                 |  type = '$tableType',
+                 |  preCombineField = 'ts',
+                 |  hoodie.datasource.write.hive_style_partitioning = 'true',
+                 |  hoodie.datasource.write.partitionpath.urlencode = 'true'
+                 | )
+                 | PARTITIONED BY (date_par, country, hour, longValue)""".stripMargin)
+            spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date '2023-02-27', 'ID', 1, 102345L)")
+            spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date '2023-02-28', 'US', 4, 102346L)")
+            spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date '2023-03-01', 'CN', 10, 102347L)")
+
+            // Only filter one partition column
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par = date'2023-03-01' order by id")(
+              Seq(3, "a3", 10.0, 1000)
+            )
+
+            // Filter with And operation
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par = date'2023-02-28' and hour = 4 order by id")(
+              Seq(2, "a2", 10.0, 1000)
+            )
+
+            // Filter with Or operation
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par = date'2023-02-28' or country = 'CN' order by id")(
+              Seq(2, "a2", 10.0, 1000),
+              Seq(3, "a3", 10.0, 1000)
+            )
+
+            // Filter with GT
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par > date'2023-02-27' order by id")(
+              Seq(2, "a2", 10.0, 1000),
+              Seq(3, "a3", 10.0, 1000)
+            )
+
+            // Filter with LT
+            checkAnswer(s"select id, name, price, ts from $tableName where 
longValue < 102346L order by id")(
+              Seq(1, "a1", 10.0, 1000)
+            )
+
+            // Filter with EQ
+            checkAnswer(s"select id, name, price, ts from $tableName where 
longValue = 102346L order by id")(
+              Seq(2, "a2", 10.0, 1000)
+            )
+
+            // Filter with GT_EQ
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par >= date'2023-02-27' order by id")(
+              Seq(1, "a1", 10.0, 1000),
+              Seq(2, "a2", 10.0, 1000),
+              Seq(3, "a3", 10.0, 1000)
+            )
+
+            // Filter with LT_EQ
+            checkAnswer(s"select id, name, price, ts from $tableName where 
date_par <= date'2023-02-27' order by id")(
+              Seq(1, "a1", 10.0, 1000)
+            )
+
+            // Filter with In operation
+            checkAnswer(s"select id, name, price, ts from $tableName where 
country in ('CN', 'US') order by id")(
+              Seq(2, "a2", 10.0, 1000),
+              Seq(3, "a3", 10.0, 1000)
+            )
+          }
+        }
+      }
+    }
+  }
+}
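
The SQL cases above cover EQ, GT, LT, GT_EQ, LT_EQ and IN, combined through AND
and OR. Conceptually, pruning evaluates each predicate against a partition's
values and keeps only the matching paths. A toy evaluator over string-valued
partitions (hypothetical types, not the Hudi expression classes):

    sealed trait Pred
    case class Eq(col: String, v: String) extends Pred
    case class In(col: String, vs: Set[String]) extends Pred
    case class And(l: Pred, r: Pred) extends Pred
    case class Or(l: Pred, r: Pred) extends Pred

    object PartitionPruneSketch {
      def eval(p: Pred, partition: Map[String, String]): Boolean = p match {
        case Eq(c, v)  => partition.get(c).contains(v)
        case In(c, vs) => partition.get(c).exists(vs.contains)
        case And(l, r) => eval(l, partition) && eval(r, partition)
        case Or(l, r)  => eval(l, partition) || eval(r, partition)
      }

      def main(args: Array[String]): Unit = {
        val partitions = Seq(
          Map("country" -> "ID", "hour" -> "1"),
          Map("country" -> "US", "hour" -> "4"),
          Map("country" -> "CN", "hour" -> "10"))
        // country in ('CN', 'US') keeps the last two partitions, as asserted above.
        println(partitions.filter(eval(In("country", Set("CN", "US")), _)))
      }
    }
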
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 9eddf81d121..cc2e25f9891 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
@@ -187,4 +187,17 @@ class Spark2Adapter extends SparkAdapter {
     case OFF_HEAP => "OFF_HEAP"
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
   }
+
+  /**
+   * Spark 2 doesn't support nested predicate pushdown,
+   * so fail if [[supportNestedPredicatePushdown]] is true here.
+   */
+  override def translateFilter(predicate: Expression,
+                               supportNestedPredicatePushdown: Boolean = false): Option[Filter] = {
+    if (supportNestedPredicatePushdown) {
+      throw new UnsupportedOperationException("Nested predicate push down is not supported")
+    }
+
+    DataSourceStrategy.translateFilter(predicate)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d5a35980526..ce9499ae7d2 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredica
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
@@ -92,4 +92,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
   }
 
   override def convertStorageLevelToString(level: StorageLevel): String
+
+  override def translateFilter(predicate: Expression,
+                               supportNestedPredicatePushdown: Boolean = false): Option[Filter] = {
+    DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown)
+  }
 }
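
The two overrides above differ only in whether the nested-predicate-pushdown
flag can be honored: Spark 2 rejects it outright, while Spark 3 forwards it to
DataSourceStrategy. The guard pattern in miniature, with plain strings standing
in for real predicates and filters:

    object AdapterGuardSketch {
      // Spark 2 shape: the flag must stay false.
      def translateFilterSpark2(predicate: String, supportNested: Boolean = false): Option[String] = {
        if (supportNested) {
          throw new UnsupportedOperationException("Nested predicate push down is not supported")
        }
        Some(predicate) // delegate without the flag
      }

      // Spark 3 shape: the flag is simply forwarded.
      def translateFilterSpark3(predicate: String, supportNested: Boolean = false): Option[String] =
        Some(if (supportNested) s"$predicate /* nested ok */" else predicate)

      def main(args: Array[String]): Unit = {
        println(translateFilterSpark2("a = 1"))
        println(translateFilterSpark3("s.x = 1", supportNested = true))
      }
    }
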
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
index f1d4724cc1b..f42b157727c 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.hive.util;
 
-import org.apache.hudi.hive.expression.AttributeReferenceExpression;
-import org.apache.hudi.hive.expression.BinaryOperator;
-import org.apache.hudi.hive.expression.Expression;
-import org.apache.hudi.hive.expression.ExpressionVisitor;
-import org.apache.hudi.hive.expression.Literal;
-
-import java.util.Locale;
+import org.apache.hudi.expression.NameReference;
+import org.apache.hudi.expression.BoundReference;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.ExpressionVisitor;
+import org.apache.hudi.expression.Literal;
+import org.apache.hudi.expression.Predicate;
+import org.apache.hudi.expression.Predicates;
+import org.apache.hudi.internal.schema.Types;
 
 public class FilterGenVisitor implements ExpressionVisitor<String> {
 
@@ -42,9 +43,10 @@ public class FilterGenVisitor implements ExpressionVisitor<String> {
     }
   }
 
-  private String visitAnd(Expression left, Expression right) {
-    String leftResult = left.accept(this);
-    String rightResult = right.accept(this);
+  @Override
+  public String visitAnd(Predicates.And and) {
+    String leftResult = and.getLeft().accept(this);
+    String rightResult = and.getRight().accept(this);
 
     if (leftResult.isEmpty()) {
       if (rightResult.isEmpty()) {
@@ -59,9 +61,10 @@ public class FilterGenVisitor implements ExpressionVisitor<String> {
     return "(" + makeBinaryOperatorString(leftResult, Expression.Operator.AND, rightResult) + ")";
   }
 
-  private String visitOr(Expression left, Expression right) {
-    String leftResult = left.accept(this);
-    String rightResult = right.accept(this);
+  @Override
+  public String visitOr(Predicates.Or or) {
+    String leftResult = or.getLeft().accept(this);
+    String rightResult = or.getRight().accept(this);
 
     if (!leftResult.isEmpty() && !rightResult.isEmpty()) {
       return "(" + makeBinaryOperatorString(leftResult, 
Expression.Operator.OR, rightResult) + ")";
@@ -81,39 +84,46 @@ public class FilterGenVisitor implements 
ExpressionVisitor<String> {
   }
 
   @Override
-  public String visitBinaryOperator(BinaryOperator expr) {
-    switch (expr.getOperator()) {
-      case AND:
-        return visitAnd(expr.getLeft(), expr.getRight());
-      case OR:
-        return visitOr(expr.getLeft(), expr.getRight());
-      case EQ:
-      case GT:
-      case LT:
-      case GT_EQ:
-      case LT_EQ:
-        return visitBinaryComparator(expr.getLeft(), expr.getOperator(), expr.getRight());
-      default:
-        return "";
+  public String visitPredicate(Predicate predicate) {
+    if (predicate instanceof Predicates.BinaryComparison) {
+      Predicates.BinaryComparison expr = (Predicates.BinaryComparison) predicate;
+      return visitBinaryComparator(expr.getLeft(), expr.getOperator(), expr.getRight());
     }
+
+    return "";
+  }
+
+  @Override
+  public String alwaysTrue() {
+    return "";
+  }
+
+  @Override
+  public String alwaysFalse() {
+    return "";
   }
 
   @Override
   public String visitLiteral(Literal literalExpr) {
-    switch (literalExpr.getType().toLowerCase(Locale.ROOT)) {
-      case HiveSchemaUtil.STRING_TYPE_NAME:
-        return quoteStringLiteral(literalExpr.getValue());
-      case HiveSchemaUtil.INT_TYPE_NAME:
-      case HiveSchemaUtil.BIGINT_TYPE_NAME:
-      case HiveSchemaUtil.DATE_TYPE_NAME:
-        return literalExpr.getValue();
-      default:
-        return "";
+    if (literalExpr.getDataType() instanceof Types.StringType) {
+      return quoteStringLiteral(((Literal<String>)literalExpr).getValue());
     }
+
+    if (literalExpr.getDataType() instanceof Types.IntType || literalExpr.getDataType() instanceof Types.LongType
+        || literalExpr.getDataType() instanceof Types.DateType) {
+      return literalExpr.getValue().toString();
+    }
+
+    return "";
   }
 
   @Override
-  public String visitAttribute(AttributeReferenceExpression attribute) {
+  public String visitNameReference(NameReference attribute) {
     return attribute.getName();
   }
+
+  @Override
+  public String visitBoundReference(BoundReference boundReference) {
+    throw new UnsupportedOperationException("BoundReference cannot be used to 
build filter string");
+  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
index 8202408d8bd..9ff22d2d5dc 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java
@@ -20,12 +20,14 @@ package org.apache.hudi.hive.util;
 
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.expression.Predicates;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
-import org.apache.hudi.hive.expression.AttributeReferenceExpression;
-import org.apache.hudi.hive.expression.BinaryOperator;
-import org.apache.hudi.hive.expression.Expression;
-import org.apache.hudi.hive.expression.Literal;
+import org.apache.hudi.expression.NameReference;
+import org.apache.hudi.expression.BinaryExpression;
+import org.apache.hudi.expression.Expression;
+import org.apache.hudi.expression.Literal;
+import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
 import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -54,6 +56,28 @@ public class PartitionFilterGenerator {
     }
   };
 
+  private static final String UNSUPPORTED_TYPE_ERROR = "The value type: %s is not supported "
+      + "for push down to HMS, acceptable types: " + String.join(",", SUPPORT_TYPES);
+
+  private static Literal buildLiteralExpression(String fieldValue, String fieldType) {
+    switch (fieldType.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.INT_TYPE_NAME:
+        return new Literal<>(Integer.parseInt(fieldValue), Types.IntType.get());
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+        return new Literal<>(Long.parseLong(fieldValue), Types.LongType.get());
+      // TODO Handle Date value
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return new Literal<>(fieldValue, Types.DateType.get());
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return new Literal<>(fieldValue, Types.StringType.get());
+      case HiveSchemaUtil.BOOLEAN_TYPE_NAME:
+        return new Literal<>(Boolean.parseBoolean(fieldValue), Types.BooleanType.get());
+      default:
+        throw new IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, fieldType));
+    }
+  }
+
+
   /**
    * Build expression from the Partition list. Here we're trying to match all partitions.
    *
@@ -68,11 +92,10 @@ public class PartitionFilterGenerator {
 
       for (int i = 0; i < partitionFields.size(); i++) {
         FieldSchema field = partitionFields.get(i);
-        String value = partitionValues.get(i);
-        BinaryOperator exp = BinaryOperator.eq(new AttributeReferenceExpression(field.getName()),
-            new Literal(value, field.getType()));
+        BinaryExpression exp = Predicates.eq(new NameReference(field.getName()),
+            buildLiteralExpression(partitionValues.get(i), field.getType()));
         if (root != null) {
-          root = BinaryOperator.and(root, exp);
+          root = Predicates.and(root, exp);
         } else {
           root = exp;
         }
@@ -82,7 +105,7 @@ public class PartitionFilterGenerator {
       if (result == null) {
         return expr;
       } else {
-        return BinaryOperator.or(result, expr);
+        return Predicates.or(result, expr);
       }
     });
   }
@@ -129,8 +152,7 @@ public class PartitionFilterGenerator {
         case HiveSchemaUtil.STRING_TYPE_NAME:
           return s1.compareTo(s2);
         default:
-          throw new IllegalArgumentException("The value type: " + valueType + 
" doesn't support to "
-              + "be pushed down to HMS, acceptable types: " + String.join(",", 
SUPPORT_TYPES));
+          throw new 
IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, valueType));
       }
     }
   }
@@ -152,26 +174,26 @@ public class PartitionFilterGenerator {
       String[] values = fieldWithValues.getValue();
 
       if (values.length == 1) {
-        return BinaryOperator.eq(new AttributeReferenceExpression(fieldSchema.getName()),
-            new Literal(values[0], fieldSchema.getType()));
+        return Predicates.eq(new NameReference(fieldSchema.getName()),
+            buildLiteralExpression(values[0], fieldSchema.getType()));
       }
 
       Arrays.sort(values, new ValueComparator(fieldSchema.getType()));
 
-      return BinaryOperator.and(
-          BinaryOperator.gteq(
-              new AttributeReferenceExpression(fieldSchema.getName()),
-              new Literal(values[0], fieldSchema.getType())),
-          BinaryOperator.lteq(
-              new AttributeReferenceExpression(fieldSchema.getName()),
-              new Literal(values[values.length - 1], fieldSchema.getType())));
+      return Predicates.and(
+          Predicates.gteq(
+              new NameReference(fieldSchema.getName()),
+              buildLiteralExpression(values[0], fieldSchema.getType())),
+          Predicates.lteq(
+              new NameReference(fieldSchema.getName()),
+              buildLiteralExpression(values[values.length - 1], fieldSchema.getType())));
     })
         .filter(Objects::nonNull)
         .reduce(null, (result, expr) -> {
           if (result == null) {
             return expr;
           } else {
-            return BinaryOperator.and(result, expr);
+            return Predicates.and(result, expr);
           }
         });
   }
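
The last hunk above folds all values seen for one partition field into a single
range predicate: sort the values, then emit field >= min AND field <= max, or a
single EQ when only one value exists. A toy version over integers, skipping the
per-type comparator dispatch:

    object MinMaxFoldSketch {
      def rangeFilter(field: String, values: Seq[Int]): String =
        if (values.size == 1) s"$field = ${values.head}"
        else {
          val sorted = values.sorted
          s"$field >= ${sorted.head} AND $field <= ${sorted.last}"
        }

      def main(args: Array[String]): Unit = {
        println(rangeFilter("hour", Seq(10, 4, 1))) // hour >= 1 AND hour <= 10
        println(rangeFilter("hour", Seq(4)))        // hour = 4
      }
    }
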
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
index 0c85d44d068..3f1a19421ac 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java
@@ -297,7 +297,7 @@ public class HoodieDataTableValidator implements Serializable {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     try {
       HoodieTableMetadata tableMetadata = new FileSystemBackedTableMetadata(
-          engineContext, engineContext.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
+          engineContext, metaClient.getTableConfig(), engineContext.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
       List<Path> allDataFilePaths = HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(tableMetadata, cfg.basePath);
       // verify that no data files are present with commit time < earliest commit in active timeline.
       if (metaClient.getActiveTimeline().firstInstant().isPresent()) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
index 84fa604c76d..70146ef55c8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
@@ -164,7 +164,7 @@ public class HoodieRepairTool {
         .build();
 
     this.tableMetadata = new FileSystemBackedTableMetadata(
-        context, context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
+        context, metaClient.getTableConfig(), context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning);
   }
 
   public boolean run() {

