This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cf34dcf7ad [core] Support pushing down Timestamp filter in parquet format (#7203)
cf34dcf7ad is described below
commit cf34dcf7ad4bcb6e61db966bc3c33a49b355b59e
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Feb 4 17:47:36 2026 +0800
[core] Support pushing down Timestamp filter in parquet format (#7203)
---
.../parquet/filter2/predicate/ParquetFilters.java | 35 ++++-
.../paimon/format/parquet/ParquetFiltersTest.java | 173 +++++++++++++++++++++
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 41 +++++
3 files changed, 246 insertions(+), 3 deletions(-)
diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
index 8643ebb6d3..6641b9304e 100644
--- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
+++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java
@@ -20,6 +20,7 @@ package org.apache.parquet.filter2.predicate;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.FunctionVisitor;
@@ -237,6 +238,15 @@ public class ParquetFilters {
return converted;
}
+ /**
+ * Returns the declared precision of a Paimon timestamp type.
+ *
+ * <p>The literal conversion in this class uses the result to choose the encoding. Precision
+ * up to 3 uses epoch milliseconds, and precision up to 6 uses epoch microseconds. Anything
+ * larger is rejected.
+ *
+ * @param type expected to be a {@link TimestampType} or {@link LocalZonedTimestampType}
+ * @return the type's declared precision
+ * @throws IllegalArgumentException if {@code type} is any other data type
+ */
+ private static int getTimestampPrecision(org.apache.paimon.types.DataType
type) {
+ if (type instanceof TimestampType) {
+ return ((TimestampType) type).getPrecision();
+ } else if (type instanceof LocalZonedTimestampType) {
+ return ((LocalZonedTimestampType) type).getPrecision();
+ }
+ throw new IllegalArgumentException("Not a timestamp type: " + type);
+ }
+
/**
 * Converts a predicate field reference into a Parquet filter column by visiting the field's
 * Paimon type with a {@code ConvertToColumnTypeVisitor} bound to the field name.
 */
private static Operators.Column<?> toParquetColumn(FieldRef fieldRef) {
return fieldRef.type().accept(new
ConvertToColumnTypeVisitor(fieldRef.name()));
}
@@ -270,9 +280,20 @@ public class ParquetFilters {
} else {
return Binary.fromConstantByteArray(decimal.toUnscaledBytes());
}
+ } else if (value instanceof Timestamp) {
+ Timestamp timestamp = (Timestamp) value;
+ int precision = getTimestampPrecision(type);
+ if (precision <= 3) {
+ // milliseconds
+ return timestamp.getMillisecond();
+ } else if (precision <= 6) {
+ // microseconds
+ return timestamp.toMicros();
+ }
+ // precision > 6 uses INT96, not supported
+ throw new UnsupportedOperationException();
}
- // TODO Support Timestamp
throw new UnsupportedOperationException();
}
@@ -362,15 +383,23 @@ public class ParquetFilters {
}
}
- // TODO we can support timestamp
-
+ /**
+ * Maps a TIMESTAMP column to a Parquet {@code long} filter column.
+ *
+ * @throws UnsupportedOperationException if the precision is greater than 6
+ */
@Override
public Operators.Column<?> visit(TimestampType timestampType) {
+ int precision = timestampType.getPrecision();
+ if (precision <= 6) {
+ // Precision up to 6 is stored as INT64 (millis or micros), so filter on a long column.
+ return FilterApi.longColumn(name);
+ }
+ // precision > 6 uses INT96, not supported for filter pushdown
throw new UnsupportedOperationException();
}
+ /**
+ * Maps a TIMESTAMP WITH LOCAL TIME ZONE column to a Parquet {@code long} filter column, using
+ * the same precision rule as the TIMESTAMP overload.
+ *
+ * @throws UnsupportedOperationException if the precision is greater than 6
+ */
@Override
public Operators.Column<?> visit(LocalZonedTimestampType
localZonedTimestampType) {
+ int precision = localZonedTimestampType.getPrecision();
+ if (precision <= 6) {
+ // Precision up to 6 is stored as INT64 (millis or micros), so filter on a long column.
+ return FilterApi.longColumn(name);
+ }
+ // precision > 6 uses INT96, not supported for filter pushdown
throw new UnsupportedOperationException();
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
index 3cd98e1434..18fe1ef28c 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFiltersTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.BigIntType;
@@ -26,7 +27,9 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -410,6 +413,176 @@ class ParquetFiltersTest {
true);
}
+ /** TIMESTAMP(3) predicates should push down with epoch-millisecond long literals. */
+ @Test
+ public void testTimestampMillis() {
+ // precision <= 3 uses milliseconds (INT64)
+ int precision = 3;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "ts1", new
TimestampType(precision)))));
+
+ Timestamp value = Timestamp.fromEpochMillis(1704067200000L); //
2024-01-01 00:00:00
+ long expectedMillis = value.getMillisecond();
+
+ test(builder.isNull(0), "eq(ts1, null)", true);
+ test(builder.isNotNull(0), "noteq(ts1, null)", true);
+ test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
+ test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")",
true);
+ test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")",
true);
+ test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis +
")", true);
+ test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")",
true);
+ test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis +
")", true);
+ }
+
+ /**
+ * TIMESTAMP(6) predicates should push down with epoch-microsecond long literals
+ * (millis * 1000 + nanoOfMillis / 1000).
+ */
+ @Test
+ public void testTimestampMicros() {
+ // 3 < precision <= 6 uses microseconds (INT64)
+ int precision = 6;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "ts1", new
TimestampType(precision)))));
+
+ Timestamp value = Timestamp.fromEpochMillis(1704067200123L, 456000);
// with nanos
+ long expectedMicros = value.getMillisecond() * 1000 +
value.getNanoOfMillisecond() / 1000;
+
+ test(builder.isNull(0), "eq(ts1, null)", true);
+ test(builder.isNotNull(0), "noteq(ts1, null)", true);
+ test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
+ test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")",
true);
+ test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")",
true);
+ test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros +
")", true);
+ test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")",
true);
+ test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros +
")", true);
+ }
+
+ /** LocalZonedTimestampType(3) predicates should push down with epoch-millisecond literals. */
+ // NOTE(review): unlike testTimestampMillis, lteq/gteq are not exercised here.
+ @Test
+ public void testLocalZonedTimestampMillis() {
+ // precision <= 3 uses milliseconds (INT64)
+ int precision = 3;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(
+ 0,
+ "ts1",
+ new
LocalZonedTimestampType(precision)))));
+
+ Timestamp value = Timestamp.fromEpochMillis(1704067200000L);
+ long expectedMillis = value.getMillisecond();
+
+ test(builder.isNull(0), "eq(ts1, null)", true);
+ test(builder.isNotNull(0), "noteq(ts1, null)", true);
+ test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
+ test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")",
true);
+ test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")",
true);
+ test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")",
true);
+ }
+
+ /** LocalZonedTimestampType(6) predicates should push down with epoch-microsecond literals. */
+ // NOTE(review): unlike testTimestampMicros, lteq/gteq are not exercised here.
+ @Test
+ public void testLocalZonedTimestampMicros() {
+ // 3 < precision <= 6 uses microseconds (INT64)
+ int precision = 6;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(
+ 0,
+ "ts1",
+ new
LocalZonedTimestampType(precision)))));
+
+ Timestamp value = Timestamp.fromEpochMillis(1704067200123L, 456000);
+ long expectedMicros = value.getMillisecond() * 1000 +
value.getNanoOfMillisecond() / 1000;
+
+ test(builder.isNull(0), "eq(ts1, null)", true);
+ test(builder.isNotNull(0), "noteq(ts1, null)", true);
+ test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
+ test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")",
true);
+ test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")",
true);
+ test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")",
true);
+ }
+
+ /** IN / NOT IN on TIMESTAMP(3) should expand into millisecond-literal comparisons. */
+ @Test
+ public void testInFilterTimestampMillis() {
+ int precision = 3;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "ts1", new
TimestampType(precision)))));
+
+ Timestamp v1 = Timestamp.fromEpochMillis(1704067200000L);
+ Timestamp v2 = Timestamp.fromEpochMillis(1704153600000L);
+ Timestamp v3 = Timestamp.fromEpochMillis(1704240000000L);
+
+ // IN becomes a left-nested or() of eq() terms.
+ test(
+ builder.in(0, Arrays.asList(v1, v2, v3)),
+ "or(or(eq(ts1, "
+ + v1.getMillisecond()
+ + "), eq(ts1, "
+ + v2.getMillisecond()
+ + ")), eq(ts1, "
+ + v3.getMillisecond()
+ + "))",
+ true);
+
+ // NOT IN becomes a left-nested and() of noteq() terms.
+ test(
+ builder.notIn(0, Arrays.asList(v1, v2, v3)),
+ "and(and(noteq(ts1, "
+ + v1.getMillisecond()
+ + "), noteq(ts1, "
+ + v2.getMillisecond()
+ + ")), noteq(ts1, "
+ + v3.getMillisecond()
+ + "))",
+ true);
+ }
+
+ /** IN / NOT IN on TIMESTAMP(6) should expand into microsecond-literal comparisons. */
+ @Test
+ public void testInFilterTimestampMicros() {
+ int precision = 6;
+ PredicateBuilder builder =
+ new PredicateBuilder(
+ new RowType(
+ Collections.singletonList(
+ new DataField(0, "ts1", new
TimestampType(precision)))));
+
+ Timestamp v1 = Timestamp.fromEpochMillis(1704067200000L, 123000);
+ Timestamp v2 = Timestamp.fromEpochMillis(1704153600000L, 456000);
+ Timestamp v3 = Timestamp.fromEpochMillis(1704240000000L, 789000);
+
+ // Expected literals are computed by hand rather than via Timestamp#toMicros().
+ long micros1 = v1.getMillisecond() * 1000 + v1.getNanoOfMillisecond()
/ 1000;
+ long micros2 = v2.getMillisecond() * 1000 + v2.getNanoOfMillisecond()
/ 1000;
+ long micros3 = v3.getMillisecond() * 1000 + v3.getNanoOfMillisecond()
/ 1000;
+
+ // IN becomes a left-nested or() of eq() terms.
+ test(
+ builder.in(0, Arrays.asList(v1, v2, v3)),
+ "or(or(eq(ts1, "
+ + micros1
+ + "), eq(ts1, "
+ + micros2
+ + ")), eq(ts1, "
+ + micros3
+ + "))",
+ true);
+
+ // NOT IN becomes a left-nested and() of noteq() terms.
+ test(
+ builder.notIn(0, Arrays.asList(v1, v2, v3)),
+ "and(and(noteq(ts1, "
+ + micros1
+ + "), noteq(ts1, "
+ + micros2
+ + ")), noteq(ts1, "
+ + micros3
+ + "))",
+ true);
+ }
+
private void test(Predicate predicate, FilterPredicate parquetPredicate,
boolean canPushDown) {
FilterCompat.Filter filter =
ParquetFilters.convert(PredicateBuilder.splitAnd(predicate));
if (canPushDown) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 747b140cb3..c28000e73a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -807,6 +807,47 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase with AdaptiveS
}
}
+ // End-to-end check that equality, strict less-than and BETWEEN predicates on a TIMESTAMP
+ // column of a parquet-format Paimon table return the expected rows.
+ // These assertions check query results only. They do not by themselves verify that the
+ // filter was pushed down to the Parquet reader.
+ // NOTE(review): 'parquet.block.size' = '100' presumably keeps row groups tiny so that
+ // row-group filtering is exercised; confirm.
+ test(s"Paimon pushdown: parquet timestamp filter") {
+ withTable("T") {
+ spark.sql(s"""
+ |CREATE TABLE T (a TIMESTAMP, b STRING) using paimon
TBLPROPERTIES
+ |(
+ |'file.format' = 'parquet',
+ |'parquet.block.size' = '100',
+ |'target-file-size' = '10g'
+ |)
+ |""".stripMargin)
+
+ spark.sql("""INSERT INTO T VALUES
+ |(TIMESTAMP '2024-01-01 00:00:00', 'a'),
+ |(TIMESTAMP '2024-01-02 12:30:00', 'b'),
+ |(TIMESTAMP '2024-01-03 23:59:59', 'c'),
+ |(TIMESTAMP '2024-01-01 12:00:00', 'd')
+ |""".stripMargin)
+
+ // Equality predicate: only row 'a' has exactly 2024-01-01 00:00:00.
+ checkAnswer(
+ spark.sql("SELECT * FROM T WHERE a = TIMESTAMP '2024-01-01 00:00:00'"),
+ Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") :: Nil
+ )
+
+ // Strict less-than: rows 'a' and 'd' fall before 2024-01-02 00:00:00.
+ checkAnswer(
+ spark.sql("SELECT * FROM T WHERE a < TIMESTAMP '2024-01-02 00:00:00'
ORDER BY a"),
+ Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") ::
+ Row(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), "d") :: Nil
+ )
+
+ // BETWEEN (inclusive bounds): no row sits exactly on the upper bound, so the result
+ // matches the less-than case above.
+ checkAnswer(
+ spark.sql(
+ "SELECT * FROM T WHERE a BETWEEN TIMESTAMP '2024-01-01 00:00:00' AND
TIMESTAMP '2024-01-02 00:00:00' ORDER BY a"),
+ Row(java.sql.Timestamp.valueOf("2024-01-01 00:00:00"), "a") ::
+ Row(java.sql.Timestamp.valueOf("2024-01-01 12:00:00"), "d") :: Nil
+ )
+ }
+ }
+
/** Builds a Spark ScanBuilder for the Paimon table `tableName` (default "T") with empty options. */
private def getScanBuilder(tableName: String = "T"): ScanBuilder = {
SparkTable(loadTable(tableName)).newScanBuilder(CaseInsensitiveStringMap.empty())
}