kevinjqliu commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1943674177


##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}

Review Comment:
   same here, instead of returning an error message, can we just throw an error?



##########
pyproject.toml:
##########


Review Comment:
   aside from using datafusion for testing, none of the other changes are 
needed, right? 
   we should also move datafusion to the dev dependency group and get rid of 
all the unnecessary `tool.mypy.overrides` changes



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None

Review Comment:
   i think we can get rid of `info_msgs` and `error_msgs`. looks like 
`info_msgs` is used as an error message. can we just throw the error message 
instead? 
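
   A minimal sketch of what the slimmed-down result could look like, assuming 
failures are raised by `upsert` rather than stored on the result. The field 
names are taken from the diff above; everything else is illustrative.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UpsertResult:
    """Summary of the upsert operation."""

    rows_updated: int = 0
    rows_inserted: int = 0


# Assumption: errors surface as exceptions, so the result only carries counts.
result = UpsertResult(rows_updated=2, rows_inserted=1)
```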



##########
pyiceberg/table/upsert_util.py:
##########
@@ -0,0 +1,158 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pyarrow import Table as pyarrow_table
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyiceberg import table as pyiceberg_table
+
+from pyiceberg.expressions import (
+    BooleanExpression,
+    And,
+    EqualTo,
+    Or,
+    In,
+)
+
+def get_filter_list(df: pyarrow_table, join_cols: list) -> BooleanExpression:
+
+    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
+
+    pred = None
+
+    if len(join_cols) == 1:
+        pred = In(join_cols[0], unique_keys[0].to_pylist())
+    else:
+        pred = Or(*[
+            And(*[
+                EqualTo(col, row[col])
+                for col in join_cols
+            ])
+            for row in unique_keys.to_pylist()
+        ])
+
+    return pred
+
+def dups_check_in_source(df: pyarrow_table, join_cols: list) -> bool:

Review Comment:
   nit: wdyt about renaming this `has_duplicate_rows`? 
   i would also make the comment more generic, not solely about "source table"



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()

Review Comment:
   can we use `to_arrow_batch_reader` here instead? it reads in a streaming 
fashion, whereas `to_arrow` materializes everything in memory



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0

Review Comment:
   nit: what if we just set `update_recs` and `insert_recs` as empty lists and 
then do a `len()` in the return statement? i think it's more readable that way 
and we don't need to keep track of an extra variable



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.

Review Comment:
   we should check that every column in `join_cols` exists in both the source 
and target tables.
   
   furthermore, do the source and target tables need to have the same width? 
   for example
   ```
   target has column a b c d 
   source has a, c
   join_cols is a
   ```
   what should happen here? 
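
   The first check could look roughly like the sketch below. `check_join_cols` 
is a hypothetical helper name and `ValueError` is an assumed exception type; it 
works on plain column-name lists and deliberately leaves the width question 
open.

```python
from typing import Iterable, List


def check_join_cols(source_cols: Iterable[str], target_cols: Iterable[str], join_cols: List[str]) -> None:
    """Raise ValueError if any join column is missing from either table."""
    source, target = set(source_cols), set(target_cols)
    missing_in_source = [c for c in join_cols if c not in source]
    missing_in_target = [c for c in join_cols if c not in target]
    if missing_in_source or missing_in_target:
        raise ValueError(
            f"join_cols missing in source: {missing_in_source}, "
            f"missing in target: {missing_in_target}"
        )


# The example from this comment: the join column exists on both sides, so this passes.
check_join_cols(source_cols=["a", "c"], target_cols=["a", "b", "c", "d"], join_cols=["a"])
```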



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, 
iceberg_table_trimmed, join_cols)

Review Comment:
   ```suggestion
                       rows_to_update = 
upsert_util.get_rows_to_update(source=df, target=matched_table, 
primary_keys=join_cols)
   ```
   i like something like this, using kwargs here is helpful



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, 
iceberg_table_trimmed, join_cols)
+
+                    update_row_cnt = len(update_recs)
+
+                    overwrite_filter = 
upsert_util.get_filter_list(update_recs, join_cols)
+
+                    txn.overwrite(update_recs, 
overwrite_filter=overwrite_filter)    
+
+
+                if when_not_matched_insert_all:
+                    
+                    insert_recs = upsert_util.get_rows_to_insert(df, 
iceberg_table_trimmed, join_cols)

Review Comment:
   ```suggestion
                       not_matched_predicate = 
Not(upsert_util.get_filter_list(df, join_cols))
                       not_matched_table = 
self.scan(row_filter=not_matched_predicate).to_arrow()
                       rows_to_insert = 
upsert_util.get_rows_to_insert(source=df, target=not_matched_table, 
primary_keys=join_cols)
   ```
   
   
   wdyt about something like this? it'll simplify `get_rows_to_insert` and 
make the current function more readable 



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.

Review Comment:
   i think we should add more context around how this process works, took me a 
while to understand. 
   
   the target table is the current iceberg table, the `self`
   the source table is the given arrow table, `df`
   the `join_cols` are like primary keys, they are the columns to match on in 
both source and target: a source row matches a target row when the values of 
all columns in `join_cols` are equal. 
   once matched, `when_matched_update_all` and `when_not_matched_insert_all` 
control the upsert behavior. 
   
   ```
   Case 1: Both Parameters = True (Full Upsert)
   Existing row found → Update it
   New row found → Insert it
   
   Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
   Existing row found → Do nothing (no updates)
   New row found → Insert it
   
   Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
   Existing row found → Update it
   New row found → Do nothing (no inserts)
   
   Case 4: Both Parameters = False (No Merge Effect)
   Existing row found → Do nothing
   New row found → Do nothing
   (Function effectively does nothing)
   ```
   
   This is a version of this syntax:
   ```
   MERGE INTO self.table AS target
   USING df AS source
   ON join_cols
   WHEN MATCHED THEN 
       UPDATE SET ...
   WHEN NOT MATCHED THEN 
       INSERT ...
   ```
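
   The four cases can be sketched in plain Python over lists of dicts. This is 
only an illustration of the semantics, not the PR's implementation: 
`upsert_rows` is a hypothetical name, and it assumes the keys are unique on 
both sides.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def upsert_rows(
    target: List[Row],
    source: List[Row],
    join_cols: List[str],
    when_matched_update_all: bool = True,
    when_not_matched_insert_all: bool = True,
) -> Tuple[List[Row], int, int]:
    """Return (new_target, rows_updated, rows_inserted) without mutating the inputs."""

    def key(row: Row) -> tuple:
        return tuple(row[c] for c in join_cols)

    merged = {key(row): dict(row) for row in target}
    updated = inserted = 0
    for row in source:
        k = key(row)
        if k in merged:
            # Matched: replace only if enabled and a value actually changed.
            if when_matched_update_all and merged[k] != row:
                merged[k] = dict(row)
                updated += 1
        elif when_not_matched_insert_all:
            # Not matched: insert the new row if enabled.
            merged[k] = dict(row)
            inserted += 1
    return list(merged.values()), updated, inserted


target = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
source = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]

full = upsert_rows(target, source, ["id"])  # Case 1: update id=2, insert id=3
insert_only = upsert_rows(target, source, ["id"], when_matched_update_all=False)  # Case 2
```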
   
   



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):

Review Comment:
   +1 to throwing, similar to my comment above about returning `info_msgs` 



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,119 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    def merge_rows(self, df: pa.Table, join_cols: list

Review Comment:
   +1 to upsert, #402 is related to that. i think we'll be able to reuse it for 
merge later on too



##########
pyiceberg/table/upsert_util.py:
##########
@@ -0,0 +1,158 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pyarrow import Table as pyarrow_table
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyiceberg import table as pyiceberg_table
+
+from pyiceberg.expressions import (
+    BooleanExpression,
+    And,
+    EqualTo,
+    Or,
+    In,
+)
+
+def get_filter_list(df: pyarrow_table, join_cols: list) -> BooleanExpression:
+
+    unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
+
+    pred = None
+
+    if len(join_cols) == 1:
+        pred = In(join_cols[0], unique_keys[0].to_pylist())
+    else:
+        pred = Or(*[
+            And(*[
+                EqualTo(col, row[col])
+                for col in join_cols
+            ])
+            for row in unique_keys.to_pylist()
+        ])
+
+    return pred
+
+def dups_check_in_source(df: pyarrow_table, join_cols: list) -> bool:
+    """
+    This function checks if there are duplicate rows in the source table based 
on the join columns.
+    It returns True if there are duplicate rows in the source table, otherwise 
it returns False.
+    """
+    # Check for duplicates in the source table
+    source_dup_count = len(
+        df.select(join_cols)
+            .group_by(join_cols)
+            .aggregate([([], "count_all")])
+            .filter(pc.field("count_all") > 1)
+    )
+    
+    return source_dup_count > 0
+
+def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, 
join_cols: list) -> pa.Table:
+    
+    """
+        This function takes the source_table, trims it down to rows that match 
in both source and target.
+        It then does a scan for the non-key columns to see if any are 
mis-aligned before returning the final row set to update
+    """
+    
+    all_columns = set(source_table.column_names)
+    join_cols_set = set(join_cols)
+
+    non_key_cols = list(all_columns - join_cols_set)
+
+    
+    match_expr = None
+
+    for col in join_cols:
+        target_values = target_table.column(col).to_pylist()
+        expr = pc.field(col).isin(target_values)
+
+        if match_expr is None:
+            match_expr = expr
+        else:
+            match_expr = match_expr & expr
+
+    
+    matching_source_rows = source_table.filter(match_expr)
+
+    rows_to_update = []
+
+    for index in range(matching_source_rows.num_rows):
+        
+        source_row = matching_source_rows.slice(index, 1)
+
+        
+        target_filter = None
+
+        for col in join_cols:
+            target_value = source_row.column(col)[0].as_py()  
+            if target_filter is None:
+                target_filter = pc.field(col) == target_value
+            else:
+                target_filter = target_filter & (pc.field(col) == target_value)
+
+        matching_target_row = target_table.filter(target_filter)
+
+        if matching_target_row.num_rows > 0:
+            needs_update = False
+
+            for non_key_col in non_key_cols:
+                source_value = source_row.column(non_key_col)[0].as_py()
+                target_value = 
matching_target_row.column(non_key_col)[0].as_py()
+
+                if source_value != target_value:
+                    needs_update = True
+                    break 
+
+            if needs_update:
+                rows_to_update.append(source_row)
+
+    if rows_to_update:
+        rows_to_update_table = pa.concat_tables(rows_to_update)
+    else:
+        rows_to_update_table = pa.Table.from_arrays([], 
names=source_table.column_names)
+
+    common_columns = 
set(source_table.column_names).intersection(set(target_table.column_names))
+    rows_to_update_table = rows_to_update_table.select(list(common_columns))
+
+    return rows_to_update_table
+
+def get_rows_to_insert(source_table: pa.Table, target_table: pa.Table, 
join_cols: list) -> pa.Table:
+  
+    source_filter_expr = None
+
+    for col in join_cols:
+
+        target_values = target_table.column(col).to_pylist()
+        expr = pc.field(col).isin(target_values)
+
+        if source_filter_expr is None:
+            source_filter_expr = expr
+        else:
+            source_filter_expr = source_filter_expr & expr

Review Comment:
   is `to_pylist()` necessary? 
   
   [is_in](https://arrow.apache.org/docs/python/generated/pyarrow.compute.is_in.html#pyarrow-compute-is-in) 
takes `values` that are "Array-like or scalar-like", and 
[`column(self, i)`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.column) 
already returns an Array



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,81 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass
+    class UpsertResult:
+        """docstring"""
+        rows_updated: int
+        rows_inserted: int
+        info_msgs: str
+        error_msgs: str
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}

Review Comment:
   can we throw an error here instead? 
   since it's not a valid use case to set both params to False; the function 
would be a no-op
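
   Something along these lines, as a sketch. `check_upsert_options` is a 
hypothetical helper and `ValueError` is an assumed choice of exception; in the 
PR the check would sit at the top of `upsert`.

```python
def check_upsert_options(when_matched_update_all: bool, when_not_matched_insert_all: bool) -> None:
    """Reject the combination that would make the upsert a no-op."""
    if not when_matched_update_all and not when_not_matched_insert_all:
        raise ValueError("no upsert options selected")


# Any combination with at least one flag set is accepted.
check_upsert_options(when_matched_update_all=True, when_not_matched_insert_all=False)
```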



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):

Review Comment:
   but also generally, i don't understand why duplicates in the source table 
should be invalid...
   
   oh are we replicating the spark+iceberg behavior? 



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, 
iceberg_table_trimmed, join_cols)
+
+                    update_row_cnt = len(update_recs)
+
+                    overwrite_filter = 
upsert_util.get_filter_list(update_recs, join_cols)
+
+                    txn.overwrite(update_recs, 
overwrite_filter=overwrite_filter)    
+
+
+                if when_not_matched_insert_all:
+                    
+                    insert_recs = upsert_util.get_rows_to_insert(df, 
iceberg_table_trimmed, join_cols)
+
+                    insert_row_cnt = len(insert_recs)
+
+                    txn.append(insert_recs)
+
+            return {
+                "rows_updated": update_row_cnt,
+                "rows_inserted": insert_row_cnt
+            }
+
+        except Exception as e:

Review Comment:
   +1, I think we can get rid of this try/except if it just re-raises.
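
A minimal sketch of why the wrapper adds nothing, assuming the transaction context manager already cleans up and propagates the exception on failure. `fake_transaction` is a hypothetical stand-in, not the pyiceberg `Transaction` class:

```python
from contextlib import contextmanager


@contextmanager
def fake_transaction(log):
    """Hypothetical stand-in for a transaction: commits only on success."""
    log.append("begin")
    try:
        yield log
    except Exception:
        # Clean up, then let the original exception propagate unchanged.
        log.append("rollback")
        raise
    else:
        log.append("commit")


def upsert_without_wrapper(log, fail):
    # No try/except: a failure inside the block surfaces to the caller as-is.
    with fake_transaction(log) as txn:
        txn.append("write")
        if fail:
            raise ValueError("write failed")
    return "ok"


def upsert_with_wrapper(log, fail):
    # Same behavior, plus a try/except that only re-raises.
    try:
        return upsert_without_wrapper(log, fail)
    except Exception as e:
        raise e
```

Both variants raise the same `ValueError` and leave the same trail in `log`, so the caller cannot tell them apart.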



##########
pyiceberg/table/upsert_util.py:
##########
@@ -0,0 +1,158 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pyarrow import Table as pyarrow_table
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyiceberg import table as pyiceberg_table
+
+from pyiceberg.expressions import (
+    BooleanExpression,
+    And,
+    EqualTo,
+    Or,
+    In,
+)
+
+def get_filter_list(df: pyarrow_table, join_cols: list) -> BooleanExpression:

Review Comment:
   What do you think of renaming this to something like `build_match_predicate` or `create_match_filter`?
   
   I think this would align better with the `MERGE INTO` semantics:
   
   ```
   MERGE INTO users AS target
   USING new_users AS source
   ON target.id = source.id
   WHEN MATCHED THEN 
   ...
   ```
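
To illustrate the `ON target.id = source.id` idea behind the proposed name, here is a rough sketch in plain Python. It is not the pyiceberg expression API: rows are modeled as dicts and the predicate as a callable, and `build_match_predicate` here is only the name suggested above applied to a hypothetical helper:

```python
from typing import Any, Callable, Dict, List, Sequence

Row = Dict[str, Any]


def build_match_predicate(
    source_rows: Sequence[Row], join_cols: List[str]
) -> Callable[[Row], bool]:
    """Return a predicate that is True for target rows whose key is in the source."""
    # Collect every key tuple present in the source (works for composite keys too).
    source_keys = {tuple(row[col] for col in join_cols) for row in source_rows}

    def matches(target_row: Row) -> bool:
        return tuple(target_row[col] for col in join_cols) in source_keys

    return matches


source = [{"id": 2, "name": "bob-updated"}, {"id": 4, "name": "dave"}]
target = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]

is_matched = build_match_predicate(source, ["id"])
matched_rows = [row for row in target if is_matched(row)]  # the WHEN MATCHED side
```

With the real expression classes the same shape would presumably become an `In` for a single key column and an `Or` of `And(EqualTo, ...)` terms for composite keys, but that mapping is an assumption based on the imports in this file.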



##########
pyiceberg/expressions/literals.py:
##########
@@ -145,6 +148,8 @@ def literal(value: L) -> Literal[L]:
         return BinaryLiteral(value)
     elif isinstance(value, Decimal):
         return DecimalLiteral(value)
+    elif isinstance(value, date):
+        return LongLiteral(date_to_days(value))

Review Comment:
   Was this discovered as part of the tests? I'd prefer to split this out into its own PR, since this PR already has a lot of changes.
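
One thing a separate PR could cover is that `datetime` is a subclass of `date` in Python, so an `isinstance(value, date)` branch also catches datetimes. A small stdlib-only sketch follows; `days_since_epoch` and `to_days_literal` are hypothetical names, and the epoch arithmetic is only assumed to mirror what `date_to_days` does:

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days_since_epoch(value: date) -> int:
    """Days between 1970-01-01 and the given date."""
    return (value - EPOCH).days


def to_days_literal(value: object) -> int:
    # Check datetime first: every datetime is also an instance of date,
    # and subtracting a date from a datetime raises TypeError.
    if isinstance(value, datetime):
        raise TypeError("datetime values need their own handling")
    if isinstance(value, date):
        return days_since_epoch(value)
    raise TypeError(f"unsupported type: {type(value).__name__}")
```

The ordering of the two `isinstance` checks is the design point here: swapping them would route datetimes into the date branch.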



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, iceberg_table_trimmed, join_cols)
+
+                    update_row_cnt = len(update_recs)
+
+                    overwrite_filter = upsert_util.get_filter_list(update_recs, join_cols)
+
+                    txn.overwrite(update_recs, overwrite_filter=overwrite_filter)

Review Comment:
   ```suggestion
                       overwrite_mask_predicate = upsert_util.get_filter_list(update_recs, join_cols)
                       txn.overwrite(update_recs, overwrite_filter=overwrite_mask_predicate)
   ```
   
   Can we add a comment here explaining why we're creating `overwrite_filter`?
   I think it's to use `overwrite_filter` as a mask, right?
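
For anyone reading along, a rough sketch of the "mask" reading in plain Python, assuming an overwrite with a filter first drops the target rows that match the filter and then appends the new rows. `overwrite_with_mask` is a hypothetical helper over lists of dicts, not the pyiceberg call:

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def overwrite_with_mask(
    target_rows: List[Row],
    new_rows: List[Row],
    overwrite_filter: Callable[[Row], bool],
) -> List[Row]:
    """Drop target rows selected by the filter, then append the replacement rows."""
    kept = [row for row in target_rows if not overwrite_filter(row)]
    return kept + list(new_rows)


target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
update_recs = [{"id": 2, "v": "B"}]

# The mask is built from the keys of the rows being updated, so only those
# rows are removed before the new versions are written.
update_ids = {row["id"] for row in update_recs}
result = overwrite_with_mask(target, update_recs, lambda row: row["id"] in update_ids)
```

Rows 1 and 3 are untouched because the mask never selects them, which is why the filter is derived from `update_recs` rather than from the whole source.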



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()

Review Comment:
   ```suggestion
           matched_predicate = upsert_util.get_filter_list(df, join_cols)
           matched_table = self.scan(row_filter=matched_predicate).to_arrow()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

