mattmartin14 commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1945220594
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1066,97 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+ @dataclass(frozen=True)
+ class UpsertResult:
+ """Summary the upsert operation"""
+ rows_updated: int = 0
+ rows_inserted: int = 0
+
+ def upsert(self, df: pa.Table, join_cols: list
+ , when_matched_update_all: bool = True
+ , when_not_matched_insert_all: bool = True
+ ) -> UpsertResult:
+ """
+ Shorthand API for performing an upsert to an iceberg table.
+
+ Args:
+ self: the target Iceberg table to execute the upsert on
+ df: The input dataframe to upsert with the table's data.
+ join_cols: The columns to join on. These are essentially analogous
to primary keys
+ when_matched_update_all: Bool indicating to update rows that are
matched but require an update due to a value in a non-key column changing
+ when_not_matched_insert_all: Bool indicating new rows to be
inserted that do not match any existing rows in the table
+
+ Example Use Cases:
+ Case 1: Both Parameters = True (Full Upsert)
+ Existing row found → Update it
+ New row found → Insert it
+
+ Case 2: when_matched_update_all = False,
when_not_matched_insert_all = True
+ Existing row found → Do nothing (no updates)
+ New row found → Insert it
+
+ Case 3: when_matched_update_all = True,
when_not_matched_insert_all = False
+ Existing row found → Update it
+ New row found → Do nothing (no inserts)
+
+ Case 4: Both Parameters = False (No Merge Effect)
+ Existing row found → Do nothing
+ New row found → Do nothing
+ (Function effectively does nothing)
+
+
+ Returns: a UpsertResult class (contains details of rows updated and
inserted)
+ """
+
+ from pyiceberg.table import upsert_util
+
+ if when_matched_update_all == False and when_not_matched_insert_all ==
False:
+ raise Exception('no upsert options selected...exiting')
+
+ if upsert_util.has_duplicate_rows(df, join_cols):
+
+ raise Exception('Duplicate rows found in source dataset based on
the key columns. No upsert executed')
+
+ #get list of rows that exist so we don't have to load the entire
target table
+ matched_predicate = upsert_util.create_match_filter(df, join_cols)
+ matched_iceberg_table =
self.scan(row_filter=matched_predicate).to_arrow()
+
+ update_row_cnt = 0
+ insert_row_cnt = 0
+
+ try:
+
+ with self.transaction() as txn:
+
+ if when_matched_update_all:
+
+ #function get_rows_to_update is doing a check on non-key
columns to see if any of the values have actually changed
+ rows_to_update = upsert_util.get_rows_to_update(df,
matched_iceberg_table, join_cols)
Review Comment:
@Fokko - `get_rows_to_update` is slightly different from
`create_match_filter`: `get_rows_to_update` also evaluates whether any non-key
columns differ between the two tables. For example, suppose we had these two tables:
df (source):
id | val
a  | abc
b  | edf

and the Iceberg table (target):
id | val
a  | abc
b  | hij
and ran `get_rows_to_update`: it would return only the row (b, edf), since
its `val` "edf" differs from "hij" in the target.
`create_match_filter`, by contrast, would match both rows, but we really only
want to update rows whose values have actually changed. Does that make sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]