This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new b0a78781 Core: Respect partition evolution in inspect.partitions (#2845)
b0a78781 is described below
commit b0a787814e409f53725b9c924a18b6d81c647a68
Author: Soham <[email protected]>
AuthorDate: Thu Dec 25 01:16:40 2025 +0530
Core: Respect partition evolution in inspect.partitions (#2845)
What does this change do?
- Build the inspect.partitions schema using only the partition specs
present in the selected snapshot’s manifests, so newer partition fields
don’t appear as None for older snapshots.
- Normalize partition comparisons in integration tests to ignore
trailing None fields from dropped partition columns.
Why is this needed?
- Partition evolution should show the partition shape that actually
existed when the data was written. Mixing all specs produced misleading
None fields for older partitions.
How was this tested?
- make lint
- uv run python -m pytest tests/io/test_pyarrow.py -k "partition_evolution" -v
- CI: full integration suite (GitHub Actions)
Closes #1120
---------
Co-authored-by: Soham <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
---
pyiceberg/table/inspect.py | 6 +++---
pyiceberg/table/metadata.py | 14 ++++++++++----
tests/integration/test_inspect_table.py | 12 +++++++++++-
tests/io/test_pyarrow.py | 30 ++++++++++++++++++++++++++++++
4 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py
index bfe2fffa..5da343cc 100644
--- a/pyiceberg/table/inspect.py
+++ b/pyiceberg/table/inspect.py
@@ -285,7 +285,9 @@ class InspectTable:
]
)
- partition_record = self.tbl.metadata.specs_struct()
+ snapshot = self._get_snapshot(snapshot_id)
+ spec_ids = {manifest.partition_spec_id for manifest in
snapshot.manifests(self.tbl.io)}
+ partition_record = self.tbl.metadata.specs_struct(spec_ids=spec_ids)
has_partitions = len(partition_record.fields) > 0
if has_partitions:
@@ -299,8 +301,6 @@ class InspectTable:
table_schema = pa.unify_schemas([partitions_schema, table_schema])
- snapshot = self._get_snapshot(snapshot_id)
-
scan = DataScan(
table_metadata=self.tbl.metadata,
io=self.tbl.io,
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py
index 8ae93037..8a55f77b 100644
--- a/pyiceberg/table/metadata.py
+++ b/pyiceberg/table/metadata.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import datetime
import uuid
+from collections.abc import Iterable
from copy import copy
from typing import Annotated, Any, Literal
@@ -262,18 +263,23 @@ class TableMetadataCommonFields(IcebergBaseModel):
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}
- def specs_struct(self) -> StructType:
- """Produce a struct of all the combined PartitionSpecs.
+ def specs_struct(self, spec_ids: Iterable[int] | None = None) ->
StructType:
+ """Produce a struct of the combined PartitionSpecs.
The partition fields should be optional: Partition fields may be added
later,
in which case not all files would have the result field, and it may be
null.
- :return: A StructType that represents all the combined PartitionSpecs
of the table
+ Args:
+ spec_ids: Optional iterable of spec IDs to include. When not
provided,
+ all table specs are used.
+
+ :return: A StructType that represents the combined PartitionSpecs of
the table
"""
specs = self.specs()
+ selected_specs = specs.values() if spec_ids is None else
[specs[spec_id] for spec_id in spec_ids if spec_id in specs]
# Collect all the fields
- struct_fields = {field.field_id: field for spec in specs.values() for
field in spec.fields}
+ struct_fields = {field.field_id: field for spec in selected_specs for
field in spec.fields}
schema = self.schema()
diff --git a/tests/integration/test_inspect_table.py
b/tests/integration/test_inspect_table.py
index 4add18cf..ea0cca9b 100644
--- a/tests/integration/test_inspect_table.py
+++ b/tests/integration/test_inspect_table.py
@@ -18,6 +18,7 @@
import math
from datetime import date, datetime
+from typing import Any
import pyarrow as pa
import pytest
@@ -208,9 +209,18 @@ def _inspect_files_asserts(df: pa.Table, spark_df:
DataFrame) -> None:
def _check_pyiceberg_df_equals_spark_df(df: pa.Table, spark_df: DataFrame) ->
None:
lhs = df.to_pandas().sort_values("last_updated_at")
rhs = spark_df.toPandas().sort_values("last_updated_at")
+
+ def _normalize_partition(d: dict[str, Any]) -> dict[str, Any]:
+ return {k: v for k, v in d.items() if v is not None}
+
for column in df.column_names:
for left, right in zip(lhs[column].to_list(), rhs[column].to_list(),
strict=True):
- assert left == right, f"Difference in column {column}: {left} !=
{right}"
+ if column == "partition":
+ assert _normalize_partition(left) ==
_normalize_partition(right), (
+ f"Difference in column {column}: {left} != {right}"
+ )
+ else:
+ assert left == right, f"Difference in column {column}: {left}
!= {right}"
@pytest.mark.integration
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index ea2928ca..f1ed109d 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2594,6 +2594,36 @@ def test_inspect_partition_for_nested_field(catalog:
InMemoryCatalog) -> None:
assert {part["part"] for part in partitions} == {"data-a", "data-b"}
+def test_inspect_partitions_respects_partition_evolution(catalog:
InMemoryCatalog) -> None:
+ schema = Schema(
+ NestedField(id=1, name="dt", field_type=DateType(), required=False),
+ NestedField(id=2, name="category", field_type=StringType(),
required=False),
+ )
+ spec = PartitionSpec(PartitionField(source_id=1, field_id=1000,
transform=IdentityTransform(), name="dt"))
+ catalog.create_namespace("default")
+ table = catalog.create_table(
+ "default.test_inspect_partitions_respects_partition_evolution",
schema=schema, partition_spec=spec
+ )
+
+ old_spec_id = table.spec().spec_id
+ old_data = [{"dt": date(2025, 1, 1), "category": "old"}]
+ table.append(pa.Table.from_pylist(old_data,
schema=table.schema().as_arrow()))
+
+ table.update_spec().add_identity("category").commit()
+ new_spec_id = table.spec().spec_id
+ assert new_spec_id != old_spec_id
+
+ partitions_table = table.inspect.partitions()
+ partitions = partitions_table["partition"].to_pylist()
+ assert all("category" not in partition for partition in partitions)
+
+ new_data = [{"dt": date(2025, 1, 2), "category": "new"}]
+ table.append(pa.Table.from_pylist(new_data,
schema=table.schema().as_arrow()))
+
+ partitions_table = table.inspect.partitions()
+ assert set(partitions_table["spec_id"].to_pylist()) == {old_spec_id,
new_spec_id}
+
+
def test_identity_partition_on_multi_columns() -> None:
test_pa_schema = pa.schema([("born_year", pa.int64()), ("n_legs",
pa.int64()), ("animal", pa.string())])
test_schema = Schema(