This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 24b12ddd Support reading nanoseconds from PyArrow (#2294)
24b12ddd is described below

commit 24b12ddd8fdab4a62650786a2c3cdd56a53f8719
Author: Alex Stephen <[email protected]>
AuthorDate: Thu Aug 7 14:06:03 2025 -0700

    Support reading nanoseconds from PyArrow (#2294)
    
    <!--
    Thanks for opening a pull request!
    -->
    
    <!-- In the case this PR will resolve an issue, please replace
    ${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
    Closes #2270
    Related to #1045
    
    # Rationale for this change
    This allows us to read nanosecond-precision timestamps from PyArrow.
    Right now, we always downcast to microseconds or throw an error. By
    passing the format-version through, we can keep nanosecond precision
    *just for v3 tables*.
    
    # Are these changes tested?
    Included a test. I can't add a test that involves writing, since we
    don't support v3 writing yet (there's a PR out for that).
    
    # Are there any user-facing changes?
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 pyiceberg/catalog/__init__.py                | 15 +++++--
 pyiceberg/catalog/rest/__init__.py           |  6 ++-
 pyiceberg/io/pyarrow.py                      | 60 ++++++++++++++++++++++------
 pyiceberg/table/__init__.py                  | 22 +++++++---
 tests/conftest.py                            | 22 ++++++++++
 tests/integration/test_writes/test_writes.py | 15 +++++++
 6 files changed, 119 insertions(+), 21 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 4da11643..1607541d 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -70,6 +70,7 @@ from pyiceberg.typedef import (
     Identifier,
     Properties,
     RecursiveDict,
+    TableVersion,
 )
 from pyiceberg.utils.config import Config, merge_config
 from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,9 @@ class Catalog(ABC):
         return load_file_io({**self.properties, **properties}, location)
 
     @staticmethod
-    def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
+    def _convert_schema_if_needed(
+        schema: Union[Schema, "pa.Schema"], format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> Schema:
         if isinstance(schema, Schema):
             return schema
         try:
@@ -754,7 +757,10 @@ class Catalog(ABC):
             downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
             if isinstance(schema, pa.Schema):
                 schema: Schema = visit_pyarrow(  # type: ignore
-                    schema, _ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+                    schema,
+                    _ConvertToIcebergWithoutIDs(
+                        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, format_version=format_version
+                    ),
                 )
                 return schema
         except ModuleNotFoundError:
@@ -847,7 +853,10 @@ class MetastoreCatalog(Catalog, ABC):
         Returns:
             StagedTable: the created staged table instance.
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         database_name, table_name = 
self.identifier_to_database_and_table(identifier)
 
diff --git a/pyiceberg/catalog/rest/__init__.py 
b/pyiceberg/catalog/rest/__init__.py
index b39af9fc..c43a64f3 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -64,6 +64,7 @@ from pyiceberg.table import (
     StagedTable,
     Table,
     TableIdentifier,
+    TableProperties,
 )
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, 
assign_fresh_sort_order_ids
@@ -498,7 +499,10 @@ class RestCatalog(Catalog):
         properties: Properties = EMPTY_DICT,
         stage_create: bool = False,
     ) -> TableResponse:
-        iceberg_schema = self._convert_schema_if_needed(schema)
+        iceberg_schema = self._convert_schema_if_needed(
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
         fresh_schema = assign_fresh_schema_ids(iceberg_schema)
         fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, 
iceberg_schema, fresh_schema)
         fresh_sort_order = assign_fresh_sort_order_ids(sort_order, 
iceberg_schema, fresh_schema)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index cde6d947..ab85893a 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -145,12 +145,13 @@ from pyiceberg.schema import (
     visit,
     visit_with_partner,
 )
+from pyiceberg.table import TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
 from pyiceberg.table.puffin import PuffinFile
 from pyiceberg.transforms import IdentityTransform, TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties, Record
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -1017,13 +1018,20 @@ def _combine_positional_deletes(positional_deletes: 
List[pa.ChunkedArray], start
 
 
 def pyarrow_to_schema(
-    schema: pa.Schema, name_mapping: Optional[NameMapping] = None, 
downcast_ns_timestamp_to_us: bool = False
+    schema: pa.Schema,
+    name_mapping: Optional[NameMapping] = None,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Schema:
     has_ids = visit_pyarrow(schema, _HasIds())
     if has_ids:
-        return visit_pyarrow(schema, 
_ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+        return visit_pyarrow(
+            schema, 
_ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, 
format_version=format_version)
+        )
     elif name_mapping is not None:
-        schema_without_ids = _pyarrow_to_schema_without_ids(schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        schema_without_ids = _pyarrow_to_schema_without_ids(
+            schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, 
format_version=format_version
+        )
         return apply_name_mapping(schema_without_ids, name_mapping)
     else:
         raise ValueError(
@@ -1031,8 +1039,15 @@ def pyarrow_to_schema(
         )
 
 
-def _pyarrow_to_schema_without_ids(schema: pa.Schema, 
downcast_ns_timestamp_to_us: bool = False) -> Schema:
-    return visit_pyarrow(schema, 
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+def _pyarrow_to_schema_without_ids(
+    schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
+) -> Schema:
+    return visit_pyarrow(
+        schema,
+        
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
 format_version=format_version),
+    )
 
 
 def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1214,9 +1229,12 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
 
     _field_names: List[str]
 
-    def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
+    def __init__(
+        self, downcast_ns_timestamp_to_us: bool = False, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+    ) -> None:  # noqa: F821
         self._field_names = []
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+        self._format_version = format_version
 
     def _field_id(self, field: pa.Field) -> int:
         if (field_id := _get_field_id(field)) is not None:
@@ -1287,6 +1305,11 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
             elif primitive.unit == "ns":
                 if self._downcast_ns_timestamp_to_us:
                     logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
+                elif self._format_version >= 3:
+                    if primitive.tz in UTC_ALIASES:
+                        return TimestamptzNanoType()
+                    else:
+                        return TimestampNanoType()
                 else:
                     raise TypeError(
                         "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
@@ -2519,7 +2542,10 @@ def bin_pack_arrow_table(tbl: pa.Table, 
target_file_size: int) -> Iterator[List[
 
 
 def _check_pyarrow_schema_compatible(
-    requested_schema: Schema, provided_schema: pa.Schema, 
downcast_ns_timestamp_to_us: bool = False
+    requested_schema: Schema,
+    provided_schema: pa.Schema,
+    downcast_ns_timestamp_to_us: bool = False,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> None:
     """
     Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2532,10 +2558,15 @@ def _check_pyarrow_schema_compatible(
     name_mapping = requested_schema.name_mapping
     try:
         provided_schema = pyarrow_to_schema(
-            provided_schema, name_mapping=name_mapping, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            provided_schema,
+            name_mapping=name_mapping,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=format_version,
         )
     except ValueError as e:
-        provided_schema = _pyarrow_to_schema_without_ids(provided_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+        provided_schema = _pyarrow_to_schema_without_ids(
+            provided_schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, 
format_version=format_version
+        )
         additional_names = set(provided_schema._name_to_id.keys()) - 
set(requested_schema._name_to_id.keys())
         raise ValueError(
             f"PyArrow table contains more columns: {', 
'.join(sorted(additional_names))}. Update the schema first (hint, use 
union_by_name)."
@@ -2561,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: 
TableMetadata, file_pa
         )
 
     schema = table_metadata.schema()
-    _check_pyarrow_schema_compatible(schema, arrow_schema)
+    _check_pyarrow_schema_compatible(schema, arrow_schema, 
format_version=table_metadata.format_version)
 
     statistics = data_file_statistics_from_parquet_metadata(
         parquet_metadata=parquet_metadata,
@@ -2652,7 +2683,12 @@ def _dataframe_to_data_files(
     )
     name_mapping = table_metadata.schema().name_mapping
     downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-    task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+    task_schema = pyarrow_to_schema(
+        df.schema,
+        name_mapping=name_mapping,
+        downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+        format_version=table_metadata.format_version,
+    )
 
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 30b06fcb..7d5cc10d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -219,7 +219,7 @@ class TableProperties:
 
     DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
     FORMAT_VERSION = "format-version"
-    DEFAULT_FORMAT_VERSION = 2
+    DEFAULT_FORMAT_VERSION: TableVersion = 2
 
     MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
     MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024  # 8 MB
@@ -477,7 +477,10 @@ class Transaction:
             )
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         with self._append_snapshot_producer(snapshot_properties, 
branch=branch) as append_files:
@@ -527,7 +530,10 @@ class Transaction:
 
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # If dataframe does not have data, there is no need to overwrite
@@ -593,7 +599,10 @@ class Transaction:
             )
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         if overwrite_filter != AlwaysFalse():
@@ -789,7 +798,10 @@ class Transaction:
 
         downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
         _check_pyarrow_schema_compatible(
-            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+            self.table_metadata.schema(),
+            provided_schema=df.schema,
+            downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+            format_version=self.table_metadata.format_version,
         )
 
         # get list of rows that exist so we don't have to load the entire 
target table
diff --git a/tests/conftest.py b/tests/conftest.py
index 16c9e06d..e036a2fa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,6 +2811,28 @@ def 
arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
     )
 
 
+@pytest.fixture(scope="session")
+def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
+    """Pyarrow Schema with microsecond and nanosecond timestamp precisions."""
+    import pyarrow as pa
+
+    return pa.schema(
+        [
+            ("timestamp_s", pa.timestamp(unit="us")),
+            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ms", pa.timestamp(unit="us")),
+            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_us", pa.timestamp(unit="us")),
+            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamp_ns", pa.timestamp(unit="us")),
+            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
+            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
+            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
+        ]
+    )
+
+
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz 
values."""
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index 173ddf78..38aea1e2 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -2246,3 +2246,18 @@ def test_branch_py_write_spark_read(session_catalog: 
Catalog, spark: SparkSessio
     )
     assert main_df.count() == 3
     assert branch_df.count() == 2
+
+
+@pytest.mark.integration
+def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+    identifier = "default.test_nanosecond_support_on_catalog"
+    # Create a pyarrow table with a nanosecond timestamp column
+    table = pa.Table.from_arrays(
+        [
+            pa.array([datetime.now()], type=pa.timestamp("ns")),
+            pa.array([datetime.now()], type=pa.timestamp("ns", tz="America/New_York")),
+        ],
+        names=["timestamp_ns", "timestamptz_ns"],
+    )
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, schema=table.schema)

Reply via email to