This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf34dcf7ad [core] Support pushing down Timestamp filter in parquet format (#7203)
cf34dcf7ad is described below

commit cf34dcf7ad4bcb6e61db966bc3c33a49b355b59e
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Feb 4 17:47:36 2026 +0800

    [core] Support pushing down Timestamp filter in parquet format (#7203)
---
 .../parquet/filter2/predicate/ParquetFilters.java  |  35 ++++-
 .../paimon/format/parquet/ParquetFiltersTest.java  | 173 +++++++++++++++++++++
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  41 +++++
 3 files changed, 246 insertions(+), 3 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
index 8643ebb6d3..6641b9304e 100644
--- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
+++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.predicate;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.FunctionVisitor;
@@ -237,6 +238,15 @@ public class ParquetFilters {
         return converted;
     }
 
+    private static int getTimestampPrecision(org.apache.paimon.types.DataType 
type) {
+        if (type instanceof TimestampType) {
+            return ((TimestampType) type).getPrecision();
+        } else if (type instanceof LocalZonedTimestampType) {
+            return ((LocalZonedTimestampType) type).getPrecision();
+        }
+        throw new IllegalArgumentException("Not a timestamp type: " + type);
+    }
+
     private static Operators.Column<?> toParquetColumn(FieldRef fieldRef) {
         return fieldRef.type().accept(new 
ConvertToColumnTypeVisitor(fieldRef.name()));
     }
@@ -270,9 +280,20 @@ public class ParquetFilters {
             } else {
                 return Binary.fromConstantByteArray(decimal.toUnscaledBytes());
             }
+        } else if (value instanceof Timestamp) {
+            Timestamp timestamp = (Timestamp) value;
+            int precision = getTimestampPrecision(type);
+            if (precision <= 3) {
+                // milliseconds
+                return timestamp.getMillisecond();
+            } else if (precision <= 6) {
+                // microseconds
+                return timestamp.toMicros();
+            }
+            // precision > 6 uses INT96, not supported
+            throw new UnsupportedOperationException();
         }
 
-        // TODO Support Timestamp
         throw new UnsupportedOperationException();
     }
 
@@ -362,15 +383,23 @@ public class ParquetFilters {
             }
         }
 
-        // TODO we can support timestamp
-
         @Override
         public Operators.Column<?> visit(TimestampType timestampType) {
+            int precision = timestampType.getPrecision();
+            if (precision <= 6) {
+                return FilterApi.longColumn(name);
+            }
+            // precision > 6 uses INT96, not supported for filter pushdown
             throw new UnsupportedOperationException();
         }
 
         @Override
         public Operators.Column<?> visit(LocalZonedTimestampType 
localZonedTimestampType) {
+            int precision = localZonedTimestampType.getPrecision();
+            if (precision <= 6) {
+                return FilterApi.longColumn(name);
+            }
+            // precision > 6 uses INT96, not supported for filter pushdown
             throw new UnsupportedOperationException();
         }
 
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
index 3cd98e1434..18fe1ef28c 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.types.BigIntType;
@@ -26,7 +27,9 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.DoubleType;
 import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.parquet.filter2.compat.FilterCompat;
@@ -410,6 +413,176 @@ class ParquetFiltersTest {
                 true);
     }
 
