Fokko commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1932123622
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+ def merge_rows(self, df: pa.Table, join_cols: list
+ ,merge_options: dict = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
Review Comment:
How about just expanding this into arguments?
```suggestion
def merge_rows(self, df: pa.Table, join_cols: list,
    when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True
```
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+ def merge_rows(self, df: pa.Table, join_cols: list
+ ,merge_options: dict = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
+ ) -> Dict:
+ """
+ Shorthand API for performing an upsert/merge to an iceberg table.
+
+ Args:
+ df: The input dataframe to merge with the table's data.
+ join_cols: The columns to join on.
+ merge_options: A dictionary of merge actions to perform. Currently
supports these predicates >
+ when_matched_update_all: default is True
+ when_not_matched_insert_all: default is True
+
+ Returns:
+ A dictionary containing the number of rows updated and inserted.
+ """
+
+ from pyiceberg.table import merge_rows_util
+
+ try:
+ from datafusion import SessionContext
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For merge_rows, DataFusion needs to be
installed") from e
+
+ try:
+ from pyarrow import dataset as ds
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For merge_rows, PyArrow needs to be
installed") from e
+
+ source_table_name = "source"
+ target_table_name = "target"
+
+ if merge_options is None or merge_options == {}:
+ merge_options = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
+
+ when_matched_update_all = merge_options.get('when_matched_update_all',
False)
+ when_not_matched_insert_all =
merge_options.get('when_not_matched_insert_all', False)
+
+ if when_matched_update_all == False and when_not_matched_insert_all ==
False:
+ return {'rows_updated': 0, 'rows_inserted': 0, 'msg': 'no merge
options selected...exiting'}
+
+ ctx = SessionContext()
+
+ #register both source and target tables so we can find the deltas to
update/append
+ ctx.register_dataset(source_table_name, ds.dataset(df))
+ ctx.register_dataset(target_table_name,
ds.dataset(self.scan().to_arrow()))
+
+ source_col_list = merge_rows_util.get_table_column_list(ctx,
source_table_name)
+ target_col_list = merge_rows_util.get_table_column_list(ctx,
target_table_name)
+
+ source_col_names = set([col[0] for col in source_col_list])
+ target_col_names = set([col[0] for col in target_col_list])
+
+ source_col_types = {col[0]: col[1] for col in source_col_list}
+
+ missing_columns =
merge_rows_util.do_join_columns_exist(source_col_names, target_col_names,
join_cols)
+
+ if missing_columns['source'] or missing_columns['target']:
+
+ return {'error_msgs': f"Join columns missing in tables: Source
table columns missing: {missing_columns['source']}, Target table columns
missing: {missing_columns['target']}"}
+
+ #check for dups on source
+ if merge_rows_util.dups_check_in_source(source_table_name, join_cols,
ctx):
+
+ return {'error_msgs': 'Duplicate rows found in source dataset
based on the key columns. No Merge executed'}
+
+ update_row_cnt = 0
+ insert_row_cnt = 0
+
+ txn = self.transaction()
+
+ try:
+
+ if when_matched_update_all:
+
+ update_recs_sql =
merge_rows_util.get_rows_to_update_sql(source_table_name, target_table_name,
join_cols, source_col_names, target_col_names)
+
+ update_recs = ctx.sql(update_recs_sql).to_arrow_table()
+
+ update_row_cnt = len(update_recs)
+
+ if len(join_cols) == 1:
+ join_col = join_cols[0]
+ col_type = source_col_types[join_col]
+ values = [row[join_col] for row in update_recs.to_pylist()]
+ # if strings are in the filter, we encapsulate with tick
marks
+ formatted_values = [f"'{value}'" if col_type == 'string'
else str(value) for value in values]
+ overwrite_filter = f"{join_col} IN ({',
'.join(formatted_values)})"
Review Comment:
I see that we're constructing the overwrite filter as a string. The
`overwrite(..)` method will take this string, and parse it into a
`BooleanExpression`. How about constructing a `BooleanExpression` right away:
```suggestion
overwrite_filter = In(join_col, values)
```
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+ def merge_rows(self, df: pa.Table, join_cols: list
+ ,merge_options: dict = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
+ ) -> Dict:
+ """
+ Shorthand API for performing an upsert/merge to an iceberg table.
+
+ Args:
+ df: The input dataframe to merge with the table's data.
+ join_cols: The columns to join on.
+ merge_options: A dictionary of merge actions to perform. Currently
supports these predicates >
+ when_matched_update_all: default is True
+ when_not_matched_insert_all: default is True
+
+ Returns:
+ A dictionary containing the number of rows updated and inserted.
+ """
+
+ from pyiceberg.table import merge_rows_util
+
+ try:
+ from datafusion import SessionContext
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For merge_rows, DataFusion needs to be
installed") from e
+
+ try:
+ from pyarrow import dataset as ds
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For merge_rows, PyArrow needs to be
installed") from e
+
+ source_table_name = "source"
+ target_table_name = "target"
+
+ if merge_options is None or merge_options == {}:
+ merge_options = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
+
+ when_matched_update_all = merge_options.get('when_matched_update_all',
False)
+ when_not_matched_insert_all =
merge_options.get('when_not_matched_insert_all', False)
+
+ if when_matched_update_all == False and when_not_matched_insert_all ==
False:
+ return {'rows_updated': 0, 'rows_inserted': 0, 'msg': 'no merge
options selected...exiting'}
+
+ ctx = SessionContext()
+
+ #register both source and target tables so we can find the deltas to
update/append
+ ctx.register_dataset(source_table_name, ds.dataset(df))
+ ctx.register_dataset(target_table_name,
ds.dataset(self.scan().to_arrow()))
Review Comment:
This is something we want to avoid. This will materialize the whole table
into memory, which does not scale well.
Instead, I think it makes much more sense to re-use the existing
`delete(delete_filter: BooleanExpression)` to delete the existing rows. This will
only touch the files relevant to the query, avoiding a lot of unnecessary
compute and IO.
##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
+ def merge_rows(self, df: pa.Table, join_cols: list
+ ,merge_options: dict = {'when_matched_update_all': True,
'when_not_matched_insert_all': True}
+ ) -> Dict:
+ """
+ Shorthand API for performing an upsert/merge to an iceberg table.
+
+ Args:
+ df: The input dataframe to merge with the table's data.
+ join_cols: The columns to join on.
Review Comment:
We can also do this later, but I think we should make `join_cols`
optional. When it is omitted, we would consider rows equal when they have the
same identifier-field-ids; see:
https://iceberg.apache.org/spec/?column-projection#identifier-field-ids
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]