This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push: new 5506fd01 Change Append/Overwrite API to accept snapshot properties (#419) 5506fd01 is described below commit 5506fd01e98b03229b43772d87d8041acf7a2609 Author: Gowthami B <gowthamibhogire...@gmail.com> AuthorDate: Tue Mar 19 09:30:24 2024 -0400 Change Append/Overwrite API to accept snapshot properties (#419) * added test for snapshot properties * change append/overwrite to accept snapshot_properties * Update tests/catalog/test_glue.py Co-authored-by: Fokko Driesprong <fo...@apache.org> * Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong <fo...@apache.org> * updated docs,docstrings * fix linting * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong <fo...@apache.org> * Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong <fo...@apache.org> --------- Co-authored-by: Fokko Driesprong <fo...@apache.org> --- mkdocs/docs/api.md | 14 ++++++++++ pyiceberg/table/__init__.py | 33 +++++++++++++++-------- tests/catalog/test_glue.py | 66 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 11 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 4056bc3a..3cd0d312 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -563,6 +563,20 @@ table = table.transaction().remove_properties("abc").commit_transaction() assert table.properties == {} ``` +## Snapshot properties + +Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API: + +```python +tbl.append(df, snapshot_properties={"abc": "def"}) + +# or + +tbl.overwrite(df, snapshot_properties={"abc": "def"}) + +assert tbl.metadata.snapshots[-1].summary["abc"] == "def" +``` + ## Query the data To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 517e6c86..437a2640 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -332,13 +332,13 @@ class Transaction: name_mapping=self._table.name_mapping(), ) - def update_snapshot(self) -> UpdateSnapshot: + def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. Returns: A new UpdateSnapshot """ - return UpdateSnapshot(self, io=self._table.io) + return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) def update_spec(self) -> UpdateSpec: """Create a new UpdateSpec to update the partitioning of the table. @@ -1095,12 +1095,13 @@ class Table: else: return None - def append(self, df: pa.Table) -> None: + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to the table. Args: df: The Arrow dataframe that will be appended to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1116,7 +1117,7 @@ class Table: _check_schema(self.schema(), other_schema=df.schema) with self.transaction() as txn: - with txn.update_snapshot().fast_append() as update_snapshot: + with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -1125,7 +1126,9 @@ class Table: for data_file in data_files: update_snapshot.append_data_file(data_file) - def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None: + def overwrite( + self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT + ) -> None: """ Shorthand for overwriting the table with a PyArrow table. @@ -1133,6 +1136,7 @@ class Table: df: The Arrow dataframe that will be used to overwrite the table overwrite_filter: ALWAYS_TRUE when you overwrite all the data, or a boolean expression in case of a partial overwrite + snapshot_properties: Custom properties to be added to the snapshot summary """ try: import pyarrow as pa @@ -1151,7 +1155,7 @@ class Table: _check_schema(self.schema(), other_schema=df.schema) with self.transaction() as txn: - with txn.update_snapshot().overwrite() as update_snapshot: + with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot: # skip writing data files if the dataframe is empty if df.shape[0] > 0: data_files = _dataframe_to_data_files( @@ -2551,6 +2555,7 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): transaction: Transaction, io: FileIO, commit_uuid: Optional[uuid.UUID] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, ) -> None: super().__init__(transaction) self.commit_uuid = commit_uuid or uuid.uuid4() @@ -2562,6 +2567,7 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None ) self._added_data_files = [] + self.snapshot_properties = snapshot_properties def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer: self._added_data_files.append(data_file) @@ -2629,7 +2635,7 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): return added_manifests.result() + delete_manifests.result() + existing_manifests.result() - def _summary(self) -> Summary: + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: ssc = SnapshotSummaryCollector() partition_summary_limit = int( self._transaction.table_metadata.properties.get( @@ -2652,7 +2658,7 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): ) return update_snapshot_summaries( - summary=Summary(operation=self._operation, **ssc.build()), + summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties), previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, truncate_full_table=self._operation == Operation.OVERWRITE, ) @@ -2661,7 +2667,7 @@ class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): new_manifests = self._manifests() next_sequence_number = self._transaction.table_metadata.next_sequence_number() - summary = self._summary() + summary = self._summary(self.snapshot_properties) manifest_list_file_path = _generate_manifest_list_path( location=self._transaction.table_metadata.location, @@ -2776,13 +2782,17 @@ class OverwriteFiles(_MergingSnapshotProducer): class UpdateSnapshot: _transaction: Transaction _io: FileIO + _snapshot_properties: Dict[str, str] - def __init__(self, transaction: Transaction, io: FileIO) -> None: + def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None: self._transaction = transaction self._io = io + self._snapshot_properties = snapshot_properties def fast_append(self) -> FastAppendFiles: - return FastAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io) + return FastAppendFiles( + operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties + ) def overwrite(self) -> OverwriteFiles: return OverwriteFiles( @@ -2791,6 +2801,7 @@ class UpdateSnapshot: else Operation.APPEND, transaction=self._transaction, io=self._io, + snapshot_properties=self._snapshot_properties, ) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 6d44d927..d4ed085c 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -32,6 +32,7 @@ from pyiceberg.exceptions import ( NoSuchTableError, TableAlreadyExistsError, ) +from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.schema import Schema from pyiceberg.types import IntegerType from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX @@ -692,3 +693,68 @@ def test_commit_table_properties( updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + + +@mock_aws +def test_commit_append_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + table.append( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, + ) + + updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + assert summary is not None + assert summary["snapshot_prop_a"] == "test_prop_a" + + +@mock_aws +def test_commit_overwrite_table_snapshot_properties( + _bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 0 + + table.append( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 1, "baz": False}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_a": "test_prop_a"}, + ) + + assert test_catalog._parse_metadata_version(table.metadata_location) == 1 + + table.overwrite( + pa.Table.from_pylist( + [{"foo": "foo_val", "bar": 2, "baz": True}], + schema=schema_to_pyarrow(table_schema_simple), + ), + snapshot_properties={"snapshot_prop_b": "test_prop_b"}, + ) + + updated_table_metadata = table.metadata + summary = updated_table_metadata.snapshots[-1].summary + assert test_catalog._parse_metadata_version(table.metadata_location) == 2 + assert summary is not None + assert summary["snapshot_prop_a"] is None + assert summary["snapshot_prop_b"] == "test_prop_b"