+    @Test
+    public void testTimestampMillis() {
+        // precision <= 3 uses milliseconds (INT64)
+        int precision = 3;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(0, "ts1", new 
TimestampType(precision)))));
+
+        Timestamp value = Timestamp.fromEpochMillis(1704067200000L); // 
2024-01-01 00:00:00
+        long expectedMillis = value.getMillisecond();
+
+        test(builder.isNull(0), "eq(ts1, null)", true);
+        test(builder.isNotNull(0), "noteq(ts1, null)", true);
+        test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
+        test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", 
true);
+        test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", 
true);
+        test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis + 
")", true);
+        test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", 
true);
+        test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis + 
")", true);
+    }
+
+    @Test
+    public void testTimestampMicros() {
+        // 3 < precision <= 6 uses microseconds (INT64)
+        int precision = 6;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(0, "ts1", new 
TimestampType(precision)))));
+
+        Timestamp value = Timestamp.fromEpochMillis(1704067200123L, 456000); 
// with nanos
+        long expectedMicros = value.getMillisecond() * 1000 + 
value.getNanoOfMillisecond() / 1000;
+
+        test(builder.isNull(0), "eq(ts1, null)", true);
+        test(builder.isNotNull(0), "noteq(ts1, null)", true);
+        test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
+        test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", 
true);
+        test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", 
true);
+        test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros + 
")", true);
+        test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", 
true);
+        test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros + 
")", true);
+    }
+
+    @Test
+    public void testLocalZonedTimestampMillis() {
+        // precision <= 3 uses milliseconds (INT64)
+        int precision = 3;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(
+                                                0,
+                                                "ts1",
+                                                new 
LocalZonedTimestampType(precision)))));
+
+        Timestamp value = Timestamp.fromEpochMillis(1704067200000L);
+        long expectedMillis = value.getMillisecond();
+
+        test(builder.isNull(0), "eq(ts1, null)", true);
+        test(builder.isNotNull(0), "noteq(ts1, null)", true);
+        test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
+        test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", 
true);
+        test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", 
true);
+        test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", 
true);
+    }
+
+    @Test
+    public void testLocalZonedTimestampMicros() {
+        // 3 < precision <= 6 uses microseconds (INT64)
+        int precision = 6;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(
+                                                0,
+                                                "ts1",
+                                                new 
LocalZonedTimestampType(precision)))));
+
+        Timestamp value = Timestamp.fromEpochMillis(1704067200123L, 456000);
+        long expectedMicros = value.getMillisecond() * 1000 + 
value.getNanoOfMillisecond() / 1000;
+
+        test(builder.isNull(0), "eq(ts1, null)", true);
+        test(builder.isNotNull(0), "noteq(ts1, null)", true);
+        test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
+        test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", 
true);
+        test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", 
true);
+        test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", 
true);
+    }
+
+    @Test
+    public void testInFilterTimestampMillis() {
+        int precision = 3;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(0, "ts1", new 
TimestampType(precision)))));
+
+        Timestamp v1 = Timestamp.fromEpochMillis(1704067200000L);
+        Timestamp v2 = Timestamp.fromEpochMillis(1704153600000L);
+        Timestamp v3 = Timestamp.fromEpochMillis(1704240000000L);
+
+        test(
+                builder.in(0, Arrays.asList(v1, v2, v3)),
+                "or(or(eq(ts1, "
+                        + v1.getMillisecond()
+                        + "), eq(ts1, "
+                        + v2.getMillisecond()
+                        + ")), eq(ts1, "
+                        + v3.getMillisecond()
+                        + "))",
+                true);
+
+        test(
+                builder.notIn(0, Arrays.asList(v1, v2, v3)),
+                "and(and(noteq(ts1, "
+                        + v1.getMillisecond()
+                        + "), noteq(ts1, "
+                        + v2.getMillisecond()
+                        + ")), noteq(ts1, "
+                        + v3.getMillisecond()
+                        + "))",
+                true);
+    }
+
+    @Test
+    public void testInFilterTimestampMicros() {
+        int precision = 6;
+        PredicateBuilder builder =
+                new PredicateBuilder(
+                        new RowType(
+                                Collections.singletonList(
+                                        new DataField(0, "ts1", new 
TimestampType(precision)))));
+
+        Timestamp v1 = Timestamp.fromEpochMillis(1704067200000L, 123000);
+        Timestamp v2 = Timestamp.fromEpochMillis(1704153600000L, 456000);
+        Timestamp v3 = Timestamp.fromEpochMillis(1704240000000L, 789000);
+
+        long micros1 = v1.getMillisecond() * 1000 + v1.getNanoOfMillisecond() 
/ 1000;
+        long micros2 = v2.getMillisecond() * 1000 + v2.getNanoOfMillisecond() 
/ 1000;
+        long micros3 = v3.getMillisecond() * 1000 + v3.getNanoOfMillisecond() 
/ 1000;
+
+        test(
+                builder.in(0, Arrays.asList(v1, v2, v3)),
+                "or(or(eq(ts1, "
+                        + micros1
+                        + "), eq(ts1, "
+                        + micros2
+                        + ")), eq(ts1, "
+                        + micros3
+                        + "))",
+                true);
+
+        test(
+                builder.notIn(0, Arrays.asList(v1, v2, v3)),
+                "and(and(noteq(ts1, "
+                        + micros1
+                        + "), noteq(ts1, "
+                        + micros2
+                        + ")), noteq(ts1, "
+                        + micros3
+                        + "))",
+                true);
+    }
+
     private void test(Predicate predicate, FilterPredicate parquetPredicate, 
boolean canPushDown) {
         FilterCompat.Filter filter = 
ParquetFilters.convert(PredicateBuilder.splitAnd(predicate));
         if (canPushDown) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 747b140cb3..c28000e73a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -807,6 +807,47 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase with AdaptiveS
     }
   }
 
+  test(s"Paimon pushdown: parquet timestamp filter") {
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (a TIMESTAMP, b STRING) using paimon 
TBLPROPERTIES
+                   |(
+                   |'file.format' = 'parquet',
+                   |'parquet.block.size' = '100',
+                   |'target-file-size' = '10g'
+                   |)
+                   |""".stripMargin)
+
+      spark.sql("""INSERT INTO T VALUES
+                  |(TIMESTAMP '2024-01-01 00:00:00', 'a'),
+                  |(TIMESTAMP '2024-01-02 12:30:00', 'b'),
+                  |(TIMESTAMP '2024-01-03 23:59:59', 'c'),
+                  |(TIMESTAMP '2024-01-01 12:00:00', 'd')
+                  |""".stripMargin)
+
+      // Test equals filter
+      checkAnswer(
+        spark.sql("SELECT * FROM T WHERE a = TIMESTAMP '2024-01-01 00:00:00'"),
+        Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") :: Nil
+      )
+
+      // Test comparison filter
+      checkAnswer(
+        spark.sql("SELECT * FROM T WHERE a < TIMESTAMP '2024-01-02 00:00:00' 
ORDER BY a"),
+        Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") ::
+          Row(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), "d") :: Nil
+      )
+
+      // Test between filter
+      checkAnswer(
+        spark.sql(
+          "SELECT * FROM T WHERE a BETWEEN TIMESTAMP '2024-01-01 00:00:00' AND 
TIMESTAMP '2024-01-02 00:00:00' ORDER BY a"),
+        Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") ::
+          Row(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), "d") :: Nil
+      )
+    }
+  }
+
   private def getScanBuilder(tableName: String = "T"): ScanBuilder = {
     
SparkTable(loadTable(tableName)).newScanBuilder(CaseInsensitiveStringMap.empty())
   }

Reply via email to