This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 610a1543 Add the rest of the _convert_schema_if_needed calls (#2300)
610a1543 is described below
commit 610a1543f2181b4da321864ff9ae49b901d9c956
Author: Alex Stephen <[email protected]>
AuthorDate: Tue Aug 19 04:33:25 2025 -0700
Add the rest of the _convert_schema_if_needed calls (#2300)
<!--
Thanks for opening a pull request!
-->
Closes #2270
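This passes the table format version to the remaining `_convert_schema_if_needed` call sites (`DynamoDbCatalog`, `SqlCatalog`, and `UpdateSchema.union_by_name`). It also forwards the table metadata's format version to `pyarrow_to_schema` when reading data files.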
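# Rationale for this change
Schema conversion must take the table's format version into account. The catalogs now read it from the `format-version` table property, falling back to `TableProperties.DEFAULT_FORMAT_VERSION`, instead of always using the default.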
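# Are these changes tested?
Yes. `test_nanosecond_support_on_catalog` now uses the shared `arrow_table_schema_with_all_timestamp_precisions` fixture and checks two cases:
- A V3 table on the in-memory catalog raises `NotImplementedError` ("Writing V3 is not yet supported").
- A V2 table raises `UnsupportedPyArrowTypeException` for the `timestamp[ns]` column.

The `arrow_table_schema_with_nanoseconds_timestamp_precisions` fixture is removed from `tests/conftest.py`.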
# Are there any user-facing changes?
`UpdateSchema.union_by_name` accepts a new optional `format_version` argument (default `2`).
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
pyiceberg/catalog/dynamodb.py | 7 ++++--
pyiceberg/catalog/sql.py | 7 ++++--
pyiceberg/io/pyarrow.py | 6 +++++-
pyiceberg/table/update/schema.py | 11 +++++++---
tests/conftest.py | 22 -------------------
tests/integration/test_writes/test_writes.py | 32 ++++++++++++++++++----------
6 files changed, 44 insertions(+), 41 deletions(-)
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 3b377626..2c1b19be 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -53,7 +53,7 @@ from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION,
AWS_SECRET_ACCESS_KEY, A
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ class DynamoDbCatalog(MetastoreCatalog):
ValueError: If the identifier is invalid, or no path is given to
store metadata.
"""
- schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
+ schema: Schema = self._convert_schema_if_needed( # type: ignore
+ schema,
+ int(properties.get(TableProperties.FORMAT_VERSION,
TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
+ )
database_name, table_name =
self.identifier_to_database_and_table(identifier)
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 880a4db4..0167b5a1 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -62,7 +62,7 @@ from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ class SqlCatalog(MetastoreCatalog):
ValueError: If the identifier is invalid, or no path is given to
store metadata.
"""
- schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
+ schema: Schema = self._convert_schema_if_needed( # type: ignore
+ schema,
+ int(properties.get(TableProperties.FORMAT_VERSION,
TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore
+ )
namespace_identifier = Catalog.namespace_from(identifier)
table_name = Catalog.table_name_from(identifier)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 6607d80c..7aa7a4e6 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1483,6 +1483,7 @@ def _task_to_record_batches(
case_sensitive: bool,
name_mapping: Optional[NameMapping] = None,
partition_spec: Optional[PartitionSpec] = None,
+ format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
) -> Iterator[pa.RecordBatch]:
arrow_format = ds.ParquetFileFormat(pre_buffer=True,
buffer_size=(ONE_MEGABYTE * 8))
with io.new_input(task.file.file_path).open() as fin:
@@ -1492,7 +1493,9 @@ def _task_to_record_batches(
# Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
# When V3 support is introduced, we will update
`downcast_ns_timestamp_to_us` flag based on
# the table format version.
- file_schema = pyarrow_to_schema(physical_schema, name_mapping,
downcast_ns_timestamp_to_us=True)
+ file_schema = pyarrow_to_schema(
+ physical_schema, name_mapping, downcast_ns_timestamp_to_us=True,
format_version=format_version
+ )
# Apply column projection rules:
https://iceberg.apache.org/spec/#column-projection
projected_missing_fields = _get_column_projection_values(
@@ -1721,6 +1724,7 @@ class ArrowScan:
self._case_sensitive,
self._table_metadata.name_mapping(),
self._table_metadata.specs().get(task.file.spec_id),
+ self._table_metadata.format_version,
)
for batch in batches:
if self._limit is not None:
diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 6ad01e97..b193c35a 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -48,7 +48,7 @@ from pyiceberg.table.update import (
UpdatesAndRequirements,
UpdateTableMetadata,
)
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
from pyiceberg.types import IcebergType, ListType, MapType, NestedField,
PrimitiveType, StructType
if TYPE_CHECKING:
@@ -142,11 +142,16 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
self._case_sensitive = case_sensitive
return self
- def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) ->
UpdateSchema:
+ def union_by_name(
+ # TODO: Move TableProperties.DEFAULT_FORMAT_VERSION to separate file
and set that as format_version default.
+ self,
+ new_schema: Union[Schema, "pa.Schema"],
+ format_version: TableVersion = 2,
+ ) -> UpdateSchema:
from pyiceberg.catalog import Catalog
visit_with_partner(
- Catalog._convert_schema_if_needed(new_schema),
+ Catalog._convert_schema_if_needed(new_schema,
format_version=format_version),
-1,
_UnionByNameVisitor(update_schema=self,
existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
diff --git a/tests/conftest.py b/tests/conftest.py
index e036a2fa..16c9e06d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,28 +2811,6 @@ def
arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
)
-@pytest.fixture(scope="session")
-def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
- """Pyarrow Schema with all microseconds timestamp."""
- import pyarrow as pa
-
- return pa.schema(
- [
- ("timestamp_s", pa.timestamp(unit="us")),
- ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
- ("timestamp_ms", pa.timestamp(unit="us")),
- ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
- ("timestamp_us", pa.timestamp(unit="us")),
- ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
- ("timestamp_ns", pa.timestamp(unit="us")),
- ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
- ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
- ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
- ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
- ]
- )
-
-
@pytest.fixture(scope="session")
def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
"""Iceberg table Schema with only date, timestamp and timestamptz
values."""
diff --git a/tests/integration/test_writes/test_writes.py
b/tests/integration/test_writes/test_writes.py
index 38aea1e2..bda50bd1 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,6 +18,7 @@
import math
import os
import random
+import re
import time
import uuid
from datetime import date, datetime, timedelta
@@ -44,7 +45,7 @@ from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In,
LessThan, Not
-from pyiceberg.io.pyarrow import _dataframe_to_data_files
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException,
_dataframe_to_data_files
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
@@ -2249,15 +2250,24 @@ def test_branch_py_write_spark_read(session_catalog:
Catalog, spark: SparkSessio
@pytest.mark.integration
-def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+def test_nanosecond_support_on_catalog(
+ session_catalog: Catalog,
arrow_table_schema_with_all_timestamp_precisions: pa.Schema
+) -> None:
identifier = "default.test_nanosecond_support_on_catalog"
- # Create a pyarrow table with a nanosecond timestamp column
- table = pa.Table.from_arrays(
- [
- pa.array([datetime.now()], type=pa.timestamp("ns")),
- pa.array([datetime.now()], type=pa.timestamp("ns",
tz="America/New_York")),
- ],
- names=["timestamp_ns", "timestamptz_ns"],
- )
- _create_table(session_catalog, identifier, {"format-version": "3"},
schema=table.schema)
+ catalog = load_catalog("default", type="in-memory")
+ catalog.create_namespace("ns")
+
+ _create_table(session_catalog, identifier, {"format-version": "3"},
schema=arrow_table_schema_with_all_timestamp_precisions)
+
+ with pytest.raises(NotImplementedError, match="Writing V3 is not yet
supported"):
+ catalog.create_table(
+ "ns.table1",
schema=arrow_table_schema_with_all_timestamp_precisions,
properties={"format-version": "3"}
+ )
+
+ with pytest.raises(
+ UnsupportedPyArrowTypeException, match=re.escape("Column
'timestamp_ns' has an unsupported type: timestamp[ns]")
+ ):
+ _create_table(
+ session_catalog, identifier, {"format-version": "2"},
schema=arrow_table_schema_with_all_timestamp_precisions
+ )