This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new a626bc2c Update schema projection to support `initial-defaults` (#1644)
a626bc2c is described below

commit a626bc2c242bb99d1613594ee59d56b634fb1aa4
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Tue Jul 8 08:54:37 2025 +0200

    Update schema projection to support `initial-defaults` (#1644)
    
    Add the projection piece of the initial defaults.
    
    Closes #1836
    
    ---------
    
    Co-authored-by: Kevin Liu <kevinjq...@users.noreply.github.com>
---
 pyiceberg/expressions/visitors.py | 23 ++++++++++++++++++-----
 pyiceberg/io/pyarrow.py           | 10 +++++++---
 tests/integration/test_reads.py   | 29 +++++++++++++++++++++++++++++
 tests/io/test_pyarrow.py          | 11 +++++++++++
 4 files changed, 65 insertions(+), 8 deletions(-)
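
For context: with format v3 a schema field can carry an `initial-default`,
and data files written before the field existed are expected to surface that
constant on read instead of nulls. A rough, illustrative sketch of such a
field (the name and value below are made up, not taken from this commit):

    from pyiceberg.schema import Schema
    from pyiceberg.types import BooleanType, NestedField

    # Hypothetical field added after older files were already written;
    # readers should fill it with the initial default rather than null.
    schema = Schema(
        NestedField(
            field_id=2,
            name="added_later",
            field_type=BooleanType(),
            required=True,
            initial_default=True,
        )
    )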

diff --git a/pyiceberg/expressions/visitors.py b/pyiceberg/expressions/visitors.py
index abac19bc..26241d23 100644
--- a/pyiceberg/expressions/visitors.py
+++ b/pyiceberg/expressions/visitors.py
@@ -893,15 +893,28 @@ class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
         raise TypeError(f"Expected Bound Predicate, got: {predicate.term}")
 
     def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
-        file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)
+        field = predicate.term.ref().field
+        file_column_name = self.file_schema.find_column_name(field.field_id)
 
         if file_column_name is None:
             # In the case of schema evolution, the column might not be present
-            # in the file schema when reading older data
-            if isinstance(predicate, BoundIsNull):
-                return AlwaysTrue()
+            # we can use the default value as a constant and evaluate it against
+            # the predicate
+            pred: BooleanExpression
+            if isinstance(predicate, BoundUnaryPredicate):
+                pred = predicate.as_unbound(field.name)
+            elif isinstance(predicate, BoundLiteralPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literal)
+            elif isinstance(predicate, BoundSetPredicate):
+                pred = predicate.as_unbound(field.name, predicate.literals)
             else:
-                return AlwaysFalse()
+                raise ValueError(f"Unsupported predicate: {predicate}")
+
+            return (
+                AlwaysTrue()
+                if expression_evaluator(Schema(field), pred, case_sensitive=self.case_sensitive)(Record(field.initial_default))
+                else AlwaysFalse()
+            )
 
         if isinstance(predicate, BoundUnaryPredicate):
             return predicate.as_unbound(file_column_name)
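
In isolation, the logic above works like this: when the filtered column is
missing from the file schema, the predicate is re-created unbound against a
one-field schema and evaluated against the field's initial default, so the
residual for that file collapses to AlwaysTrue or AlwaysFalse. A hedged,
standalone sketch (the field name and literal below are made up):

    from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, EqualTo
    from pyiceberg.expressions.visitors import expression_evaluator
    from pyiceberg.schema import Schema
    from pyiceberg.typedef import Record
    from pyiceberg.types import LongType, NestedField

    # Hypothetical field that older files do not contain, default 22.
    field = NestedField(1, "added_later", LongType(), required=True, initial_default=22)

    # Unbound predicate on that field, as a reader might supply it.
    pred = EqualTo("added_later", 22)

    # Evaluate the predicate against a single record holding the default.
    matches = expression_evaluator(Schema(field), pred, case_sensitive=True)(Record(22))

    # Files that predate the column either always match or never match.
    residual = AlwaysTrue() if matches else AlwaysFalse()  # AlwaysTrue() in this example
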
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 3e49885e..f6dacc16 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1814,9 +1814,13 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
                 array = self._cast_if_needed(field, field_array)
                 field_arrays.append(array)
                 fields.append(self._construct_field(field, array.type))
-            elif field.optional:
+            elif field.optional or field.initial_default is not None:
+                # When an optional field is added, or when a required field with a non-null initial default is added
                 arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
-                field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
+                if field.initial_default is None:
+                    field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
+                else:
+                    field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
                 fields.append(self._construct_field(field, arrow_type))
             else:
                 raise ResolveError(f"Field is required, and could not be found in the file: {field}")
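
For the projection itself, the distinction above comes down to which constant
array backfills the missing column. A minimal PyArrow-only sketch (row count
and values are made up):

    import pyarrow as pa

    num_rows = 3

    # An added optional field with no initial default is backfilled with nulls ...
    null_fill = pa.nulls(num_rows, type=pa.int32())                # [null, null, null]

    # ... while a field with an initial default is backfilled with that constant,
    # mirroring the pa.repeat(...) call in the hunk above.
    default_fill = pa.repeat(pa.scalar(22, pa.int32()), num_rows)  # [22, 22, 22]
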
@@ -2249,7 +2253,7 @@ def parquet_path_to_id_mapping(
     Compute the mapping of parquet column path to Iceberg ID.
 
     For each column, the parquet file metadata has a path_in_schema attribute that follows
-    a specific naming scheme for nested columnds. This function computes a mapping of
+    a specific naming scheme for nested columns. This function computes a mapping of
     the full paths to the corresponding Iceberg IDs.
 
     Args:
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index b417a436..99fac9c3 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -29,6 +29,7 @@ import pytest
 from hive_metastore.ttypes import LockRequest, LockResponse, LockState, UnlockRequest
 from pyarrow.fs import S3FileSystem
 from pydantic_core import ValidationError
+from pyspark.sql import SparkSession
 
 from pyiceberg.catalog import Catalog
 from pyiceberg.catalog.hive import HiveCatalog, _HiveClient
@@ -1024,3 +1025,31 @@ def test_scan_with_datetime(catalog: Catalog) -> None:
 
     df = table.scan(row_filter=LessThan("datetime", yesterday)).to_pandas()
     assert len(df) == 0
+
+
+@pytest.mark.integration
+# TODO: For Hive we require writing V3
+# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_initial_default(catalog: Catalog, spark: SparkSession) -> None:
+    identifier = "default.test_initial_default"
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+
+    one_column = pa.table([pa.nulls(10, pa.int32())], names=["some_field"])
+
+    tbl = catalog.create_table(identifier, schema=one_column.schema, properties={"format-version": "2"})
+
+    tbl.append(one_column)
+
+    # Do the bump version through Spark, since PyIceberg does not support this (yet)
+    spark.sql(f"ALTER TABLE {identifier} SET 
TBLPROPERTIES('format-version'='3')")
+
+    with tbl.update_schema() as upd:
+        upd.add_column("so_true", BooleanType(), required=False, default_value=True)
+
+    result_table = tbl.scan().filter("so_true == True").to_arrow()
+
+    assert len(result_table) == 10
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 92494455..db4f04de 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -2398,6 +2398,17 @@ def test_identity_partition_on_multi_columns() -> None:
         ) == arrow_table.sort_by([("born_year", "ascending"), ("n_legs", "ascending"), ("animal", "ascending")])
 
 
+def test_initial_value() -> None:
+    # Have some fake data, otherwise it will generate a table without records
+    data = pa.record_batch([pa.nulls(10, pa.int64())], names=["some_field"])
+    result = _to_requested_schema(
+        Schema(NestedField(1, "we-love-22", LongType(), required=True, initial_default=22)), Schema(), data
+    )
+    assert result.column_names == ["we-love-22"]
+    for val in result[0]:
+        assert val.as_py() == 22
+
+
 def test__to_requested_schema_timestamps(
     arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
     arrow_table_with_all_timestamp_precisions: pa.Table,
