tscottcoombes1 commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1941734610
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,115 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+    @dataclass
+    class MergeResult:
+        """Summary of the rows updated and inserted by merge_rows."""
+
+        rows_updated: int
+        rows_inserted: int
+        info_msgs: str
+        error_msgs: str
+
+    def merge_rows(
+        self,
+        df: pa.Table,
+        join_cols: list,
+        when_matched_update_all: bool = True,
+        when_not_matched_insert_all: bool = True,
+    ) -> MergeResult:
+ """
+ Shorthand API for performing an upsert/merge to an iceberg table.
+
+ Args:
+ df: The input dataframe to merge with the table's data.
+ join_cols: The columns to join on.
+ when_matched_update_all: Bool indicating to update rows that are
matched but require an update due to a value in a non-key column changing
+ when_not_matched_insert_all: Bool indicating new rows to be
inserted that do not match any existing rows in the table
+
+ Returns:
+ A dictionary containing the number of rows updated and inserted.
+ """
+
+        from pyiceberg.table import merge_rows_util
+
+        try:
+            from datafusion import SessionContext
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For merge_rows, DataFusion needs to be installed") from e
+
+        try:
+            from pyarrow import dataset as ds
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For merge_rows, PyArrow needs to be installed") from e
+
+        source_table_name = "source"
+        target_table_name = "target"
+
+        if not when_matched_update_all and not when_not_matched_insert_all:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no merge options selected...exiting'}
+
+        missing_columns = merge_rows_util.do_join_columns_exist(df, self, join_cols)
+
+        if missing_columns['source'] or missing_columns['target']:
+            return {'error_msgs': f"Join columns missing in tables: Source table columns missing: {missing_columns['source']}, Target table columns missing: {missing_columns['target']}"}
+
+        if merge_rows_util.dups_check_in_source(df, join_cols):
+            return {'error_msgs': 'Duplicate rows found in source dataset based on the key columns. No Merge executed'}
+
+        source_col_list = merge_rows_util.get_table_column_list_pa(df)
+        target_col_list = merge_rows_util.get_table_column_list_iceberg(self)
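(Aside, not part of the diff: a minimal usage sketch of the merge_rows API described in the docstring above, assuming the method lands on Table as shown; the catalog and table names below are purely illustrative.)

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names, for illustration only.
catalog = load_catalog("default")
table = catalog.load_table("db.orders")

# Source rows keyed by 'order_id': existing keys are updated, new keys are inserted.
df = pa.table({"order_id": [1, 2, 3], "status": ["shipped", "pending", "new"]})

result = table.merge_rows(df, join_cols=["order_id"])
print(result)  # summary of rows updated and inserted
```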
Review Comment:
re: the sets, you basically don't need these two functions
```suggestion
        source_col_list = df.column_names
        target_col_list = self.schema().column_names
```
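
For context, a minimal sketch of how the missing-join-column check could then be expressed directly against those two lists with plain set differences (the standalone helper below is illustrative, not part of the PR):

```python
from typing import Dict, List

import pyarrow as pa


def missing_join_columns(df: pa.Table, target_columns: List[str], join_cols: List[str]) -> Dict[str, List[str]]:
    """Report join columns absent from the source PyArrow table or the target column list."""
    source_cols = set(df.column_names)  # pa.Table.column_names is a plain list of str
    target_cols = set(target_columns)   # e.g. table.schema().column_names in PyIceberg
    return {
        "source": sorted(set(join_cols) - source_cols),
        "target": sorted(set(join_cols) - target_cols),
    }


# Example: 'val' is a join column but is missing from the target's columns.
df = pa.table({"id": [1, 2], "val": ["a", "b"]})
print(missing_join_columns(df, ["id", "other"], ["id", "val"]))
# -> {'source': [], 'target': ['val']}
```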