This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new d69a1911 fix `upsert` with null values (#1861)
d69a1911 is described below
commit d69a19113ea537d16c34b60ab6e69c4285f933c0
Author: Kevin Liu <[email protected]>
AuthorDate: Mon Mar 31 13:32:56 2025 -0400
fix `upsert` with null values (#1861)
<!--
Thanks for opening a pull request!
-->
<!-- In the case this PR will resolve an issue, please replace
${GITHUB_ISSUE_ID} below with the actual Github issue id. -->
<!-- Closes #${GITHUB_ISSUE_ID} -->
# Rationale for this change
Closes #1835
The original implementation, `!=`
([not_equal](https://arrow.apache.org/docs/python/generated/pyarrow.compute.not_equal.html#pyarrow.compute.not_equal)),
does not account for `null` values: it emits `null` when either side
is `null`.
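The new implementation first evaluates `not_equal`, then combines it (via
`or_kleene`) with an `is_null` check on that result, so the expression is
`true` when the values differ or when the comparison itself is `null`
(i.e. when either side, or both sides, are `null`).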
Similar to https://github.com/apache/iceberg-rust/pull/1045
# Are these changes tested?
Yes
# Are there any user-facing changes?
No
<!-- In the case of user-facing changes, please add the changelog label.
-->
---
pyiceberg/table/upsert_util.py | 11 ++++++++++-
tests/table/test_upsert.py | 43 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index e67f6c02..c2d554df 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -71,7 +71,16 @@ def get_rows_to_update(source_table: pa.Table, target_table:
pa.Table, join_cols
# When the target table is empty, there is nothing to update :)
return source_table.schema.empty_table()
- diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") !=
pc.field(f"{col}-rhs") for col in non_key_cols])
+ diff_expr = functools.reduce(
+ operator.or_,
+ [
+ pc.or_kleene(
+ pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
+ pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"),
pc.field(f"{col}-rhs"))),
+ )
+ for col in non_key_cols
+ ],
+ )
return (
source_table
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 19bfbc01..5de4a611 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -509,3 +509,46 @@ def test_upsert_without_identifier_fields(catalog:
Catalog) -> None:
ValueError, match="Join columns could not be found, please set
identifier-field-ids or pass in explicitly."
):
tbl.upsert(df)
+
+
+def test_upsert_with_nulls(catalog: Catalog) -> None:
+ identifier = "default.test_upsert_with_nulls"
+ _drop_table(catalog, identifier)
+
+ schema = pa.schema(
+ [
+ ("foo", pa.string()),
+ ("bar", pa.int32()),
+ ("baz", pa.bool_()),
+ ]
+ )
+
+ # create table with null value
+ table = catalog.create_table(identifier, schema)
+ data_with_null = pa.Table.from_pylist(
+ [
+ {"foo": "apple", "bar": None, "baz": False},
+ {"foo": "banana", "bar": None, "baz": False},
+ ],
+ schema=schema,
+ )
+ table.append(data_with_null)
+ assert table.scan().to_arrow()["bar"].is_null()
+
+ # upsert table with non-null value
+ data_without_null = pa.Table.from_pylist(
+ [
+ {"foo": "apple", "bar": 7, "baz": False},
+ ],
+ schema=schema,
+ )
+ upd = table.upsert(data_without_null, join_cols=["foo"])
+ assert upd.rows_updated == 1
+ assert upd.rows_inserted == 0
+ assert table.scan().to_arrow() == pa.Table.from_pylist(
+ [
+ {"foo": "apple", "bar": 7, "baz": False},
+ {"foo": "banana", "bar": None, "baz": False},
+ ],
+ schema=schema,
+ )