This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 50a1a69 Add logic for table format-version updates (#55)
50a1a69 is described below
commit 50a1a693944829f8bec592ee049c64287100b3df
Author: Fokko Driesprong <[email protected]>
AuthorDate: Tue Oct 10 12:48:26 2023 +0200
Add logic for table format-version updates (#55)
* Add logic for table format-version updates
Add a few more tests
* Set -> Upgrade
---
dev/provision.py | 22 ++++++++++++++++++++++
dev/spark-defaults.conf | 2 +-
pyiceberg/table/__init__.py | 16 ++++++++++++++--
tests/test_integration.py | 30 ++++++++++++++++++++++++++++++
4 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/dev/provision.py b/dev/provision.py
index 56e3459..b75030f 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -279,3 +279,25 @@ for table_name, partition in [
(CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
"""
)
+
+# There is an issue with CREATE OR REPLACE
+# https://github.com/apache/iceberg/issues/8756
+spark.sql(
+ """
+DROP TABLE IF EXISTS default.test_table_version
+"""
+)
+
+spark.sql(
+ """
+CREATE TABLE default.test_table_version (
+ dt date,
+ number integer,
+ letter string
+)
+USING iceberg
+TBLPROPERTIES (
+ 'format-version'='1'
+);
+"""
+)
diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf
index 28f93b1..56c3454 100644
--- a/dev/spark-defaults.conf
+++ b/dev/spark-defaults.conf
@@ -20,7 +20,7 @@ spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.demo.type rest
spark.sql.catalog.demo.uri http://rest:8181
spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
-spark.sql.catalog.demo.warehouse s3a://warehouse/wh/
+spark.sql.catalog.demo.warehouse s3://warehouse/wh/
spark.sql.catalog.demo.s3.endpoint http://minio:9000
spark.sql.defaultCatalog demo
spark.eventLog.enabled true
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index ebdca60..c171b39 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -166,7 +166,7 @@ class Transaction:
self._requirements = self._requirements + new_requirements
return self
- def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
+ def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
"""Set the table to a certain version.
Args:
@@ -175,7 +175,15 @@ class Transaction:
Returns:
The alter table builder.
"""
- raise NotImplementedError("Not yet implemented")
+ if format_version not in {1, 2}:
+ raise ValueError(f"Unsupported table format version: {format_version}")
+
+ if format_version < self._table.metadata.format_version:
+ raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
+ if format_version > self._table.metadata.format_version:
+ return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
+ else:
+ return self
def set_properties(self, **updates: str) -> Transaction:
"""Set properties.
@@ -482,6 +490,10 @@ class Table:
limit=limit,
)
+ @property
+ def format_version(self) -> Literal[1, 2]:
+ return self.metadata.format_version
+
def schema(self) -> Schema:
"""Return the schema for this table."""
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 6d777cf..6e874b6 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -83,6 +83,11 @@ def table_test_all_types(catalog: Catalog) -> Table:
return catalog.load_table("default.test_all_types")
+@pytest.fixture()
+def table_test_table_version(catalog: Catalog) -> Table:
+ return catalog.load_table("default.test_table_version")
+
+
TABLE_NAME = ("default", "t1")
@@ -366,3 +371,28 @@ def test_scan_tag(test_positional_mor_deletes: Table) -> None:
def test_scan_branch(test_positional_mor_deletes: Table) -> None:
arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]
+
+
+@pytest.mark.integration
+def test_upgrade_table_version(table_test_table_version: Table) -> None:
+ assert table_test_table_version.format_version == 1
+
+ with table_test_table_version.transaction() as transaction:
+ transaction.upgrade_table_version(format_version=1)
+
+ assert table_test_table_version.format_version == 1
+
+ with table_test_table_version.transaction() as transaction:
+ transaction.upgrade_table_version(format_version=2)
+
+ assert table_test_table_version.format_version == 2
+
+ with pytest.raises(ValueError) as e: # type: ignore
+ with table_test_table_version.transaction() as transaction:
+ transaction.upgrade_table_version(format_version=1)
+ assert "Cannot downgrade v2 table to v1" in str(e.value)
+
+ with pytest.raises(ValueError) as e:
+ with table_test_table_version.transaction() as transaction:
+ transaction.upgrade_table_version(format_version=3)
+ assert "Unsupported table format version: 3" in str(e.value)