This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push: new a626bc2c Update schema projection to support `initial-defaults` (#1644) a626bc2c is described below commit a626bc2c242bb99d1613594ee59d56b634fb1aa4 Author: Fokko Driesprong <fo...@apache.org> AuthorDate: Tue Jul 8 08:54:37 2025 +0200 Update schema projection to support `initial-defaults` (#1644) Add the projection piece of the initial defaults. Closes #1836 --------- Co-authored-by: Kevin Liu <kevinjq...@users.noreply.github.com> --- pyiceberg/expressions/visitors.py | 23 ++++++++++++++++++----- pyiceberg/io/pyarrow.py | 10 +++++++--- tests/integration/test_reads.py | 29 +++++++++++++++++++++++++++++ tests/io/test_pyarrow.py | 11 +++++++++++ 4 files changed, 65 insertions(+), 8 deletions(-) diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py index abac19bc..26241d23 100644 --- a/pyiceberg/expressions/visitors.py +++ b/pyiceberg/expressions/visitors.py @@ -893,15 +893,28 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]): raise TypeError(f"Expected Bound Predicate, got: {predicate.term}") def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression: - file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id) + field = predicate.term.ref().field + file_column_name = self.file_schema.find_column_name(field.field_id) if file_column_name is None: # In the case of schema evolution, the column might not be present - # in the file schema when reading older data - if isinstance(predicate, BoundIsNull): - return AlwaysTrue() + # we can use the default value as a constant and evaluate it against + # the predicate + pred: BooleanExpression + if isinstance(predicate, BoundUnaryPredicate): + pred = predicate.as_unbound(field.name) + elif isinstance(predicate, BoundLiteralPredicate): + pred = predicate.as_unbound(field.name, predicate.literal) + elif isinstance(predicate, BoundSetPredicate): + pred = predicate.as_unbound(field.name, predicate.literals) else: - return AlwaysFalse() + raise ValueError(f"Unsupported predicate: {predicate}") + + return ( + AlwaysTrue() + if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default)) + else AlwaysFalse() + ) if isinstance(predicate, BoundUnaryPredicate): return predicate.as_unbound(file_column_name) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3e49885e..f6dacc16 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1814,9 +1814,13 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra array = self._cast_if_needed(field, field_array) field_arrays.append(array) fields.append(self._construct_field(field, array.type)) - elif field.optional: + elif field.optional or field.initial_default is not None: + # When an optional field is added, or when a required field with a non-null initial default is added arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids) - field_arrays.append(pa.nulls(len(struct_array), type=arrow_type)) + if field.initial_default is None: + field_arrays.append(pa.nulls(len(struct_array), type=arrow_type)) + else: + field_arrays.append(pa.repeat(field.initial_default, len(struct_array))) fields.append(self._construct_field(field, arrow_type)) else: raise ResolveError(f"Field is required, and could not be found in the file: {field}") @@ -2249,7 +2253,7 @@ def parquet_path_to_id_mapping( Compute the mapping of parquet column path to Iceberg ID. For each column, the parquet file metadata has a path_in_schema attribute that follows - a specific naming scheme for nested columnds. This function computes a mapping of + a specific naming scheme for nested columns. This function computes a mapping of the full paths to the corresponding Iceberg IDs. Args: diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index b417a436..99fac9c3 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -29,6 +29,7 @@ import pytest from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest from pyarrow.fs import S3FileSystem from pydantic_core import ValidationError +from pyspark.sql import SparkSession from pyiceberg.catalog import Catalog from pyiceberg.catalog.hive import HiveCatalog, _HiveClient @@ -1024,3 +1025,31 @@ def test_scan_with_datetime(catalog: Catalog) -> None: df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas() assert len(df) == 0 + + +@pytest.mark.integration +# TODO: For Hive we require writing V3 +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) +def test_initial_default(catalog: Catalog, spark: SparkSession) -> None: + identifier = "default.test_initial_default" + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + one_column = pa.table([pa.nulls(10, pa.int32())], names=["some_field"]) + + tbl = catalog.create_table(identifier, schema=one_column.schema, properties={"format-version": "2"}) + + tbl.append(one_column) + + # Do the bump version through Spark, since PyIceberg does not support this (yet) + spark.sql(f"ALTER TABLE {identifier} SET TBLPROPERTIES('format-version'='3')") + + with tbl.update_schema() as upd: + upd.add_column("so_true", BooleanType(), required=False, default_value=True) + + result_table = tbl.scan().filter("so_true == True").to_arrow() + + assert len(result_table) == 10 diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 92494455..db4f04de 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2398,6 +2398,17 @@ def test_identity_partition_on_multi_columns() -> None: ) == arrow_table.sort_by([("born_year", "ascending"), ("n_legs", "ascending"), ("animal", "ascending")]) +def test_initial_value() -> None: + # Have some fake data, otherwise it will generate a table without records + data = pa.record_batch([pa.nulls(10, pa.int64())], names=["some_field"]) + result = _to_requested_schema( + Schema(NestedField(1, "we-love-22", LongType(), required=True, initial_default=22)), Schema(), data + ) + assert result.column_names == ["we-love-22"] + for val in result[0]: + assert val.as_py() == 22 + + def test__to_requested_schema_timestamps( arrow_table_schema_with_all_timestamp_precisions: pa.Schema, arrow_table_with_all_timestamp_precisions: pa.Table,