This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 610a1543 Add the rest of the _convert_schema_if_needed calls (#2300)
610a1543 is described below

commit 610a1543f2181b4da321864ff9ae49b901d9c956
Author: Alex Stephen <[email protected]>
AuthorDate: Tue Aug 19 04:33:25 2025 -0700

    Add the rest of the _convert_schema_if_needed calls (#2300)
    
    <!--
    Thanks for opening a pull request!
    -->
    Closes #2270
    
    This adds the rest of the _convert_schema_if_needed calls.
    
    # Rationale for this change
    
    # Are these changes tested?
    
    # Are there any user-facing changes?
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
---
 pyiceberg/catalog/dynamodb.py                |  7 ++++--
 pyiceberg/catalog/sql.py                     |  7 ++++--
 pyiceberg/io/pyarrow.py                      |  6 +++++-
 pyiceberg/table/update/schema.py             | 11 +++++++---
 tests/conftest.py                            | 22 -------------------
 tests/integration/test_writes/test_writes.py | 32 ++++++++++++++++++----------
 6 files changed, 44 insertions(+), 41 deletions(-)

diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py
index 3b377626..2c1b19be 100644
--- a/pyiceberg/catalog/dynamodb.py
+++ b/pyiceberg/catalog/dynamodb.py
@@ -53,7 +53,7 @@ from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, 
AWS_SECRET_ACCESS_KEY, A
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -181,7 +181,10 @@ class DynamoDbCatalog(MetastoreCatalog):
             ValueError: If the identifier is invalid, or no path is given to 
store metadata.
 
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, 
TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         database_name, table_name = 
self.identifier_to_database_and_table(identifier)
 
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py
index 880a4db4..0167b5a1 100644
--- a/pyiceberg/catalog/sql.py
+++ b/pyiceberg/catalog/sql.py
@@ -62,7 +62,7 @@ from pyiceberg.io import load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table, TableProperties
 from pyiceberg.table.locations import load_location_provider
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -200,7 +200,10 @@ class SqlCatalog(MetastoreCatalog):
             ValueError: If the identifier is invalid, or no path is given to 
store metadata.
 
         """
-        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+        schema: Schema = self._convert_schema_if_needed(  # type: ignore
+            schema,
+            int(properties.get(TableProperties.FORMAT_VERSION, 
TableProperties.DEFAULT_FORMAT_VERSION)),  # type: ignore
+        )
 
         namespace_identifier = Catalog.namespace_from(identifier)
         table_name = Catalog.table_name_from(identifier)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 6607d80c..7aa7a4e6 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1483,6 +1483,7 @@ def _task_to_record_batches(
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
 ) -> Iterator[pa.RecordBatch]:
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, 
buffer_size=(ONE_MEGABYTE * 8))
     with io.new_input(task.file.file_path).open() as fin:
@@ -1492,7 +1493,9 @@ def _task_to_record_batches(
         # Hence it is reasonable to always cast 'ns' timestamp to 'us' on read.
         # When V3 support is introduced, we will update 
`downcast_ns_timestamp_to_us` flag based on
         # the table format version.
-        file_schema = pyarrow_to_schema(physical_schema, name_mapping, 
downcast_ns_timestamp_to_us=True)
+        file_schema = pyarrow_to_schema(
+            physical_schema, name_mapping, downcast_ns_timestamp_to_us=True, 
format_version=format_version
+        )
 
         # Apply column projection rules: 
https://iceberg.apache.org/spec/#column-projection
         projected_missing_fields = _get_column_projection_values(
@@ -1721,6 +1724,7 @@ class ArrowScan:
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
                 self._table_metadata.specs().get(task.file.spec_id),
+                self._table_metadata.format_version,
             )
             for batch in batches:
                 if self._limit is not None:
diff --git a/pyiceberg/table/update/schema.py b/pyiceberg/table/update/schema.py
index 6ad01e97..b193c35a 100644
--- a/pyiceberg/table/update/schema.py
+++ b/pyiceberg/table/update/schema.py
@@ -48,7 +48,7 @@ from pyiceberg.table.update import (
     UpdatesAndRequirements,
     UpdateTableMetadata,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import L, TableVersion
 from pyiceberg.types import IcebergType, ListType, MapType, NestedField, 
PrimitiveType, StructType
 
 if TYPE_CHECKING:
@@ -142,11 +142,16 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
         self._case_sensitive = case_sensitive
         return self
 
-    def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> 
UpdateSchema:
+    def union_by_name(
+        # TODO: Move TableProperties.DEFAULT_FORMAT_VERSION to separate file 
and set that as format_version default.
+        self,
+        new_schema: Union[Schema, "pa.Schema"],
+        format_version: TableVersion = 2,
+    ) -> UpdateSchema:
         from pyiceberg.catalog import Catalog
 
         visit_with_partner(
-            Catalog._convert_schema_if_needed(new_schema),
+            Catalog._convert_schema_if_needed(new_schema, 
format_version=format_version),
             -1,
             _UnionByNameVisitor(update_schema=self, 
existing_schema=self._schema, case_sensitive=self._case_sensitive),
             # type: ignore
diff --git a/tests/conftest.py b/tests/conftest.py
index e036a2fa..16c9e06d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2811,28 +2811,6 @@ def 
arrow_table_schema_with_all_microseconds_timestamp_precisions() -> "pa.Schem
     )
 
 
[email protected](scope="session")
-def arrow_table_schema_with_nanoseconds_timestamp_precisions() -> "pa.Schema":
-    """Pyarrow Schema with all microseconds timestamp."""
-    import pyarrow as pa
-
-    return pa.schema(
-        [
-            ("timestamp_s", pa.timestamp(unit="us")),
-            ("timestamptz_s", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ms", pa.timestamp(unit="us")),
-            ("timestamptz_ms", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_us", pa.timestamp(unit="us")),
-            ("timestamptz_us", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamp_ns", pa.timestamp(unit="us")),
-            ("timestamptz_ns", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_us_etc_utc", pa.timestamp(unit="us", tz="UTC")),
-            ("timestamptz_ns_z", pa.timestamp(unit="ns", tz="UTC")),
-            ("timestamptz_s_0000", pa.timestamp(unit="us", tz="UTC")),
-        ]
-    )
-
-
 @pytest.fixture(scope="session")
 def table_schema_with_all_microseconds_timestamp_precision() -> Schema:
     """Iceberg table Schema with only date, timestamp and timestamptz 
values."""
diff --git a/tests/integration/test_writes/test_writes.py 
b/tests/integration/test_writes/test_writes.py
index 38aea1e2..bda50bd1 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -18,6 +18,7 @@
 import math
 import os
 import random
+import re
 import time
 import uuid
 from datetime import date, datetime, timedelta
@@ -44,7 +45,7 @@ from pyiceberg.catalog.hive import HiveCatalog
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import CommitFailedException, NoSuchTableError
 from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual, In, 
LessThan, Not
-from pyiceberg.io.pyarrow import _dataframe_to_data_files
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, 
_dataframe_to_data_files
 from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import TableProperties
@@ -2249,15 +2250,24 @@ def test_branch_py_write_spark_read(session_catalog: 
Catalog, spark: SparkSessio
 
 
 @pytest.mark.integration
-def test_nanosecond_support_on_catalog(session_catalog: Catalog) -> None:
+def test_nanosecond_support_on_catalog(
+    session_catalog: Catalog, 
arrow_table_schema_with_all_timestamp_precisions: pa.Schema
+) -> None:
     identifier = "default.test_nanosecond_support_on_catalog"
-    # Create a pyarrow table with a nanosecond timestamp column
-    table = pa.Table.from_arrays(
-        [
-            pa.array([datetime.now()], type=pa.timestamp("ns")),
-            pa.array([datetime.now()], type=pa.timestamp("ns", 
tz="America/New_York")),
-        ],
-        names=["timestamp_ns", "timestamptz_ns"],
-    )
 
-    _create_table(session_catalog, identifier, {"format-version": "3"}, 
schema=table.schema)
+    catalog = load_catalog("default", type="in-memory")
+    catalog.create_namespace("ns")
+
+    _create_table(session_catalog, identifier, {"format-version": "3"}, 
schema=arrow_table_schema_with_all_timestamp_precisions)
+
+    with pytest.raises(NotImplementedError, match="Writing V3 is not yet 
supported"):
+        catalog.create_table(
+            "ns.table1", 
schema=arrow_table_schema_with_all_timestamp_precisions, 
properties={"format-version": "3"}
+        )
+
+    with pytest.raises(
+        UnsupportedPyArrowTypeException, match=re.escape("Column 
'timestamp_ns' has an unsupported type: timestamp[ns]")
+    ):
+        _create_table(
+            session_catalog, identifier, {"format-version": "2"}, 
schema=arrow_table_schema_with_all_timestamp_precisions
+        )

Reply via email to