This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new b0a78781 Core: Respect partition evolution in inspect.partitions (#2845)
b0a78781 is described below

commit b0a787814e409f53725b9c924a18b6d81c647a68
Author: Soham <[email protected]>
AuthorDate: Thu Dec 25 01:16:40 2025 +0530

    Core: Respect partition evolution in inspect.partitions (#2845)
    
    What does this change do?
    - Build the inspect.partitions schema using only the partition specs
    present in the selected snapshot’s manifests, so newer partition fields
    don’t appear as None for older snapshots.
    - Normalize partition comparisons in integration tests to ignore
    None-valued fields, such as those left by dropped partition columns.
    
    Why is this needed?
    - Partition evolution should show the partition shape that actually
    existed when the data was written. Mixing all specs produced misleading
    None fields for older partitions.
    
    How was this tested?
    - make lint
    - uv run python -m pytest tests/io/test_pyarrow.py -k "partition_evolution" -v
    - CI: full integration suite (GitHub Actions)
    
    Closes #1120
    
    ---------
    
    Co-authored-by: Soham <[email protected]>
    Co-authored-by: Kevin Liu <[email protected]>
    Co-authored-by: Kevin Liu <[email protected]>
---
 pyiceberg/table/inspect.py              |  6 +++---
 pyiceberg/table/metadata.py             | 14 ++++++++++----
 tests/integration/test_inspect_table.py | 12 +++++++++++-
 tests/io/test_pyarrow.py                | 30 ++++++++++++++++++++++++++++++
 4 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index bfe2fffa..5da343cc 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -285,7 +285,9 @@ class InspectTable:
             ]
         )
 
-        partition_record = self.tbl.metadata.specs_struct()
+        snapshot = self._get_snapshot(snapshot_id)
+        spec_ids = {manifest.partition_spec_id for manifest in 
snapshot.manifests(self.tbl.io)}
+        partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
         has_partitions = len(partition_record.fields) > 0
 
         if has_partitions:
@@ -299,8 +301,6 @@ class InspectTable:
 
             table_schema = pa.unify_schemas([partitions_schema, table_schema])
 
-        snapshot = self._get_snapshot(snapshot_id)
-
         scan = DataScan(
             table_metadata=self.tbl.metadata,
             io=self.tbl.io,
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 8ae93037..8a55f77b 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import datetime
 import uuid
+from collections.abc import Iterable
 from copy import copy
 from typing import Annotated, Any, Literal
 
@@ -262,18 +263,23 @@ class TableMetadataCommonFields(IcebergBaseModel):
         """Return a dict the partition specs this table."""
         return {spec.spec_id: spec for spec in self.partition_specs}
 
-    def specs_struct(self) -> StructType:
-        """Produce a struct of all the combined PartitionSpecs.
+    def specs_struct(self, spec_ids: Iterable[int] | None = None) -> 
StructType:
+        """Produce a struct of the combined PartitionSpecs.
 
         The partition fields should be optional: Partition fields may be added 
later,
         in which case not all files would have the result field, and it may be 
null.
 
-        :return: A StructType that represents all the combined PartitionSpecs 
of the table
+        Args:
+            spec_ids: Optional iterable of spec IDs to include. When not 
provided,
+                all table specs are used.
+
+        :return: A StructType that represents the combined PartitionSpecs of 
the table
         """
         specs = self.specs()
+        selected_specs = specs.values() if spec_ids is None else 
[specs[spec_id] for spec_id in spec_ids if spec_id in specs]
 
         # Collect all the fields
-        struct_fields = {field.field_id: field for spec in specs.values() for 
field in spec.fields}
+        struct_fields = {field.field_id: field for spec in selected_specs for 
field in spec.fields}
 
         schema = self.schema()
 
diff --git a/tests/integration/test_inspect_table.py 
b/tests/integration/test_inspect_table.py
index 4add18cf..ea0cca9b 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -18,6 +18,7 @@
 
 import math
 from datetime import date, datetime
+from typing import Any
 
 import pyarrow as pa
 import pytest
@@ -208,9 +209,18 @@ def _inspect_files_asserts(df: pa.Table, spark_df: 
DataFrame) -> None:
 def _check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) -> 
None:
     lhs = df.to_pandas().sort_values("last_updated_at")
     rhs = spark_df.toPandas().sort_values("last_updated_at")
+
+    def _normalize_partition(d: dict[str, Any]) -> dict[str, Any]:
+        return {k: v for k, v in d.items() if v is not None}
+
     for column in df.column_names:
         for left, right in zip(lhs[column].to_list(), rhs[column].to_list(), 
strict=True):
-            assert left == right, f"Difference in column {column}: {left} != 
{right}"
+            if column == "partition":
+                assert _normalize_partition(left) == 
_normalize_partition(right), (
+                    f"Difference in column {column}: {left} != {right}"
+                )
+            else:
+                assert left == right, f"Difference in column {column}: {left} 
!= {right}"
 
 
 @pytest.mark.integration
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index ea2928ca..f1ed109d 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2594,6 +2594,36 @@ def test_inspect_partition_for_nested_field(catalog: 
InMemoryCatalog) -> None:
     assert {part["part"] for part in partitions} == {"data-a", "data-b"}
 
 
+def test_inspect_partitions_respects_partition_evolution(catalog: 
InMemoryCatalog) -> None:
+    schema = Schema(
+        NestedField(id=1, name="dt", field_type=DateType(), required=False),
+        NestedField(id=2, name="category", field_type=StringType(), 
required=False),
+    )
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, 
transform=IdentityTransform(), name="dt"))
+    catalog.create_namespace("default")
+    table = catalog.create_table(
+        "default.test_inspect_partitions_respects_partition_evolution", 
schema=schema, partition_spec=spec
+    )
+
+    old_spec_id = table.spec().spec_id
+    old_data = [{"dt": date(2025, 1, 1), "category": "old"}]
+    table.append(pa.Table.from_pylist(old_data, 
schema=table.schema().as_arrow()))
+
+    table.update_spec().add_identity("category").commit()
+    new_spec_id = table.spec().spec_id
+    assert new_spec_id != old_spec_id
+
+    partitions_table = table.inspect.partitions()
+    partitions = partitions_table["partition"].to_pylist()
+    assert all("category" not in partition for partition in partitions)
+
+    new_data = [{"dt": date(2025, 1, 2), "category": "new"}]
+    table.append(pa.Table.from_pylist(new_data, 
schema=table.schema().as_arrow()))
+
+    partitions_table = table.inspect.partitions()
+    assert set(partitions_table["spec_id"].to_pylist()) == {old_spec_id, 
new_spec_id}
+
+
 def test_identity_partition_on_multi_columns() -> None:
     test_pa_schema = pa.schema([("born_year", pa.int64()), ("n_legs", 
pa.int64()), ("animal", pa.string())])
     test_schema = Schema(

Reply via email to