This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 24b12ddd Support reading nanoseconds from PyArrow (#2294)
24b12ddd is described below
commit 24b12ddd8fdab4a62650786a2c3cdd56a53f8719
Author: Alex Stephen <[email protected]>
AuthorDate: Thu Aug 7 14:06:03 2025 -0700
Support reading nanoseconds from PyArrow (#2294)
<!--
Thanks for opening a pull request!
-->
<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual GitHub issue id. -->
Closes #2270
Related to #1045
# Rationale for this change
This allows us to read nanosecond information from pyarrow. Right now,
we always downcast to microseconds or throw an error. By passing through
the format-version, we can grab nanosecond precision *just for v3
tables*
# Are these changes tested?
Included a test. I can't do a test involving writing since we don't
support v3 writing yet (there's a PR out for that)
# Are there any user-facing changes?
<!-- In the case of user-facing changes, please add the changelog label.
-->
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
pyiceberg/catalog/__init__.py | 15 +++++--
pyiceberg/catalog/rest/__init__.py | 6 ++-
pyiceberg/io/pyarrow.py | 60 ++++++++++++++++++++++------
pyiceberg/table/__init__.py | 22 +++++++---
tests/conftest.py | 22 ++++++++++
tests/integration/test_writes/test_writes.py | 15 +++++++
6 files changed, 119 insertions(+), 21 deletions(-)
diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index 4da11643..1607541d 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -70,6 +70,7 @@ from pyiceberg.typedef import (
Identifier,
Properties,
RecursiveDict,
+ TableVersion,
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool
@@ -743,7 +744,9 @@ class Catalog(ABC):
return load_file_io({**self.properties, **properties}, location)
@staticmethod
- def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) ->
Schema:
+ def _convert_schema_if_needed(
+ schema: Union[Schema, "pa.Schema"], format_version: TableVersion =
TableProperties.DEFAULT_FORMAT_VERSION
+ ) -> Schema:
if isinstance(schema, Schema):
return schema
try:
@@ -754,7 +757,10 @@ class Catalog(ABC):
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
if isinstance(schema, pa.Schema):
schema: Schema = visit_pyarrow( # type: ignore
- schema,
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+ schema,
+ _ConvertToIcebergWithoutIDs(
+
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version
+ ),
)
return schema
except ModuleNotFoundError:
@@ -847,7 +853,10 @@ class MetastoreCatalog(Catalog, ABC):
Returns:
StagedTable: the created staged table instance.
"""
- schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
+ schema: Schema = self._convert_schema_if_needed( # type: ignore
+ schema,
+ int(properties.get(TableProperties.FORMAT_VERSION,
TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
+ )
database_name, table_name =
self.identifier_to_database_and_table(identifier)
diff --git a/pyiceberg/catalog/rest/__init__.py
b/pyiceberg/catalog/rest/__init__.py
index b39af9fc..c43a64f3 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -64,6 +64,7 @@ from pyiceberg.table import (
StagedTable,
Table,
TableIdentifier,
+ TableProperties,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder,
assign_fresh_sort_order_ids
@@ -498,7 +499,10 @@ class RestCatalog(Catalog):
properties: Properties = EMPTY_DICT,
stage_create: bool = False,
) -> TableResponse:
- iceberg_schema = self._convert_schema_if_needed(schema)
+ iceberg_schema = self._convert_schema_if_needed(
+ schema,
+ int(properties.get(TableProperties.FORMAT_VERSION,
TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
+ )
fresh_schema = assign_fresh_schema_ids(iceberg_schema)
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec,
iceberg_schema, fresh_schema)
fresh_sort_order = assign_fresh_sort_order_ids(sort_order,
iceberg_schema, fresh_schema)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index cde6d947..ab85893a 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -145,12 +145,13 @@ from pyiceberg.schema import (
visit,
visit_with_partner,
)
+from pyiceberg.table import TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
from pyiceberg.table.puffin import PuffinFile
from pyiceberg.transforms import IdentityTransform, TruncateTransform
-from pyiceberg.typedef import EMPTY_DICT, Properties, Record
+from pyiceberg.typedef import EMPTY_DICT, Properties, Record, TableVersion
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -1017,13 +1018,20 @@ def _combine_positional_deletes(positional_deletes:
List[pa.ChunkedArray], start
def pyarrow_to_schema(
- schema: pa.Schema, name_mapping: Optional[NameMapping] = None,
downcast_ns_timestamp_to_us: bool = False
+ schema: pa.Schema,
+ name_mapping: Optional[NameMapping] = None,
+ downcast_ns_timestamp_to_us: bool = False,
+ format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Schema:
has_ids = visit_pyarrow(schema, _HasIds())
if has_ids:
- return visit_pyarrow(schema,
_ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+ return visit_pyarrow(
+ schema,
_ConvertToIceberg(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version)
+ )
elif name_mapping is not None:
- schema_without_ids = _pyarrow_to_schema_without_ids(schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+ schema_without_ids = _pyarrow_to_schema_without_ids(
+ schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version
+ )
return apply_name_mapping(schema_without_ids, name_mapping)
else:
raise ValueError(
@@ -1031,8 +1039,15 @@ def pyarrow_to_schema(
)
-def _pyarrow_to_schema_without_ids(schema: pa.Schema,
downcast_ns_timestamp_to_us: bool = False) -> Schema:
- return visit_pyarrow(schema,
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us))
+def _pyarrow_to_schema_without_ids(
+ schema: pa.Schema,
+ downcast_ns_timestamp_to_us: bool = False,
+ format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
+) -> Schema:
+ return visit_pyarrow(
+ schema,
+
_ConvertToIcebergWithoutIDs(downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version),
+ )
def _pyarrow_schema_ensure_large_types(schema: pa.Schema) -> pa.Schema:
@@ -1214,9 +1229,12 @@ class
_ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
_field_names: List[str]
- def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
+ def __init__(
+ self, downcast_ns_timestamp_to_us: bool = False, format_version:
TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
+ ) -> None: # noqa: F821
self._field_names = []
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
+ self._format_version = format_version
def _field_id(self, field: pa.Field) -> int:
if (field_id := _get_field_id(field)) is not None:
@@ -1287,6 +1305,11 @@ class
_ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
elif primitive.unit == "ns":
if self._downcast_ns_timestamp_to_us:
logger.warning("Iceberg does not yet support 'ns'
timestamp precision. Downcasting to 'us'.")
+ elif self._format_version >= 3:
+ if primitive.tz in UTC_ALIASES:
+ return TimestamptzNanoType()
+ else:
+ return TimestampNanoType()
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp
precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to
automatically downcast 'ns' to 'us' on write.",
@@ -2519,7 +2542,10 @@ def bin_pack_arrow_table(tbl: pa.Table,
target_file_size: int) -> Iterator[List[
def _check_pyarrow_schema_compatible(
- requested_schema: Schema, provided_schema: pa.Schema,
downcast_ns_timestamp_to_us: bool = False
+ requested_schema: Schema,
+ provided_schema: pa.Schema,
+ downcast_ns_timestamp_to_us: bool = False,
+ format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> None:
"""
Check if the `requested_schema` is compatible with `provided_schema`.
@@ -2532,10 +2558,15 @@ def _check_pyarrow_schema_compatible(
name_mapping = requested_schema.name_mapping
try:
provided_schema = pyarrow_to_schema(
- provided_schema, name_mapping=name_mapping,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ provided_schema,
+ name_mapping=name_mapping,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=format_version,
)
except ValueError as e:
- provided_schema = _pyarrow_to_schema_without_ids(provided_schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+ provided_schema = _pyarrow_to_schema_without_ids(
+ provided_schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=format_version
+ )
additional_names = set(provided_schema._name_to_id.keys()) -
set(requested_schema._name_to_id.keys())
raise ValueError(
f"PyArrow table contains more columns: {',
'.join(sorted(additional_names))}. Update the schema first (hint, use
union_by_name)."
@@ -2561,7 +2592,7 @@ def parquet_file_to_data_file(io: FileIO, table_metadata:
TableMetadata, file_pa
)
schema = table_metadata.schema()
- _check_pyarrow_schema_compatible(schema, arrow_schema)
+ _check_pyarrow_schema_compatible(schema, arrow_schema,
format_version=table_metadata.format_version)
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=parquet_metadata,
@@ -2652,7 +2683,12 @@ def _dataframe_to_data_files(
)
name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
- task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
+ task_schema = pyarrow_to_schema(
+ df.schema,
+ name_mapping=name_mapping,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=table_metadata.format_version,
+ )
if table_metadata.spec().is_unpartitioned():
yield from write_file(
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 30b06fcb..7d5cc10d 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -219,7 +219,7 @@ class TableProperties:
DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
FORMAT_VERSION = "format-version"
- DEFAULT_FORMAT_VERSION = 2
+ DEFAULT_FORMAT_VERSION: TableVersion = 2
MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"
MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024 # 8 MB
@@ -477,7 +477,10 @@ class Transaction:
)
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self.table_metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(),
+ provided_schema=df.schema,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=self.table_metadata.format_version,
)
with self._append_snapshot_producer(snapshot_properties,
branch=branch) as append_files:
@@ -527,7 +530,10 @@ class Transaction:
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self.table_metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(),
+ provided_schema=df.schema,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=self.table_metadata.format_version,
)
# If dataframe does not have data, there is no need to overwrite
@@ -593,7 +599,10 @@ class Transaction:
)
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self.table_metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(),
+ provided_schema=df.schema,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=self.table_metadata.format_version,
)
if overwrite_filter != AlwaysFalse():
@@ -789,7 +798,10 @@ class Transaction:
downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
- self.table_metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ self.table_metadata.schema(),
+ provided_schema=df.schema,
+ downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
+ format_version=self.table_metadata.format_version,
)
# get list of rows that exist so we don't have to load the entire
target table
diff --git a/tests/conftest.py b/tests/conftest.py
index 16c9e06d..e036a2fa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,6 +2811,28 @@ def
arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
)
[email protected](scope="session")
+def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
+    """PyArrow Schema with nanosecond timestamp precisions."""
+ import pyarrow as pa
+
+ return pa.schema(
+ [
+ ("timestamp_s", pa.timestamp(unit="us")),
+ ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
+ ("timestamp_ms", pa.timestamp(unit="us")),
+ ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
+ ("timestamp_us", pa.timestamp(unit="us")),
+ ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
+ ("timestamp_ns", pa.timestamp(unit="us")),
+ ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
+ ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
+ ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
+ ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
+ ]
+ )
+
+
@pytest.fixture(scope="session")
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
"""Iceberg table Schema with only date, timestamp and timestamptz
values."""
diff --git a/tests/integration/test_writes/test_writes.py
b/tests/integration/test_writes/test_writes.py
index 173ddf78..38aea1e2 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -2246,3 +2246,18 @@ def test_branch_py_write_spark_read(session_catalog:
Catalog, spark: SparkSessio
)
assert main_df.count() == 3
assert branch_df.count() == 2
+
+
[email protected]
+def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+ identifier = "default.test_nanosecond_support_on_catalog"
+ # Create a pyarrow table with a nanosecond timestamp column
+ table = pa.Table.from_arrays(
+ [
+ pa.array([datetime.now()], type=pa.timestamp("ns")),
+ pa.array([datetime.now()], type=pa.timestamp("ns",
tz="America/New_York")),
+ ],
+ names=["timestamp_ns", "timestamptz_ns"],
+ )
+
+ _create_table(session_catalog, identifier, {"format-version": "3"},
schema=table.schema)