This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e5cd328 Feat/update sort order (#2552)
7e5cd328 is described below

commit 7e5cd32890b8206cbea7fbfc1b9ad49b59f4a8c2
Author: Fokko Driesprong <[email protected]>
AuthorDate: Fri Oct 3 05:16:08 2025 +0200

    Feat/update sort order (#2552)
    
    # Rationale for this change
    
    Fixed the tests in https://github.com/apache/iceberg-python/pull/1500,
    but kept the commits so @JasperHG90 gets all the credits for the great
    work!
    
    Closes #1500
    Closes #1245
    
    ## Are these changes tested?
    
    ## Are there any user-facing changes?
    
    <!-- In the case of user-facing changes, please add the changelog label.
    -->
    
    ---------
    
    Co-authored-by: Jasper Ginn <[email protected]>
---
 mkdocs/docs/api.md                          |  18 +++
 pyiceberg/table/__init__.py                 |  29 ++++-
 pyiceberg/table/update/sorting.py           | 136 +++++++++++++++++++++++
 tests/integration/test_sort_order_update.py | 166 ++++++++++++++++++++++++++++
 4 files changed, 348 insertions(+), 1 deletion(-)

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 0e0dc375..05a4db92 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -1268,6 +1268,24 @@ with table.update_spec() as update:
     update.rename_field("bucketed_id", "sharded_id")
 ```
 
+## Sort order updates
+
+Users can update the sort order on existing tables for new data. See 
[sorting](https://iceberg.apache.org/spec/#sorting) for more details.
+
+The API to use when updating a sort order is the `update_sort_order` API on 
the table.
+
+Sort orders can only be updated by adding a new sort order. They cannot be 
deleted or modified.
+
+### Updating a sort order on a table
+
+To create a new sort order, you can use either the `asc` or `desc` API 
depending on whether you want your data sorted in ascending or descending order. 
Both take the name of the field, the sort order transform, and a null order 
that describes the order of null values when sorted.
+
+```python
+with table.update_sort_order() as update:
+    update.desc("event_ts", DayTransform(), NullOrder.NULLS_FIRST)
+    update.asc("some_field", IdentityTransform(), NullOrder.NULLS_LAST)
+```
+
 ## Table properties
 
 Set and remove properties through the `Transaction` API:
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 6e0f79a0..972efc8c 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -116,7 +116,12 @@ from pyiceberg.table.update import (
     update_table_metadata,
 )
 from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, 
_FastAppendFiles
+from pyiceberg.table.update.snapshot import (
+    ManageSnapshots,
+    UpdateSnapshot,
+    _FastAppendFiles,
+)
+from pyiceberg.table.update.sorting import UpdateSortOrder
 from pyiceberg.table.update.spec import UpdateSpec
 from pyiceberg.table.update.statistics import UpdateStatistics
 from pyiceberg.transforms import IdentityTransform
@@ -436,6 +441,20 @@ class Transaction:
             name_mapping=self.table_metadata.name_mapping(),
         )
 
+    def update_sort_order(self, case_sensitive: bool = True) -> 
UpdateSortOrder:
+        """Create a new UpdateSortOrder to update the sort order of this table.
+
+        Args:
+            case_sensitive: If field names are case-sensitive.
+
+        Returns:
+            A new UpdateSortOrder.
+        """
+        return UpdateSortOrder(
+            self,
+            case_sensitive=case_sensitive,
+        )
+
     def update_snapshot(
         self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: 
Optional[str] = MAIN_BRANCH
     ) -> UpdateSnapshot:
@@ -1298,6 +1317,14 @@ class Table:
             name_mapping=self.name_mapping(),
         )
 
+    def update_sort_order(self, case_sensitive: bool = True) -> 
UpdateSortOrder:
+        """Create a new UpdateSortOrder to update the sort order of this table.
+
+        Returns:
+            A new UpdateSortOrder.
+        """
+        return UpdateSortOrder(transaction=Transaction(self, autocommit=True), 
case_sensitive=case_sensitive)
+
     def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
diff --git a/pyiceberg/table/update/sorting.py 
b/pyiceberg/table/update/sorting.py
new file mode 100644
index 00000000..a356229f
--- /dev/null
+++ b/pyiceberg/table/update/sorting.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple
+
+from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, 
UNSORTED_SORT_ORDER, NullOrder, SortDirection, SortField, SortOrder
+from pyiceberg.table.update import (
+    AddSortOrderUpdate,
+    AssertDefaultSortOrderId,
+    SetDefaultSortOrderUpdate,
+    TableRequirement,
+    TableUpdate,
+    UpdatesAndRequirements,
+    UpdateTableMetadata,
+)
+from pyiceberg.transforms import Transform
+
+if TYPE_CHECKING:
+    from pyiceberg.table import Transaction
+
+
+class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
+    _transaction: Transaction
+    _last_assigned_order_id: Optional[int]
+    _case_sensitive: bool
+    _fields: List[SortField]
+
+    def __init__(self, transaction: Transaction, case_sensitive: bool = True) 
-> None:
+        super().__init__(transaction)
+        self._fields: List[SortField] = []
+        self._case_sensitive: bool = case_sensitive
+        self._last_assigned_order_id: Optional[int] = None
+
+    def _column_name_to_id(self, column_name: str) -> int:
+        """Map the column name to the column field id."""
+        return (
+            self._transaction.table_metadata.schema()
+            .find_field(
+                name_or_id=column_name,
+                case_sensitive=self._case_sensitive,
+            )
+            .field_id
+        )
+
+    def _add_sort_field(
+        self,
+        source_id: int,
+        transform: Transform[Any, Any],
+        direction: SortDirection,
+        null_order: NullOrder,
+    ) -> UpdateSortOrder:
+        """Add a sort field to the sort order list."""
+        self._fields.append(
+            SortField(
+                source_id=source_id,
+                transform=transform,
+                direction=direction,
+                null_order=null_order,
+            )
+        )
+        return self
+
+    def _reuse_or_create_sort_order_id(self) -> int:
+        """Return the id of an existing sort order with the same fields, or a new id above the existing ones."""
+        new_sort_order_id = INITIAL_SORT_ORDER_ID
+        for sort_order in self._transaction.table_metadata.sort_orders:
+            new_sort_order_id = max(new_sort_order_id, sort_order.order_id)
+            if sort_order.fields == self._fields:
+                return sort_order.order_id
+            elif new_sort_order_id <= sort_order.order_id:
+                new_sort_order_id = sort_order.order_id + 1
+        return new_sort_order_id
+
+    def asc(
+        self, source_column_name: str, transform: Transform[Any, Any], 
null_order: NullOrder = NullOrder.NULLS_LAST
+    ) -> UpdateSortOrder:
+        """Add a sort field with ascending order."""
+        return self._add_sort_field(
+            source_id=self._column_name_to_id(source_column_name),
+            transform=transform,
+            direction=SortDirection.ASC,
+            null_order=null_order,
+        )
+
+    def desc(
+        self, source_column_name: str, transform: Transform[Any, Any], 
null_order: NullOrder = NullOrder.NULLS_LAST
+    ) -> UpdateSortOrder:
+        """Add a sort field with descending order."""
+        return self._add_sort_field(
+            source_id=self._column_name_to_id(source_column_name),
+            transform=transform,
+            direction=SortDirection.DESC,
+            null_order=null_order,
+        )
+
+    def _apply(self) -> SortOrder:
+        """Return the sort order."""
+        if next(iter(self._fields), None) is None:
+            return UNSORTED_SORT_ORDER
+        else:
+            return SortOrder(*self._fields, 
order_id=self._reuse_or_create_sort_order_id())
+
+    def _commit(self) -> UpdatesAndRequirements:
+        """Apply the pending changes and commit."""
+        new_sort_order = self._apply()
+        requirements: Tuple[TableRequirement, ...] = ()
+        updates: Tuple[TableUpdate, ...] = ()
+
+        if (
+            self._transaction.table_metadata.default_sort_order_id != 
new_sort_order.order_id
+            and 
self._transaction.table_metadata.sort_order_by_id(new_sort_order.order_id) is 
None
+        ):
+            self._last_assigned_order_id = new_sort_order.order_id
+            updates = (AddSortOrderUpdate(sort_order=new_sort_order), 
SetDefaultSortOrderUpdate(sort_order_id=-1))
+        else:
+            updates = 
(SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),)
+
+        required_last_assigned_sort_order_id = 
self._transaction.table_metadata.default_sort_order_id
+        requirements = 
(AssertDefaultSortOrderId(default_sort_order_id=required_last_assigned_sort_order_id),)
+
+        return updates, requirements
diff --git a/tests/integration/test_sort_order_update.py 
b/tests/integration/test_sort_order_update.py
new file mode 100644
index 00000000..548c6692
--- /dev/null
+++ b/tests/integration/test_sort_order_update.py
@@ -0,0 +1,166 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import pytest
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, 
SortOrder
+from pyiceberg.transforms import (
+    IdentityTransform,
+)
+
+
+def _simple_table(catalog: Catalog, table_schema_simple: Schema, 
format_version: str) -> Table:
+    return _create_table_with_schema(catalog, table_schema_simple, 
format_version)
+
+
+def _create_table_with_schema(catalog: Catalog, schema: Schema, 
format_version: str) -> Table:
+    tbl_name = "default.test_schema_evolution"
+    try:
+        catalog.drop_table(tbl_name)
+    except NoSuchTableError:
+        pass
+    return catalog.create_table(identifier=tbl_name, schema=schema, 
properties={"format-version": format_version})
+
+
[email protected]
[email protected](
+    "catalog, format_version",
+    [
+        (pytest.lazy_fixture("session_catalog"), "1"),
+        (pytest.lazy_fixture("session_catalog_hive"), "1"),
+        (pytest.lazy_fixture("session_catalog"), "2"),
+        (pytest.lazy_fixture("session_catalog_hive"), "2"),
+    ],
+)
+def test_map_column_name_to_id(catalog: Catalog, format_version: str, 
table_schema_simple: Schema) -> None:
+    simple_table = _simple_table(catalog, table_schema_simple, format_version)
+    for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items():
+        assert col_id == 
simple_table.update_sort_order()._column_name_to_id(col_name)
+
+
[email protected]
[email protected](
+    "catalog, format_version",
+    [
+        (pytest.lazy_fixture("session_catalog"), "1"),
+        (pytest.lazy_fixture("session_catalog_hive"), "1"),
+        (pytest.lazy_fixture("session_catalog"), "2"),
+        (pytest.lazy_fixture("session_catalog_hive"), "2"),
+    ],
+)
+def test_update_sort_order(catalog: Catalog, format_version: str, 
table_schema_simple: Schema) -> None:
+    simple_table = _simple_table(catalog, table_schema_simple, format_version)
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_FIRST).desc(
+        "bar", IdentityTransform(), NullOrder.NULLS_LAST
+    ).commit()
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
+        SortField(source_id=2, transform=IdentityTransform(), 
direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST),
+        order_id=1,
+    )
+
+
[email protected]
[email protected](
+    "catalog, format_version",
+    [
+        (pytest.lazy_fixture("session_catalog"), "1"),
+        (pytest.lazy_fixture("session_catalog_hive"), "1"),
+        (pytest.lazy_fixture("session_catalog"), "2"),
+        (pytest.lazy_fixture("session_catalog_hive"), "2"),
+    ],
+)
+def test_increment_existing_sort_order_id(catalog: Catalog, format_version: 
str, table_schema_simple: Schema) -> None:
+    simple_table = _simple_table(catalog, table_schema_simple, format_version)
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_FIRST).commit()
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
+        order_id=1,
+    )
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_LAST).desc(
+        "bar", IdentityTransform(), NullOrder.NULLS_FIRST
+    ).commit()
+    assert (
+        len(simple_table.sort_orders()) == 3
+    )  # 0: empty sort order from creating tables, 1: first sort order, 2: 
second sort order
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST),
+        SortField(source_id=2, transform=IdentityTransform(), 
direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST),
+        order_id=2,
+    )
+
+
[email protected]
[email protected](
+    "catalog, format_version",
+    [
+        (pytest.lazy_fixture("session_catalog"), "1"),
+        (pytest.lazy_fixture("session_catalog_hive"), "1"),
+        (pytest.lazy_fixture("session_catalog"), "2"),
+        (pytest.lazy_fixture("session_catalog_hive"), "2"),
+    ],
+)
+def test_update_existing_sort_order(catalog: Catalog, format_version: str, 
table_schema_simple: Schema) -> None:
+    simple_table = _simple_table(catalog, table_schema_simple, format_version)
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_FIRST).commit()
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
+        order_id=1,
+    )
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_LAST).desc(
+        "bar", IdentityTransform(), NullOrder.NULLS_FIRST
+    ).commit()
+    # Go back to the first sort order
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_FIRST).commit()
+    assert (
+        len(simple_table.sort_orders()) == 3
+    )  # re-applying the first sort order should not create a new one since it 
already exists
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
+        order_id=1,
+    )
+
+
[email protected]
[email protected](
+    "catalog, format_version",
+    [
+        (pytest.lazy_fixture("session_catalog"), "1"),
+        (pytest.lazy_fixture("session_catalog_hive"), "1"),
+        (pytest.lazy_fixture("session_catalog"), "2"),
+        (pytest.lazy_fixture("session_catalog_hive"), "2"),
+    ],
+)
+def test_update_existing_sort_order_with_unsorted_sort_order(
+    catalog: Catalog, format_version: str, table_schema_simple: Schema
+) -> None:
+    simple_table = _simple_table(catalog, table_schema_simple, format_version)
+    simple_table.update_sort_order().asc("foo", IdentityTransform(), 
NullOrder.NULLS_FIRST).commit()
+    assert simple_table.sort_order() == SortOrder(
+        SortField(source_id=1, transform=IdentityTransform(), 
direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
+        order_id=1,
+    )
+    # Committing an empty update should reset the table to the unsorted sort order
+    simple_table.update_sort_order().commit()
+    # No new sort order is expected: the default should be the unsorted order (id 0)
+    assert len(simple_table.sort_orders()) == 2
+    assert simple_table.sort_order() == SortOrder(order_id=0)

Reply via email to