This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0ffd2f24d feat(datafusion): Add Timestamp scalar value conversion for predicate pushdown (#2069)
0ffd2f24d is described below
commit 0ffd2f24dcba928803d59b14915597b29e40cedf
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 30 14:12:09 2026 +0800
feat(datafusion): Add Timestamp scalar value conversion for predicate pushdown (#2069)
## Which issue does this PR close?
- Closes #.
## What changes are included in this PR?
Add support for converting DataFusion Timestamp scalar values to Iceberg
Datum for predicate pushdown. This enables timestamp literal comparisons
to be pushed down to the storage layer when DataFusion provides
timestamp literals in ScalarValue form, improving query performance.
Changes:
- Add conversion for two timestamp time units (Microsecond, Nanosecond) to Iceberg's native representations
- Add comprehensive unit tests covering all time units and edge cases
- Add sqllogictest (timestamp_predicate_pushdown.slt) to validate end-to-end timestamp predicate filtering and pushdown behavior
Implementation details:
- Iceberg uses microseconds and nanoseconds as the native timestamp representations
- Timezone information in ScalarValue is preserved but not used in conversion (Iceberg timestamp type is timezone-agnostic)
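
Conceptually, the change maps DataFusion's `ScalarValue` timestamp variants onto the matching `Datum` constructors. A minimal sketch (the function name here is illustrative; the actual change lives in `scalar_value_to_datum` in the diff below):

```rust
use datafusion::common::ScalarValue;
use iceberg::spec::Datum;

/// Illustrative sketch of the new mapping: only micro- and nanosecond
/// literals convert, since DataFusion coerces second/millisecond literals
/// to the column's unit before pushdown ever sees them.
fn timestamp_scalar_to_datum(value: &ScalarValue) -> Option<Datum> {
    match value {
        ScalarValue::TimestampMicrosecond(Some(v), _) => Some(Datum::timestamp_micros(*v)),
        ScalarValue::TimestampNanosecond(Some(v), _) => Some(Datum::timestamp_nanos(*v)),
        _ => None, // second/millisecond (and NULL) literals are not pushed down
    }
}
```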
## Are these changes tested?
---------
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
crates/iceberg/src/arrow/schema.rs | 8 +-
crates/iceberg/src/spec/values/datum.rs | 12 ++
.../src/physical_plan/expr_to_predicate.rs | 44 +++++
.../schedules/df_timestamp_predicate_pushdown.toml | 23 +++
.../slts/df_test/timestamp_predicate_pushdown.slt | 192 +++++++++++++++++++++
5 files changed, 278 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index e00f79a3b..bd9e249f4 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -24,7 +24,7 @@ use arrow_array::types::{Decimal128Type, validate_decimal_precision_and_scale};
 use arrow_array::{
     BinaryArray, BooleanArray, Date32Array, Datum as ArrowDatum, Decimal128Array,
     FixedSizeBinaryArray, Float32Array, Float64Array, Int32Array, Int64Array, Scalar, StringArray,
-    TimestampMicrosecondArray,
+    TimestampMicrosecondArray, TimestampNanosecondArray,
 };
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -742,6 +742,12 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Arc<dyn ArrowDatum + Send
         (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
             TimestampMicrosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
         ))),
+        (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(value)) => {
+            Ok(Arc::new(TimestampNanosecondArray::new_scalar(*value)))
+        }
+        (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(value)) => Ok(Arc::new(Scalar::new(
+            TimestampNanosecondArray::new(vec![*value; 1].into(), None).with_timezone_utc(),
+        ))),
         (PrimitiveType::Decimal { precision, scale }, PrimitiveLiteral::Int128(value)) => {
             let array = Decimal128Array::from_value(*value, 1)
                 .with_precision_and_scale(*precision as _, *scale as _)
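
The two new arms differ only in timezone tagging. A small standalone sketch of the same `arrow_array` construction used above:

```rust
use arrow_array::{Scalar, TimestampNanosecondArray};

let value: i64 = 1_672_876_800_000_000_500;

// TimestampNs: a timezone-agnostic nanosecond scalar.
let ts_ns = TimestampNanosecondArray::new_scalar(value);

// TimestamptzNs: the same physical value, tagged as UTC.
let ts_tz_ns = Scalar::new(
    TimestampNanosecondArray::new(vec![value; 1].into(), None).with_timezone_utc(),
);
```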
diff --git a/crates/iceberg/src/spec/values/datum.rs b/crates/iceberg/src/spec/values/datum.rs
index 3d4abc019..68ea6b3d4 100644
--- a/crates/iceberg/src/spec/values/datum.rs
+++ b/crates/iceberg/src/spec/values/datum.rs
@@ -233,6 +233,18 @@ impl PartialOrd for Datum {
                 PrimitiveType::Timestamptz,
                 PrimitiveType::Timestamptz,
             ) => val.partial_cmp(other_val),
+            (
+                PrimitiveLiteral::Long(val),
+                PrimitiveLiteral::Long(other_val),
+                PrimitiveType::TimestampNs,
+                PrimitiveType::TimestampNs,
+            ) => val.partial_cmp(other_val),
+            (
+                PrimitiveLiteral::Long(val),
+                PrimitiveLiteral::Long(other_val),
+                PrimitiveType::TimestamptzNs,
+                PrimitiveType::TimestamptzNs,
+            ) => val.partial_cmp(other_val),
             (
                 PrimitiveLiteral::String(val),
                 PrimitiveLiteral::String(other_val),
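
With these arms in place, two nanosecond datums of the same type order by their underlying `Long` value; a brief usage sketch:

```rust
use iceberg::spec::Datum;

let earlier = Datum::timestamp_nanos(1_672_876_800_000_000_000);
let later = Datum::timestamp_nanos(1_672_876_800_000_000_500);

// Resolved by the new TimestampNs arm of PartialOrd.
assert!(earlier < later);
assert_eq!(earlier.partial_cmp(&later), Some(std::cmp::Ordering::Less));
```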
diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index c69fdbb88..9f37345f8 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -269,6 +269,7 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
 }

 const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
+
 /// Convert a scalar value to an iceberg datum.
 fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
     match value {
@@ -285,6 +286,13 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
         ScalarValue::LargeBinary(Some(v)) => Some(Datum::binary(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
+        // Timestamp conversions
+        // Note: TimestampSecond and TimestampMillisecond are not handled here because
+        // DataFusion's type coercion always converts them to match the column type
+        // (either TimestampMicrosecond or TimestampNanosecond) before predicate pushdown.
+        // See unit tests for how those conversions would work if needed.
+        ScalarValue::TimestampMicrosecond(Some(v), _) => Some(Datum::timestamp_micros(*v)),
+        ScalarValue::TimestampNanosecond(Some(v), _) => Some(Datum::timestamp_nanos(*v)),
         _ => None,
     }
 }
@@ -501,6 +509,42 @@ mod tests {
         assert_eq!(predicate, None);
     }

+    #[test]
+    fn test_scalar_value_to_datum_timestamp() {
+        use datafusion::common::ScalarValue;
+
+        // Test TimestampMicrosecond - maps directly to Datum::timestamp_micros
+        let ts_micros = 1672876800000000i64; // 2023-01-05 00:00:00 UTC in microseconds
+        let datum =
+            super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(Some(ts_micros), None));
+        assert_eq!(datum, Some(Datum::timestamp_micros(ts_micros)));
+
+        // Test TimestampNanosecond - maps to Datum::timestamp_nanos to preserve precision
+        let ts_nanos = 1672876800000000500i64; // 2023-01-05 00:00:00.000000500 UTC in nanoseconds
+        let datum =
+            super::scalar_value_to_datum(&ScalarValue::TimestampNanosecond(Some(ts_nanos), None));
+        assert_eq!(datum, Some(Datum::timestamp_nanos(ts_nanos)));
+
+        // Test None timestamp
+        let datum = super::scalar_value_to_datum(&ScalarValue::TimestampMicrosecond(None, None));
+        assert_eq!(datum, None);
+
+        // Note: TimestampSecond and TimestampMillisecond are not supported because
+        // DataFusion's type coercion converts them to TimestampMicrosecond or TimestampNanosecond
+        // before they reach scalar_value_to_datum in SQL queries.
+        //
+        // These return None (not pushed down):
+        let ts_seconds = 1672876800i64; // 2023-01-05 00:00:00 UTC in seconds
+        let datum =
+            super::scalar_value_to_datum(&ScalarValue::TimestampSecond(Some(ts_seconds), None));
+        assert_eq!(datum, None);
+
+        let ts_millis = 1672876800000i64; // 2023-01-05 00:00:00 UTC in milliseconds
+        let datum =
+            super::scalar_value_to_datum(&ScalarValue::TimestampMillisecond(Some(ts_millis), None));
+        assert_eq!(datum, None);
+    }
+
     #[test]
     fn test_scalar_value_to_datum_binary() {
         use datafusion::common::ScalarValue;
diff --git a/crates/sqllogictest/testdata/schedules/df_timestamp_predicate_pushdown.toml b/crates/sqllogictest/testdata/schedules/df_timestamp_predicate_pushdown.toml
new file mode 100644
index 000000000..849bba50a
--- /dev/null
+++ b/crates/sqllogictest/testdata/schedules/df_timestamp_predicate_pushdown.toml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[engines]
+df = { type = "datafusion" }
+
+[[steps]]
+engine = "df"
+slt = "df_test/timestamp_predicate_pushdown.slt"
diff --git a/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt
new file mode 100644
index 000000000..255731c04
--- /dev/null
+++ b/crates/sqllogictest/testdata/slts/df_test/timestamp_predicate_pushdown.slt
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test timestamp predicate pushdown behavior
+#
+# When you CREATE TABLE with TIMESTAMP in DataFusion, it creates a TimestampNs column
+# (nanosecond precision) in Iceberg, since DataFusion's default TIMESTAMP type is nanoseconds.
+#
+# We use Datum::timestamp_nanos() for nanosecond timestamp predicates to preserve
+# full precision. This allows predicates to be correctly pushed down to Iceberg.
+
+# Create test table with timestamp column
+statement ok
+CREATE TABLE default.default.test_timestamp_table (id INT NOT NULL, ts TIMESTAMP)
+
+# Insert test data with timestamps
+# We use CAST to convert string timestamps to proper timestamp values
+query I
+INSERT INTO default.default.test_timestamp_table
+VALUES
+ (1, CAST('2023-01-01 00:00:00' AS TIMESTAMP)),
+ (2, CAST('2023-01-05 12:30:00' AS TIMESTAMP)),
+ (3, CAST('2023-01-10 15:45:30' AS TIMESTAMP)),
+ (4, CAST('2023-01-15 09:00:00' AS TIMESTAMP)),
+ (5, CAST('2023-01-20 18:20:10' AS TIMESTAMP))
+----
+5
+
+# Verify timestamp equality predicate IS pushed down
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts = TimestampNanosecond(1672921800000000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts = TimestampNanosecond(1672921800000000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 = 1672921800000000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[ts = 2023-01-05 12:30:00]
+
+# Verify timestamp equality filtering works
+query I?
+SELECT * FROM default.default.test_timestamp_table WHERE ts = CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+2 2023-01-05T12:30:00
+
+# Verify timestamp greater than predicate IS pushed down
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts > TimestampNanosecond(1673308800000000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts > TimestampNanosecond(1673308800000000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 > 1673308800000000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-10 00:00:00]
+
+# Verify timestamp greater than filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts > CAST('2023-01-10 00:00:00' AS TIMESTAMP)
+----
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+5 2023-01-20T18:20:10
+
+# Test timestamp less than or equal filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts <= CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
+2 2023-01-05T12:30:00
+
+# Verify timestamp range predicate (AND of two comparisons) IS pushed down
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+ AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_table.ts >= TimestampNanosecond(1672876800000000000, None) AND default.default.test_timestamp_table.ts <= TimestampNanosecond(1673827199000000000, None)
+02)--TableScan: default.default.test_timestamp_table projection=[id, ts], partial_filters=[default.default.test_timestamp_table.ts >= TimestampNanosecond(1672876800000000000, None), default.default.test_timestamp_table.ts <= TimestampNanosecond(1673827199000000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 >= 1672876800000000000 AND ts@1 <= 1673827199000000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[(ts >= 2023-01-05 00:00:00) AND (ts <= 2023-01-15 23:59:59)]
+
+# Test timestamp range predicate filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+ AND ts <= CAST('2023-01-15 23:59:59' AS TIMESTAMP)
+----
+2 2023-01-05T12:30:00
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+
+# Test timestamp predicate combined with other predicates
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table
+WHERE ts >= CAST('2023-01-10 00:00:00' AS TIMESTAMP) AND id < 5
+----
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+
+# Test timestamp NOT EQUAL predicate
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts != CAST('2023-01-05 12:30:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
+3 2023-01-10T15:45:30
+4 2023-01-15T09:00:00
+5 2023-01-20T18:20:10
+
+# Test timestamp less than filtering
+query I? rowsort
+SELECT * FROM default.default.test_timestamp_table WHERE ts < CAST('2023-01-05 00:00:00' AS TIMESTAMP)
+----
+1 2023-01-01T00:00:00
+
+# Clean up: Drop the test table
+statement ok
+DROP TABLE default.default.test_timestamp_table
+
+# ============================================================================
+# Test timestamp predicate pushdown with different precisions
+# ============================================================================
+
+# Test with TIMESTAMP(6) - microsecond precision
+statement ok
+CREATE TABLE default.default.test_timestamp_micros (id INT NOT NULL, ts TIMESTAMP(6))
+
+query I
+INSERT INTO default.default.test_timestamp_micros
+VALUES
+ (1, CAST('2023-01-01 00:00:00' AS TIMESTAMP)),
+ (2, CAST('2023-01-05 12:30:00' AS TIMESTAMP))
+----
+2
+
+# Verify microsecond timestamp predicate is pushed down
+query TT
+EXPLAIN SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)
+----
+logical_plan
+01)Filter: default.default.test_timestamp_micros.ts > TimestampMicrosecond(1672531200000000, None)
+02)--TableScan: default.default.test_timestamp_micros projection=[id, ts], partial_filters=[default.default.test_timestamp_micros.ts > TimestampMicrosecond(1672531200000000, None)]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: ts@1 > 1672531200000000
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------CooperativeExec
+05)--------IcebergTableScan projection:[id,ts] predicate:[ts > 2023-01-01 00:00:00]
+
+query I?
+SELECT * FROM default.default.test_timestamp_micros WHERE ts > CAST('2023-01-01 00:00:00' AS TIMESTAMP)
+----
+2 2023-01-05T12:30:00
+
+statement ok
+DROP TABLE default.default.test_timestamp_micros
+
+# Test with TIMESTAMP(3) - millisecond precision
+# This should fail because Iceberg doesn't support millisecond precision
+statement error DataFusion error: External error: DataInvalid => Unsupported Arrow data type: Timestamp\(ms\)
+CREATE TABLE default.default.test_timestamp_millis (id INT NOT NULL, ts TIMESTAMP(3))
+
+# Test with TIMESTAMP(0) - second precision
+# This should fail because Iceberg doesn't support second precision
+statement error DataFusion error: External error: DataInvalid => Unsupported Arrow data type: Timestamp\(s\)
+CREATE TABLE default.default.test_timestamp_seconds (id INT NOT NULL, ts TIMESTAMP(0))
+