This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new bdf19ab2 fix: allow reading pyarrow timestamp as iceberg timestamptz 
(#2333)
bdf19ab2 is described below

commit bdf19ab283f178e54f1647009ec589e6e1839888
Author: Kevin Liu <[email protected]>
AuthorDate: Tue Aug 19 04:53:53 2025 -0700

    fix: allow reading pyarrow timestamp as iceberg timestamptz (#2333)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    <!-- Closes #${GITHUB_ISSUE_ID} -->
    
    # Rationale for this change
    This PR fixes reading pyarrow timestamp as Iceberg timestamptz type. It
    mirrors the pyarrow logic for dealing with pyarrow timestamp types
    
[here](https://github.com/apache/iceberg-python/blob/8b43eb88fcc80b4980ce16b71852d211d7e93b13/pyiceberg/io/pyarrow.py#L1330-L1353)
    
    Two changes were made to `ArrowProjectionVisitor._cast_if_needed`
    1. reorder the logic so that we handle dealing with timestamp first.
    Otherwise, it will try to `promote()` timestamp to timestamptz and fail.
    2. allow casting when the pyarrow value has `None` timezone. This is
    allowed because we gate on the target type having "UTC" timezone. It
    mirrors the Java logic for reading with a default UTC timezone
    
([1](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java#L107-L116),
    
[2](https://github.com/apache/iceberg/blob/856cbf6eb8a85dee01c65ae6291274b700f76746/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java#L35))
    
    ### Context
    I ran into an interesting edge case while testing metadata
    virtualization between delta and iceberg.
    
    Delta has both [TIMESTAMP and TIMESTAMP_NTZ data
    
types](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-datatypes).
    TIMESTAMP has a timezone while TIMESTAMP_NTZ has no timezone.
    While Iceberg has [timestamp and
    timestamptz](https://iceberg.apache.org/spec/#primitive-types).
    timestamp has no timezone and timestamptz has a timezone.
    
    So Delta's TIMESTAMP -> Iceberg timestamptz and Delta's TIMESTAMP_NTZ ->
    Iceberg timestamp.
    
    Regardless of delta or iceberg, the [parquet file stores timestamp
    without the timezone
    
information](https://github.com/apache/parquet-format/blob/1dbc814b97c9307687a2e4bee55545ab6a2ef106/LogicalTypes.md#timestamp)
    
    So I end up with a parquet file with a timestamp column, and an Iceberg
    table with a timestamptz column, and pyiceberg cannot read this table.
    It's hard to recreate the scenario, but I did trace it to the
    `_to_requested_schema` function. I added a unit test for this case.
    
    The issue is that `ArrowProjectionVisitor._cast_if_needed` will try to
    promote `timestamp` to `timestamptz`, and this is not a valid promotion.
    ```
    E           pyiceberg.exceptions.ResolveError: Cannot promote timestamp to 
timestamptz
    ```
    
    
https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1779-L1782
    
    The `elif` case below that can handle this case
    
https://github.com/apache/iceberg-python/blob/640c592b705db01199020db8e5f2b6e2c67f06bf/pyiceberg/io/pyarrow.py#L1800-L1806
    
    So maybe we just need to switch the order of execution...
    
    This was also an interesting read:
    https://arrow.apache.org/docs/python/timestamps.html
    
    # Are these changes tested?
    
    # Are there any user-facing changes?
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/io/pyarrow.py  | 20 +++++++++++---------
 tests/io/test_pyarrow.py | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 7aa7a4e6..7779a422 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1788,14 +1788,7 @@ class 
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
         file_field = self._file_schema.find_field(field.field_id)
 
         if field.field_type.is_primitive:
-            if field.field_type != file_field.field_type:
-                target_schema = schema_to_pyarrow(
-                    promote(file_field.field_type, field.field_type), 
include_field_ids=self._include_field_ids
-                )
-                if self._use_large_types is False:
-                    target_schema = 
_pyarrow_schema_ensure_small_types(target_schema)
-                return values.cast(target_schema)
-            elif (target_type := schema_to_pyarrow(field.field_type, 
include_field_ids=self._include_field_ids)) != values.type:
+            if (target_type := schema_to_pyarrow(field.field_type, 
include_field_ids=self._include_field_ids)) != values.type:
                 if field.field_type == TimestampType():
                     # Downcasting of nanoseconds to microseconds
                     if (
@@ -1814,13 +1807,22 @@ class 
ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
                         pa.types.is_timestamp(target_type)
                         and target_type.tz == "UTC"
                         and pa.types.is_timestamp(values.type)
-                        and values.type.tz in UTC_ALIASES
+                        and (values.type.tz in UTC_ALIASES or values.type.tz 
is None)
                     ):
                         if target_type.unit == "us" and values.type.unit == 
"ns" and self._downcast_ns_timestamp_to_us:
                             return values.cast(target_type, safe=False)
                         elif target_type.unit == "us" and values.type.unit in 
{"s", "ms", "us"}:
                             return values.cast(target_type)
                     raise ValueError(f"Unsupported schema projection from 
{values.type} to {target_type}")
+
+            if field.field_type != file_field.field_type:
+                target_schema = schema_to_pyarrow(
+                    promote(file_field.field_type, field.field_type), 
include_field_ids=self._include_field_ids
+                )
+                if self._use_large_types is False:
+                    target_schema = 
_pyarrow_schema_ensure_small_types(target_schema)
+                return values.cast(target_schema)
+
         return values
 
     def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> 
pa.Field:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 420ff6f2..47c2db79 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2550,6 +2550,45 @@ def test_initial_value() -> None:
         assert val.as_py() == 22
 
 
+def test__to_requested_schema_timestamp_to_timestamptz_projection() -> None:
+    from datetime import datetime, timezone
+
+    # file is written with timestamp without timezone
+    file_schema = Schema(NestedField(1, "ts_field", TimestampType(), 
required=False))
+    batch = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us"),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # table is written with timestamp with timezone
+    table_schema = Schema(NestedField(1, "ts_field", TimestamptzType(), 
required=False))
+
+    actual_result = _to_requested_schema(table_schema, file_schema, batch, 
downcast_ns_timestamp_to_us=True)
+    expected = pa.record_batch(
+        [
+            pa.array(
+                [
+                    datetime(2025, 8, 14, 12, 0, 0),
+                    datetime(2025, 8, 14, 13, 0, 0),
+                ],
+                type=pa.timestamp("us", tz=timezone.utc),
+            )
+        ],
+        names=["ts_field"],
+    )
+
+    # expect actual_result to have timezone
+    assert expected.equals(actual_result)
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,

Reply via email to