This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new bdf19ab2 fix: allow reading pyarrow timestamp as iceberg timestamptz
(#2333)
bdf19ab2 is described below
commit bdf19ab283f178e54f1647009ec589e6e1839888
Author: Kevin Liu <[email protected]>
AuthorDate: Tue Aug 19 04:53:53 2025 -0700
fix: allow reading pyarrow timestamp as iceberg timestamptz (#2333)
<!--
Thanks for opening a pull request!
-->
<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
<!-- Closes #${GITHUB_ISSUE_ID} -->
# Rationale for this change
This PR fixes reading pyarrow timestamp as Iceberg timestamptz type. It
mirrors the pyarrow logic for dealing with pyarrow timestamp types
[here](https://github.com/apache/iceberg-python/blob/8b43eb88fcc80b4980ce16b71852d211d7e93b13/pyiceberg/io/pyarrow.py#L1330-L1353)
Two changes were made to `ArrowProjectionVisitor._cast_if_needed`
1. reorder the logic so that we handle the timestamp case first.
Otherwise, it will try to `promote()` timestamp to timestamptz and fail.
2. allow casting when the pyarrow value has a `None` timezone. This is
allowed because we gate on the target type having a "UTC" timezone. It
mirrors the Java logic for reading with the default UTC timezone
([1](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java#L107-L116),
[2](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java#L35))
### Context
I ran into an interesting edge case while testing metadata
virtualization between delta and iceberg.
Delta has both [TIMESTAMP and TIMESTAMP_NTZ data
types](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-datatypes).
TIMESTAMP has a timezone while TIMESTAMP_NTZ has no timezone.
Iceberg, meanwhile, has [timestamp and
timestamptz](https://iceberg.apache.org/spec/#primitive-types).
timestamp has no timezone and timestamptz has a timezone.
So Delta's TIMESTAMP -> Iceberg timestamptz and Delta's TIMESTAMP_NTZ ->
Iceberg timestamp.
Regardless of delta or iceberg, the [parquet file stores timestamp
without the timezone
information](https://github.com/apache/parquet-format/blob/1dbc814b97c9307687a2e4bee55545ab6a2ef106/LogicalTypes.md#timestamp)
So I end up with a parquet file with a timestamp column, and an iceberg table
with timestamptz column, and pyiceberg cannot read this table.
It's hard to recreate the scenario, but I did trace it to the
`_to_requested_schema` function. I added a unit test for this case.
The issue is that `ArrowProjectionVisitor._cast_if_needed` will try to
promote `timestamp` to `timestamptz`, and this is not a valid promotion.
```
E pyiceberg.exceptions.ResolveError: Cannot promote timestamp to
timestamptz
```
https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1779-L1782
The `elif` branch below it can handle this case
https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1800-L1806
So maybe we just need to switch the order of execution...
This was also an interesting read..
https://arrow.apache.org/docs/python/timestamps.html
# Are these changes tested?
# Are there any user-facing changes?
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
pyiceberg/io/pyarrow.py | 20 +++++++++++---------
tests/io/test_pyarrow.py | 39 +++++++++++++++++++++++++++++++++++++++
2 files changed, 50 insertions(+), 9 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7aa7a4e6..7779a422 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1788,14 +1788,7 @@ class
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
file_field = self._file_schema.find_field(field.field_id)
if field.field_type.is_primitive:
- if field.field_type != file_field.field_type:
- target_schema = schema_to_pyarrow(
- promote(file_field.field_type, field.field_type),
include_field_ids=self._include_field_ids
- )
- if self._use_large_types is False:
- target_schema =
_pyarrow_schema_ensure_small_types(target_schema)
- return values.cast(target_schema)
- elif (target_type := schema_to_pyarrow(field.field_type,
include_field_ids=self._include_field_ids)) != values.type:
+ if (target_type := schema_to_pyarrow(field.field_type,
include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
# Downcasting of nanoseconds to microseconds
if (
@@ -1814,13 +1807,22 @@ class
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
pa.types.is_timestamp(target_type)
and target_type.tz == "UTC"
and pa.types.is_timestamp(values.type)
- and values.type.tz in UTC_ALIASES
+ and (values.type.tz in UTC_ALIASES or values.type.tz
is None)
):
if target_type.unit == "us" and values.type.unit ==
"ns" and self._downcast_ns_timestamp_to_us:
return values.cast(target_type, safe=False)
elif target_type.unit == "us" and values.type.unit in
{"s", "ms", "us"}:
return values.cast(target_type)
raise ValueError(f"Unsupported schema projection from
{values.type} to {target_type}")
+
+ if field.field_type != file_field.field_type:
+ target_schema = schema_to_pyarrow(
+ promote(file_field.field_type, field.field_type),
include_field_ids=self._include_field_ids
+ )
+ if self._use_large_types is False:
+ target_schema =
_pyarrow_schema_ensure_small_types(target_schema)
+ return values.cast(target_schema)
+
return values
def _construct_field(self, field: NestedField, arrow_type: pa.DataType) ->
pa.Field:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 420ff6f2..47c2db79 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2550,6 +2550,45 @@ def test_initial_value() -> None:
assert val.as_py() == 22
+def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
+ from datetime import datetime, timezone
+
+ # file is written with timestamp without timezone
+ file_schema = Schema(NestedField(1, "ts_field", TimestampType(),
required=False))
+ batch = pa.record_batch(
+ [
+ pa.array(
+ [
+ datetime(2025, 8, 14, 12, 0, 0),
+ datetime(2025, 8, 14, 13, 0, 0),
+ ],
+ type=pa.timestamp("us"),
+ )
+ ],
+ names=["ts_field"],
+ )
+
+ # table is written with timestamp with timezone
+ table_schema = Schema(NestedField(1, "ts_field", TimestamptzType(),
required=False))
+
+ actual_result = _to_requested_schema(table_schema, file_schema, batch,
downcast_ns_timestamp_to_us=True)
+ expected = pa.record_batch(
+ [
+ pa.array(
+ [
+ datetime(2025, 8, 14, 12, 0, 0),
+ datetime(2025, 8, 14, 13, 0, 0),
+ ],
+ type=pa.timestamp("us", tz=timezone.utc),
+ )
+ ],
+ names=["ts_field"],
+ )
+
+ # expect actual_result to have timezone
+ assert expected.equals(actual_result)
+
+
def test__to_requested_schema_timestamps(
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
arrow_table_with_all_timestamp_precisions: pa.Table,