This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7e5cd328 Feat/update sort order (#2552)
7e5cd328 is described below
commit 7e5cd32890b8206cbea7fbfc1b9ad49b59f4a8c2
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Oct 3 05:16:08 2025 +0200
Feat/update sort order (#2552)
# Rationale for this change
Fixed the tests in https://github.com/apache/iceberg-python/pull/1500,
but kept the commits so @JasperHG90 gets all the credits for the great
work!
Closes #1500
Closes #1245
## Are these changes tested?
## Are there any user-facing changes?
<!-- In the case of user-facing changes, please add the changelog label.
-->
---------
Co-authored-by: Jasper Ginn <[email protected]>
---
mkdocs/docs/api.md | 18 +++
pyiceberg/table/__init__.py | 29 ++++-
pyiceberg/table/update/sorting.py | 136 +++++++++++++++++++++++
tests/integration/test_sort_order_update.py | 166 ++++++++++++++++++++++++++++
4 files changed, 348 insertions(+), 1 deletion(-)
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0e0dc375..05a4db92 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -1268,6 +1268,24 @@ with table.update_spec() as update:
update.rename_field("bucketed_id", "sharded_id")
```
+## Sort order updates
+
+Users can update the sort order on existing tables for new data. See
[sorting](https://iceberg.apache.org/spec/#sorting) for more details.
+
+The API to use when updating a sort order is the `update_sort_order` API on
the table.
+
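+Updating the sort order makes the resulting sort order the table's default. If a sort order with exactly the same fields already exists it is reused; otherwise a new sort order is added. Existing sort orders cannot be deleted or modified.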
+
+### Updating a sort order on a table
+
+To create a new sort order, use the `asc` or `desc` API depending on whether you want your data sorted in ascending or descending order. Both take the name of the field, the sort order transform, and an optional null order that describes where null values are placed when sorted (defaults to `NullOrder.NULLS_LAST`).
+
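+```python
+from pyiceberg.table.sorting import NullOrder
+from pyiceberg.transforms import DayTransform, IdentityTransform
+
+with table.update_sort_order() as update:
+    update.desc("event_ts", DayTransform(), NullOrder.NULLS_FIRST)
+    update.asc("some_field", IdentityTransform(), NullOrder.NULLS_LAST)
+```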
+
## Table properties
Set and remove properties through the `Transaction` API:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 6e0f79a0..972efc8c 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -116,7 +116,12 @@ from pyiceberg.table.update import (
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot,
_FastAppendFiles
+from pyiceberg.table.update.snapshot import (
+ ManageSnapshots,
+ UpdateSnapshot,
+ _FastAppendFiles,
+)
+from pyiceberg.table.update.sorting import UpdateSortOrder
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
@@ -436,6 +441,20 @@ class Transaction:
name_mapping=self.table_metadata.name_mapping(),
)
def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
    """Start building a replacement sort order for this table inside the transaction.

    Args:
        case_sensitive: Whether column names given to ``asc``/``desc`` are
            matched case-sensitively against the table schema.

    Returns:
        An UpdateSortOrder bound to this transaction.
    """
    builder = UpdateSortOrder(transaction=self, case_sensitive=case_sensitive)
    return builder
+
def update_snapshot(
self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch:
Optional[str] = MAIN_BRANCH
) -> UpdateSnapshot:
@@ -1298,6 +1317,14 @@ class Table:
name_mapping=self.name_mapping(),
)
def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
    """Start building a replacement sort order for this table.

    The builder is attached to a fresh auto-committing transaction, so calling
    ``commit()`` on it (or leaving its ``with`` block) applies the change.

    Args:
        case_sensitive: Whether column names given to ``asc``/``desc`` are
            matched case-sensitively against the table schema.

    Returns:
        A new UpdateSortOrder.
    """
    autocommit_txn = Transaction(self, autocommit=True)
    return UpdateSortOrder(autocommit_txn, case_sensitive=case_sensitive)
+
def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
diff --git a/pyiceberg/table/update/sorting.py
b/pyiceberg/table/update/sorting.py
new file mode 100644
index 00000000..a356229f
--- /dev/null
+++ b/pyiceberg/table/update/sorting.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple
+
+from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID,
UNSORTED_SORT_ORDER, NullOrder, SortDirection, SortField, SortOrder
+from pyiceberg.table.update import (
+ AddSortOrderUpdate,
+ AssertDefaultSortOrderId,
+ SetDefaultSortOrderUpdate,
+ TableRequirement,
+ TableUpdate,
+ UpdatesAndRequirements,
+ UpdateTableMetadata,
+)
+from pyiceberg.transforms import Transform
+
+if TYPE_CHECKING:
+ from pyiceberg.table import Transaction
+
+
class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
    """Build a new sort order for a table and make it the table's default.

    Sort fields are appended in call order through ``asc`` / ``desc``. On commit,
    an existing sort order with identical fields is reused; otherwise the new
    order is added under the next free id. Either way it becomes the default.
    Committing without any fields selects the unsorted sort order.
    """

    _transaction: Transaction
    _last_assigned_order_id: Optional[int]
    _case_sensitive: bool
    _fields: List[SortField]

    def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
        super().__init__(transaction)
        self._fields: List[SortField] = []
        self._case_sensitive: bool = case_sensitive
        # Recorded by `_commit` when a brand-new sort order is added.
        self._last_assigned_order_id: Optional[int] = None

    def _column_name_to_id(self, column_name: str) -> int:
        """Map a column name to its field id in the current schema.

        Honours the builder's ``case_sensitive`` setting.
        """
        return (
            self._transaction.table_metadata.schema()
            .find_field(
                name_or_id=column_name,
                case_sensitive=self._case_sensitive,
            )
            .field_id
        )

    def _add_sort_field(
        self,
        source_id: int,
        transform: Transform[Any, Any],
        direction: SortDirection,
        null_order: NullOrder,
    ) -> UpdateSortOrder:
        """Append a sort field after checking the transform fits the source column.

        Raises:
            ValueError: If ``transform`` cannot be applied to the source column's type.
        """
        source_type = self._transaction.table_metadata.schema().find_field(name_or_id=source_id).field_type
        # Reject incompatible transforms (e.g. a day transform on a string column)
        # up front instead of writing an invalid sort order into table metadata.
        if not transform.can_transform(source_type):
            raise ValueError(f"Invalid source type {source_type} for transform: {transform}")
        self._fields.append(
            SortField(
                source_id=source_id,
                transform=transform,
                direction=direction,
                null_order=null_order,
            )
        )
        return self

    def _reuse_or_create_sort_order_id(self) -> int:
        """Return the id of an existing sort order with identical fields, or the next free id.

        The next free id is one past the largest existing order id, but never
        lower than INITIAL_SORT_ORDER_ID.
        """
        next_order_id = INITIAL_SORT_ORDER_ID
        for sort_order in self._transaction.table_metadata.sort_orders:
            if sort_order.fields == self._fields:
                return sort_order.order_id
            next_order_id = max(next_order_id, sort_order.order_id + 1)
        return next_order_id

    def asc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Add a sort field with ascending order.

        Args:
            source_column_name: Name of the column to sort by.
            transform: Transform applied to the column before sorting.
            null_order: Placement of null values (defaults to NULLS_LAST).

        Returns:
            This builder, for chaining.
        """
        return self._add_sort_field(
            source_id=self._column_name_to_id(source_column_name),
            transform=transform,
            direction=SortDirection.ASC,
            null_order=null_order,
        )

    def desc(
        self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
    ) -> UpdateSortOrder:
        """Add a sort field with descending order.

        Args:
            source_column_name: Name of the column to sort by.
            transform: Transform applied to the column before sorting.
            null_order: Placement of null values (defaults to NULLS_LAST).

        Returns:
            This builder, for chaining.
        """
        return self._add_sort_field(
            source_id=self._column_name_to_id(source_column_name),
            transform=transform,
            direction=SortDirection.DESC,
            null_order=null_order,
        )

    def _apply(self) -> SortOrder:
        """Return the sort order described by the accumulated fields.

        Returns UNSORTED_SORT_ORDER when no fields were added.
        """
        if not self._fields:
            return UNSORTED_SORT_ORDER
        return SortOrder(*self._fields, order_id=self._reuse_or_create_sort_order_id())

    def _commit(self) -> UpdatesAndRequirements:
        """Produce the metadata updates and requirements for the pending sort order."""
        metadata = self._transaction.table_metadata
        new_sort_order = self._apply()
        current_default_id = metadata.default_sort_order_id

        updates: Tuple[TableUpdate, ...]
        if current_default_id != new_sort_order.order_id and metadata.sort_order_by_id(new_sort_order.order_id) is None:
            # Brand-new order: add it, then make "the last added order" (-1) the default.
            self._last_assigned_order_id = new_sort_order.order_id
            updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1))
        else:
            # The order already exists in metadata; just point the default at it.
            updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),)

        # Fail the commit if someone else changed the default sort order concurrently.
        requirements: Tuple[TableRequirement, ...] = (AssertDefaultSortOrderId(default_sort_order_id=current_default_id),)

        return updates, requirements
diff --git a/tests/integration/test_sort_order_update.py
b/tests/integration/test_sort_order_update.py
new file mode 100644
index 00000000..548c6692
--- /dev/null
+++ b/tests/integration/test_sort_order_update.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.table.sorting import NullOrder, SortDirection, SortField,
SortOrder
+from pyiceberg.transforms import (
+ IdentityTransform,
+)
+
+
def _simple_table(catalog: Catalog, table_schema_simple: Schema, format_version: str) -> Table:
    """Recreate the test table using the ``table_schema_simple`` fixture schema."""
    table = _create_table_with_schema(catalog, table_schema_simple, format_version)
    return table
+
+
def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table:
    """Drop any leftover copy of this module's test table, then create it fresh.

    The table name is specific to this module so these tests do not share (and
    drop) a table with other integration test modules.

    Args:
        catalog: Catalog to create the table in.
        schema: Schema of the new table.
        format_version: Iceberg table format version, as a string ("1" or "2").

    Returns:
        The newly created table.
    """
    tbl_name = "default.test_sort_order_update"
    try:
        catalog.drop_table(tbl_name)
    except NoSuchTableError:
        pass  # Nothing to clean up on a fresh catalog.
    return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version})
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """Column names of the simple schema resolve to their field ids."""
    simple_table = _simple_table(catalog, table_schema_simple, format_version)
    for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items():
        assert col_id == simple_table.update_sort_order()._column_name_to_id(col_name)
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """A first multi-field sort order is added with order id 1 and becomes the default."""
    simple_table = _simple_table(catalog, table_schema_simple, format_version)
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc(
        "bar", IdentityTransform(), NullOrder.NULLS_LAST
    ).commit()
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST),
        order_id=1,
    )
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_increment_existing_sort_order_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """A second, different sort order gets the next id (2) and becomes the default."""
    simple_table = _simple_table(catalog, table_schema_simple, format_version)
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc(
        "bar", IdentityTransform(), NullOrder.NULLS_FIRST
    ).commit()
    # 0: unsorted order created with the table, 1: first sort order, 2: second sort order
    assert len(simple_table.sort_orders()) == 3
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST),
        SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST),
        order_id=2,
    )
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_existing_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
    """Re-applying a previously used sort order reuses its id instead of adding a new one."""
    simple_table = _simple_table(catalog, table_schema_simple, format_version)
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc(
        "bar", IdentityTransform(), NullOrder.NULLS_FIRST
    ).commit()
    # Go back to the first sort order; it must be reused (id 1) rather than added again.
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    # Still only 0 (unsorted), 1 and 2: switching back did not create a new sort order.
    assert len(simple_table.sort_orders()) == 3
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
+
+
@pytest.mark.integration
@pytest.mark.parametrize(
    "catalog, format_version",
    [
        (pytest.lazy_fixture("session_catalog"), "1"),
        (pytest.lazy_fixture("session_catalog_hive"), "1"),
        (pytest.lazy_fixture("session_catalog"), "2"),
        (pytest.lazy_fixture("session_catalog_hive"), "2"),
    ],
)
def test_update_existing_sort_order_with_unsorted_sort_order(
    catalog: Catalog, format_version: str, table_schema_simple: Schema
) -> None:
    """Committing an update with no fields switches the table back to the unsorted order."""
    simple_table = _simple_table(catalog, table_schema_simple, format_version)
    simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
    assert simple_table.sort_order() == SortOrder(
        SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
        order_id=1,
    )
    # An update without any fields makes the table unsorted again.
    simple_table.update_sort_order().commit()
    # The existing unsorted order (id 0) is reused, so no new sort order is added.
    assert len(simple_table.sort_orders()) == 2
    assert simple_table.sort_order() == SortOrder(order_id=0)