This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 5506fd01 Change Append/Overwrite API to accept snapshot properties (#419)
5506fd01 is described below

commit 5506fd01e98b03229b43772d87d8041acf7a2609
Author: Gowthami B <gowthamibhogire...@gmail.com>
AuthorDate: Tue Mar 19 09:30:24 2024 -0400

    Change Append/Overwrite API to accept snapshot properties (#419)
    
    * added test for snapshot properties
    
    * change append/overwrite to accept snapshot_properties
    
    * Update tests/catalog/test_glue.py
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
    
    * Update pyiceberg/table/__init__.py
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
    
    * updated docs,docstrings
    
    * fix linting
    
    * Update mkdocs/docs/api.md
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
    
    * Update pyiceberg/table/__init__.py
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
    
    ---------
    
    Co-authored-by: Fokko Driesprong <fo...@apache.org>
---
 mkdocs/docs/api.md          | 14 ++++++++++
 pyiceberg/table/__init__.py | 33 +++++++++++++++--------
 tests/catalog/test_glue.py  | 66 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 4056bc3a..3cd0d312 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -563,6 +563,20 @@ table = 
table.transaction().remove_properties("abc").commit_transaction()
 assert table.properties == {}
 ```
 
+## Snapshot properties
+
+Optionally, snapshot properties can be set while writing to a table using the `append` or `overwrite` API:
+
+```python
+tbl.append(df, snapshot_properties={"abc": "def"})
+
+# or
+
+tbl.overwrite(df, snapshot_properties={"abc": "def"})
+
+assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
+```
+
 ## Query the data
 
 To query a table, a table scan is needed. A table scan accepts a filter, 
columns, optionally a limit and a snapshot ID:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 517e6c86..437a2640 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -332,13 +332,13 @@ class Transaction:
             name_mapping=self._table.name_mapping(),
         )
 
-    def update_snapshot(self) -> UpdateSnapshot:
+    def update_snapshot(self, snapshot_properties: Dict[str, str] = 
EMPTY_DICT) -> UpdateSnapshot:
         """Create a new UpdateSnapshot to produce a new snapshot for the table.
 
         Returns:
             A new UpdateSnapshot
         """
-        return UpdateSnapshot(self, io=self._table.io)
+        return UpdateSnapshot(self, io=self._table.io, 
snapshot_properties=snapshot_properties)
 
     def update_spec(self) -> UpdateSpec:
         """Create a new UpdateSpec to update the partitioning of the table.
@@ -1095,12 +1095,13 @@ class Table:
         else:
             return None
 
-    def append(self, df: pa.Table) -> None:
+    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = 
EMPTY_DICT) -> None:
         """
         Shorthand API for appending a PyArrow table to the table.
 
         Args:
             df: The Arrow dataframe that will be appended to overwrite the 
table
+            snapshot_properties: Custom properties to be added to the snapshot 
summary
         """
         try:
             import pyarrow as pa
@@ -1116,7 +1117,7 @@ class Table:
         _check_schema(self.schema(), other_schema=df.schema)
 
         with self.transaction() as txn:
-            with txn.update_snapshot().fast_append() as update_snapshot:
+            with 
txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as 
update_snapshot:
                 # skip writing data files if the dataframe is empty
                 if df.shape[0] > 0:
                     data_files = _dataframe_to_data_files(
@@ -1125,7 +1126,9 @@ class Table:
                     for data_file in data_files:
                         update_snapshot.append_data_file(data_file)
 
-    def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = 
ALWAYS_TRUE) -> None:
+    def overwrite(
+        self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, 
snapshot_properties: Dict[str, str] = EMPTY_DICT
+    ) -> None:
         """
         Shorthand for overwriting the table with a PyArrow table.
 
@@ -1133,6 +1136,7 @@ class Table:
             df: The Arrow dataframe that will be used to overwrite the table
             overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                               or a boolean expression in case of a partial 
overwrite
+            snapshot_properties: Custom properties to be added to the snapshot 
summary
         """
         try:
             import pyarrow as pa
@@ -1151,7 +1155,7 @@ class Table:
         _check_schema(self.schema(), other_schema=df.schema)
 
         with self.transaction() as txn:
-            with txn.update_snapshot().overwrite() as update_snapshot:
+            with 
txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as 
update_snapshot:
                 # skip writing data files if the dataframe is empty
                 if df.shape[0] > 0:
                     data_files = _dataframe_to_data_files(
@@ -2551,6 +2555,7 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
         transaction: Transaction,
         io: FileIO,
         commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
     ) -> None:
         super().__init__(transaction)
         self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -2562,6 +2567,7 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
             snapshot.snapshot_id if (snapshot := 
self._transaction.table_metadata.current_snapshot()) else None
         )
         self._added_data_files = []
+        self.snapshot_properties = snapshot_properties
 
     def append_data_file(self, data_file: DataFile) -> 
_MergingSnapshotProducer:
         self._added_data_files.append(data_file)
@@ -2629,7 +2635,7 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
 
         return added_manifests.result() + delete_manifests.result() + 
existing_manifests.result()
 
-    def _summary(self) -> Summary:
+    def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> 
Summary:
         ssc = SnapshotSummaryCollector()
         partition_summary_limit = int(
             self._transaction.table_metadata.properties.get(
@@ -2652,7 +2658,7 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
         )
 
         return update_snapshot_summaries(
-            summary=Summary(operation=self._operation, **ssc.build()),
+            summary=Summary(operation=self._operation, **ssc.build(), 
**snapshot_properties),
             previous_summary=previous_snapshot.summary if previous_snapshot is 
not None else None,
             truncate_full_table=self._operation == Operation.OVERWRITE,
         )
@@ -2661,7 +2667,7 @@ class 
_MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
         new_manifests = self._manifests()
         next_sequence_number = 
self._transaction.table_metadata.next_sequence_number()
 
-        summary = self._summary()
+        summary = self._summary(self.snapshot_properties)
 
         manifest_list_file_path = _generate_manifest_list_path(
             location=self._transaction.table_metadata.location,
@@ -2776,13 +2782,17 @@ class OverwriteFiles(_MergingSnapshotProducer):
 class UpdateSnapshot:
     _transaction: Transaction
     _io: FileIO
+    _snapshot_properties: Dict[str, str]
 
-    def __init__(self, transaction: Transaction, io: FileIO) -> None:
+    def __init__(self, transaction: Transaction, io: FileIO, 
snapshot_properties: Dict[str, str]) -> None:
         self._transaction = transaction
         self._io = io
+        self._snapshot_properties = snapshot_properties
 
     def fast_append(self) -> FastAppendFiles:
-        return FastAppendFiles(operation=Operation.APPEND, 
transaction=self._transaction, io=self._io)
+        return FastAppendFiles(
+            operation=Operation.APPEND, transaction=self._transaction, 
io=self._io, snapshot_properties=self._snapshot_properties
+        )
 
     def overwrite(self) -> OverwriteFiles:
         return OverwriteFiles(
@@ -2791,6 +2801,7 @@ class UpdateSnapshot:
             else Operation.APPEND,
             transaction=self._transaction,
             io=self._io,
+            snapshot_properties=self._snapshot_properties,
         )
 
 
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 6d44d927..d4ed085c 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -32,6 +32,7 @@ from pyiceberg.exceptions import (
     NoSuchTableError,
     TableAlreadyExistsError,
 )
+from pyiceberg.io.pyarrow import schema_to_pyarrow
 from pyiceberg.schema import Schema
 from pyiceberg.types import IntegerType
 from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
@@ -692,3 +693,68 @@ def test_commit_table_properties(
     updated_table_metadata = table.metadata
     assert test_catalog._parse_metadata_version(table.metadata_location) == 1
     assert updated_table_metadata.properties == {"test_a": "test_aa", 
"test_c": "test_c"}
+
+
+@mock_aws
+def test_commit_append_table_snapshot_properties(
+    _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: 
Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": 
moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier=identifier, 
schema=table_schema_simple)
+
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+    table.append(
+        pa.Table.from_pylist(
+            [{"foo": "foo_val", "bar": 1, "baz": False}],
+            schema=schema_to_pyarrow(table_schema_simple),
+        ),
+        snapshot_properties={"snapshot_prop_a": "test_prop_a"},
+    )
+
+    updated_table_metadata = table.metadata
+    summary = updated_table_metadata.snapshots[-1].summary
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+    assert summary is not None
+    assert summary["snapshot_prop_a"] == "test_prop_a"
+
+
+@mock_aws
+def test_commit_overwrite_table_snapshot_properties(
+    _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: 
Schema, database_name: str, table_name: str
+) -> None:
+    catalog_name = "glue"
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": 
moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier=identifier, 
schema=table_schema_simple)
+
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+    table.append(
+        pa.Table.from_pylist(
+            [{"foo": "foo_val", "bar": 1, "baz": False}],
+            schema=schema_to_pyarrow(table_schema_simple),
+        ),
+        snapshot_properties={"snapshot_prop_a": "test_prop_a"},
+    )
+
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 1
+
+    table.overwrite(
+        pa.Table.from_pylist(
+            [{"foo": "foo_val", "bar": 2, "baz": True}],
+            schema=schema_to_pyarrow(table_schema_simple),
+        ),
+        snapshot_properties={"snapshot_prop_b": "test_prop_b"},
+    )
+
+    updated_table_metadata = table.metadata
+    summary = updated_table_metadata.snapshots[-1].summary
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 2
+    assert summary is not None
+    assert summary["snapshot_prop_a"] is None
+    assert summary["snapshot_prop_b"] == "test_prop_b"

Reply via email to