This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 8ed913bc Read partitioned tables with source field missing (#2367)
8ed913bc is described below
commit 8ed913bc48b19460c47746273b4678e7d2e6af70
Author: Gabriel Igliozzi <[email protected]>
AuthorDate: Thu Dec 4 16:53:56 2025 +0100
Read partitioned tables with source field missing (#2367)
<!--
Thanks for opening a pull request!
-->
<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
<!-- Closes #${GITHUB_ISSUE_ID} -->
# Rationale for this change
Following with the Java
[solution](https://github.com/apache/iceberg/pull/11868/files)
implementation on how to read partition specs when a source field was
dropped.
# Are these changes tested?
Yes, added one integration test and one unit test
# Are there any user-facing changes?
No
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
pyiceberg/partitioning.py | 12 ++++++++----
tests/integration/test_reads.py | 16 ++++++++++++++++
tests/table/test_partitioning.py | 36 ++++++++++++++++++++++++++++++++++++
3 files changed, 60 insertions(+), 4 deletions(-)
diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 45d0dfd2..8bf2b817 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -56,6 +56,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
from pyiceberg.utils.datetime import date_to_days, datetime_to_micros,
time_to_micros
@@ -222,11 +223,14 @@ class PartitionSpec(IcebergBaseModel):
:return: A StructType that represents the PartitionSpec, with a
NestedField for each PartitionField.
"""
nested_fields = []
+ schema_ids = schema._lazy_id_to_field
for field in self.fields:
- source_type = schema.find_type(field.source_id)
- result_type = field.transform.result_type(source_type)
- required = schema.find_field(field.source_id).required
- nested_fields.append(NestedField(field.field_id, field.name,
result_type, required=required))
+ if source_field := schema_ids.get(field.source_id):
+ result_type =
field.transform.result_type(source_field.field_type)
+ nested_fields.append(NestedField(field.field_id, field.name,
result_type, required=source_field.required))
+        else:
+            # Since the source field has been dropped we cannot determine the
type
+            nested_fields.append(NestedField(field.field_id, field.name,
UnknownType()))
return StructType(*nested_fields)
def partition_to_path(self, data: Record, schema: Schema) -> str:
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 99116ad1..785037ae 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1083,3 +1083,19 @@ def test_filter_after_arrow_scan(catalog: Catalog) ->
None:
scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
assert len(scan.to_arrow()) > 0
+
+
[email protected]
[email protected]("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_scan_source_field_missing_in_spec(catalog: Catalog, spark:
SparkSession) -> None:
+ identifier = "default.test_dropped_field"
+ spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+ spark.sql(f"CREATE TABLE {identifier} (foo int, bar int, jaz string) USING
ICEBERG PARTITIONED BY (foo, bar)")
+ spark.sql(
+ f"INSERT INTO {identifier} (foo, bar, jaz) VALUES (1, 1, 'dummy
data'), (1, 2, 'dummy data again'), (2, 1, 'another partition')"
+ )
+ spark.sql(f"ALTER TABLE {identifier} DROP PARTITION FIELD foo")
+ spark.sql(f"ALTER TABLE {identifier} DROP COLUMN foo")
+
+ table = catalog.load_table(identifier)
+ assert len(list(table.scan().plan_files())) == 3
diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py
index 0fe22391..576297c6 100644
--- a/tests/table/test_partitioning.py
+++ b/tests/table/test_partitioning.py
@@ -47,6 +47,7 @@ from pyiceberg.types import (
TimestampType,
TimestamptzType,
TimeType,
+ UnknownType,
UUIDType,
)
@@ -165,6 +166,28 @@ def test_partition_spec_to_path() -> None:
assert spec.partition_to_path(record, schema) ==
"my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
+def test_partition_spec_to_path_dropped_source_id() -> None:
+ schema = Schema(
+ NestedField(field_id=1, name="str", field_type=StringType(),
required=False),
+ NestedField(field_id=2, name="other_str", field_type=StringType(),
required=False),
+ NestedField(field_id=3, name="int", field_type=IntegerType(),
required=True),
+ )
+
+ spec = PartitionSpec(
+ PartitionField(source_id=1, field_id=1000,
transform=TruncateTransform(width=19), name="my#str%bucket"),
+ PartitionField(source_id=2, field_id=1001,
transform=IdentityTransform(), name="other str+bucket"),
+ # Point partition field to missing source id
+ PartitionField(source_id=4, field_id=1002,
transform=BucketTransform(num_buckets=25), name="my!int:bucket"),
+ spec_id=3,
+ )
+
+ record = Record("my+str", "( )", 10)
+
+ # Both partition field names and values should be URL encoded, with spaces
mapping to plus signs, to match the Java
+ # behaviour:
https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204
+ assert spec.partition_to_path(record, schema) ==
"my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
+
+
def test_partition_type(table_schema_simple: Schema) -> None:
spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000,
transform=TruncateTransform(width=19), name="str_truncate"),
@@ -178,6 +201,19 @@ def test_partition_type(table_schema_simple: Schema) ->
None:
)
+def test_partition_type_missing_source_field(table_schema_simple: Schema) ->
None:
+ spec = PartitionSpec(
+ PartitionField(source_id=1, field_id=1000,
transform=TruncateTransform(width=19), name="str_truncate"),
+ PartitionField(source_id=10, field_id=1001,
transform=BucketTransform(num_buckets=25), name="int_bucket"),
+ spec_id=3,
+ )
+
+ assert spec.partition_type(table_schema_simple) == StructType(
+ NestedField(field_id=1000, name="str_truncate",
field_type=StringType(), required=False),
+ NestedField(field_id=1001, name="int_bucket",
field_type=UnknownType(), required=False),
+ )
+
+
@pytest.mark.parametrize(
"source_type, value",
[