This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f221bf4a90 [python] Do not abort on commit failure (#7232)
f221bf4a90 is described below

commit f221bf4a90f2fa0e575b06e590bcda82ad6b6da2
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Feb 8 19:52:46 2026 +0800

    [python] Do not abort on commit failure (#7232)
    
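    Previously, TableCommit._commit called abort() on any commit failure.
    This could delete data that was already committed: the server may have
    completed the commit while the client still received an error (e.g. a
    timeout or a failure on close). The commit path no longer calls abort()
    on failure; the original exception propagates to the caller.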
---
 paimon-python/pypaimon/tests/ray_sink_test.py      |  6 +--
 .../rest/rest_catalog_commit_snapshot_test.py      | 46 ++++++++++++++++++++++
 paimon-python/pypaimon/write/ray_datasink.py       | 19 +--------
 paimon-python/pypaimon/write/table_commit.py       | 26 ++++++------
 4 files changed, 61 insertions(+), 36 deletions(-)

diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py
index bc1ff3ef9f..8bd830ff9e 100644
--- a/paimon-python/pypaimon/tests/ray_sink_test.py
+++ b/paimon-python/pypaimon/tests/ray_sink_test.py
@@ -250,7 +250,7 @@ class RaySinkTest(unittest.TestCase):
         self.assertEqual(len(commit_args), 2)  # Empty message filtered out
         mock_commit.close.assert_called_once()
 
-        # Test commit failure: abort should be called
+        # Test commit failure: abort should not be called
         datasink = PaimonDatasink(self.table, overwrite=False)
         datasink.on_write_start()
         commit_msg1 = Mock(spec=CommitMessage)
@@ -271,9 +271,7 @@ class RaySinkTest(unittest.TestCase):
         with self.assertRaises(Exception):
             datasink.on_write_complete(write_result)
 
-        mock_commit.abort.assert_called_once()
-        abort_args = mock_commit.abort.call_args[0][0]
-        self.assertEqual(len(abort_args), 2)
+        mock_commit.abort.assert_not_called()
         mock_commit.close.assert_called_once()
 
         # Test table_commit creation failure
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index e9f95d3d0b..998b169084 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -22,6 +22,9 @@ import time
 import unittest
 from unittest.mock import Mock, patch
 
+import pyarrow as pa
+
+from pypaimon import Schema
 from pypaimon.api.api_response import CommitTableResponse
 from pypaimon.common.options import Options
 from pypaimon.api.rest_exception import NoSuchResourceException
@@ -31,6 +34,7 @@ from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.common.identifier import Identifier
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
 
 
 class TestRESTCatalogCommitSnapshot(unittest.TestCase):
@@ -292,5 +296,47 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
             shutil.rmtree(temp_dir, ignore_errors=True)
 
 
+class TestRESTCommit(RESTBaseTest):
+
+    def test_commit_succeeded_on_server_but_client_fails(self):
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+        opts = {
+            'bucket': '1',
+            'file.format': 'parquet',
+            'commit.max-retries': '0',
+            'commit.timeout': '1000',
+        }
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, partition_keys=['id'], options=opts)
+        self.rest_catalog.create_table('default.test_abort_bug', schema, False)
+        table = self.rest_catalog.get_table('default.test_abort_bug')
+
+        tw = table.new_batch_write_builder().new_write()
+        tc = table.new_batch_write_builder().new_commit()
+        data = pa.Table.from_pydict(
+            {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, schema=pa_schema)
+        tw.write_arrow(data)
+        cm = tw.prepare_commit()
+
+        real_commit = tc.file_store_commit.snapshot_commit.commit
+
+        def commit_then_raise(sn, br, st):
+            real_commit(sn, br, st)
+            raise RuntimeError("simulated")
+
+        with patch.object(tc.file_store_commit.snapshot_commit, 'commit', side_effect=commit_then_raise):
+            with self.assertRaises(RuntimeError):
+                tc.commit(cm)
+        tw.close()
+        tc.close()
+
+        # We no longer abort on failure. Data was committed on server.
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c'])
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py
index 7daba68115..e289712516 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -110,7 +110,6 @@ class PaimonDatasink(_DatasinkBase):
         self, write_result: Any
     ):
         table_commit = None
-        commit_messages_to_abort = []
         try:
             # WriteResult.write_returns (Ray 2.44+); older Ray may pass list of returns
             if hasattr(write_result, "write_returns"):
@@ -146,10 +145,8 @@ class PaimonDatasink(_DatasinkBase):
             )
 
             table_commit = self._writer_builder.new_commit()
-            commit_messages_to_abort = non_empty_messages
             table_commit.commit(non_empty_messages)
 
-            commit_messages_to_abort = []
             self._pending_commit_messages = []
 
             logger.info(f"Successfully committed write job for table {self._table_name}")
@@ -158,20 +155,8 @@ class PaimonDatasink(_DatasinkBase):
                 f"Error committing write job for table {self._table_name}: {e}",
                 exc_info=e
             )
-            if table_commit is not None and commit_messages_to_abort:
-                try:
-                    table_commit.abort(commit_messages_to_abort)
-                    logger.info(
-                        f"Aborted {len(commit_messages_to_abort)} commit messages "
-                        f"for table {self._table_name}"
-                    )
-                except Exception as abort_error:
-                    logger.error(
-                        f"Error aborting commit messages: {abort_error}",
-                        exc_info=abort_error
-                    )
-                finally:
-                    self._pending_commit_messages = []
+            if table_commit is not None:
+                self._pending_commit_messages = []
             raise
         finally:
             if table_commit is not None:
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 27b78d5dcb..e2b75881ea 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -48,21 +48,17 @@ class TableCommit:
         if not non_empty_messages:
             return
 
-        try:
-            if self.overwrite_partition is not None:
-                self.file_store_commit.overwrite(
-                    overwrite_partition=self.overwrite_partition,
-                    commit_messages=non_empty_messages,
-                    commit_identifier=commit_identifier
-                )
-            else:
-                self.file_store_commit.commit(
-                    commit_messages=non_empty_messages,
-                    commit_identifier=commit_identifier
-                )
-        except Exception as e:
-            self.file_store_commit.abort(commit_messages)
-            raise RuntimeError(f"Failed to commit: {str(e)}") from e
+        if self.overwrite_partition is not None:
+            self.file_store_commit.overwrite(
+                overwrite_partition=self.overwrite_partition,
+                commit_messages=non_empty_messages,
+                commit_identifier=commit_identifier
+            )
+        else:
+            self.file_store_commit.commit(
+                commit_messages=non_empty_messages,
+                commit_identifier=commit_identifier
+            )
 
     def abort(self, commit_messages: List[CommitMessage]):
         self.file_store_commit.abort(commit_messages)

Reply via email to