This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 50a1a69  Add logic for table format-version updates (#55)
50a1a69 is described below

commit 50a1a693944829f8bec592ee049c64287100b3df
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue Oct 10 12:48:26 2023 +0200

    Add logic for table format-version updates (#55)
    
    * Add logic for table format-version updates
    
    Add a few more tests
    
    * Set -> Upgrade
---
 dev/provision.py            | 22 ++++++++++++++++++++++
 dev/spark-defaults.conf     |  2 +-
 pyiceberg/table/__init__.py | 16 ++++++++++++++--
 tests/test_integration.py   | 30 ++++++++++++++++++++++++++++++
 4 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/dev/provision.py b/dev/provision.py
index 56e3459..b75030f 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -279,3 +279,25 @@ for table_name, partition in [
         (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
     """
     )
+
+# There is an issue with CREATE OR REPLACE
+# https://github.com/apache/iceberg/issues/8756
+spark.sql(
+    """
+DROP TABLE IF EXISTS default.test_table_version
+"""
+)
+
+spark.sql(
+    """
+CREATE TABLE default.test_table_version (
+    dt     date,
+    number integer,
+    letter string
+)
+USING iceberg
+TBLPROPERTIES (
+    'format-version'='1'
+);
+"""
+)
diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf
index 28f93b1..56c3454 100644
--- a/dev/spark-defaults.conf
+++ b/dev/spark-defaults.conf
@@ -20,7 +20,7 @@ spark.sql.catalog.demo                 org.apache.iceberg.spark.SparkCatalog
 spark.sql.catalog.demo.type            rest
 spark.sql.catalog.demo.uri             http://rest:8181
 spark.sql.catalog.demo.io-impl         org.apache.iceberg.aws.s3.S3FileIO
-spark.sql.catalog.demo.warehouse       s3a://warehouse/wh/
+spark.sql.catalog.demo.warehouse       s3://warehouse/wh/
 spark.sql.catalog.demo.s3.endpoint     http://minio:9000
 spark.sql.defaultCatalog               demo
 spark.eventLog.enabled                 true
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ebdca60..c171b39 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -166,7 +166,7 @@ class Transaction:
         self._requirements = self._requirements + new_requirements
         return self
 
-    def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
+    def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
         """Set the table to a certain version.
 
         Args:
@@ -175,7 +175,15 @@ class Transaction:
         Returns:
             The alter table builder.
         """
-        raise NotImplementedError("Not yet implemented")
+        if format_version not in {1, 2}:
+            raise ValueError(f"Unsupported table format version: {format_version}")
+
+        if format_version < self._table.metadata.format_version:
+            raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
+        if format_version > self._table.metadata.format_version:
+            return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
+        else:
+            return self
 
     def set_properties(self, **updates: str) -> Transaction:
         """Set properties.
@@ -482,6 +490,10 @@ class Table:
             limit=limit,
         )
 
+    @property
+    def format_version(self) -> Literal[1, 2]:
+        return self.metadata.format_version
+
     def schema(self) -> Schema:
         """Return the schema for this table."""
         return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 6d777cf..6e874b6 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -83,6 +83,11 @@ def table_test_all_types(catalog: Catalog) -> Table:
     return catalog.load_table("default.test_all_types")
 
 
+@pytest.fixture()
+def table_test_table_version(catalog: Catalog) -> Table:
+    return catalog.load_table("default.test_table_version")
+
+
 TABLE_NAME = ("default", "t1")
 
 
@@ -366,3 +371,28 @@ def test_scan_tag(test_positional_mor_deletes: Table) -> None:
 def test_scan_branch(test_positional_mor_deletes: Table) -> None:
     arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]
+
+
+@pytest.mark.integration
+def test_upgrade_table_version(table_test_table_version: Table) -> None:
+    assert table_test_table_version.format_version == 1
+
+    with table_test_table_version.transaction() as transaction:
+        transaction.upgrade_table_version(format_version=1)
+
+    assert table_test_table_version.format_version == 1
+
+    with table_test_table_version.transaction() as transaction:
+        transaction.upgrade_table_version(format_version=2)
+
+    assert table_test_table_version.format_version == 2
+
+    with pytest.raises(ValueError) as e:  # type: ignore
+        with table_test_table_version.transaction() as transaction:
+            transaction.upgrade_table_version(format_version=1)
+    assert "Cannot downgrade v2 table to v1" in str(e.value)
+
+    with pytest.raises(ValueError) as e:
+        with table_test_table_version.transaction() as transaction:
+            transaction.upgrade_table_version(format_version=3)
+    assert "Unsupported table format version: 3" in str(e.value)

Reply via email to