This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f221bf4a90 [python] Do not abort on commit failure (#7232)
f221bf4a90 is described below
commit f221bf4a90f2fa0e575b06e590bcda82ad6b6da2
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Feb 8 19:52:46 2026 +0800
[python] Do not abort on commit failure (#7232)
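TableCommit._commit called abort() on any commit failure, which could
delete committed data when the server had already committed but the
client received an error (e.g. timeout, close failure).
This change removes the abort-on-failure handling from TableCommit and
PaimonDatasink, so a failed commit now propagates the original exception
without cleaning up data that may already be committed.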
---
paimon-python/pypaimon/tests/ray_sink_test.py | 6 +--
.../rest/rest_catalog_commit_snapshot_test.py | 46 ++++++++++++++++++++++
paimon-python/pypaimon/write/ray_datasink.py | 19 +--------
paimon-python/pypaimon/write/table_commit.py | 26 ++++++------
4 files changed, 61 insertions(+), 36 deletions(-)
diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py
index bc1ff3ef9f..8bd830ff9e 100644
--- a/paimon-python/pypaimon/tests/ray_sink_test.py
+++ b/paimon-python/pypaimon/tests/ray_sink_test.py
@@ -250,7 +250,7 @@ class RaySinkTest(unittest.TestCase):
self.assertEqual(len(commit_args), 2) # Empty message filtered out
mock_commit.close.assert_called_once()
- # Test commit failure: abort should be called
+ # Test commit failure: abort should not be called
datasink = PaimonDatasink(self.table, overwrite=False)
datasink.on_write_start()
commit_msg1 = Mock(spec=CommitMessage)
@@ -271,9 +271,7 @@ class RaySinkTest(unittest.TestCase):
with self.assertRaises(Exception):
datasink.on_write_complete(write_result)
- mock_commit.abort.assert_called_once()
- abort_args = mock_commit.abort.call_args[0][0]
- self.assertEqual(len(abort_args), 2)
+ mock_commit.abort.assert_not_called()
mock_commit.close.assert_called_once()
# Test table_commit creation failure
diff --git a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
index e9f95d3d0b..998b169084 100644
--- a/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_catalog_commit_snapshot_test.py
@@ -22,6 +22,9 @@ import time
import unittest
from unittest.mock import Mock, patch
+import pyarrow as pa
+
+from pypaimon import Schema
from pypaimon.api.api_response import CommitTableResponse
from pypaimon.common.options import Options
from pypaimon.api.rest_exception import NoSuchResourceException
@@ -31,6 +34,7 @@ from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.common.identifier import Identifier
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.tests.rest.rest_base_test import RESTBaseTest
class TestRESTCatalogCommitSnapshot(unittest.TestCase):
@@ -292,5 +296,47 @@ class TestRESTCatalogCommitSnapshot(unittest.TestCase):
shutil.rmtree(temp_dir, ignore_errors=True)
+class TestRESTCommit(RESTBaseTest):
+
+ def test_commit_succeeded_on_server_but_client_fails(self):
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
+ opts = {
+ 'bucket': '1',
+ 'file.format': 'parquet',
+ 'commit.max-retries': '0',
+ 'commit.timeout': '1000',
+ }
+ schema = Schema.from_pyarrow_schema(
+ pa_schema, partition_keys=['id'], options=opts)
+ self.rest_catalog.create_table('default.test_abort_bug', schema, False)
+ table = self.rest_catalog.get_table('default.test_abort_bug')
+
+ tw = table.new_batch_write_builder().new_write()
+ tc = table.new_batch_write_builder().new_commit()
+ data = pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c']}, schema=pa_schema)
+ tw.write_arrow(data)
+ cm = tw.prepare_commit()
+
+ real_commit = tc.file_store_commit.snapshot_commit.commit
+
+ def commit_then_raise(sn, br, st):
+ real_commit(sn, br, st)
+ raise RuntimeError("simulated")
+
+ with patch.object(tc.file_store_commit.snapshot_commit, 'commit', side_effect=commit_then_raise):
+ with self.assertRaises(RuntimeError):
+ tc.commit(cm)
+ tw.close()
+ tc.close()
+
+ # We no longer abort on failure. Data was committed on server.
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c'])
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py
index 7daba68115..e289712516 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -110,7 +110,6 @@ class PaimonDatasink(_DatasinkBase):
self, write_result: Any
):
table_commit = None
- commit_messages_to_abort = []
try:
# WriteResult.write_returns (Ray 2.44+); older Ray may pass list of returns
if hasattr(write_result, "write_returns"):
@@ -146,10 +145,8 @@ class PaimonDatasink(_DatasinkBase):
)
table_commit = self._writer_builder.new_commit()
- commit_messages_to_abort = non_empty_messages
table_commit.commit(non_empty_messages)
- commit_messages_to_abort = []
self._pending_commit_messages = []
logger.info(f"Successfully committed write job for table {self._table_name}")
@@ -158,20 +155,8 @@ class PaimonDatasink(_DatasinkBase):
f"Error committing write job for table {self._table_name}: {e}",
exc_info=e
)
- if table_commit is not None and commit_messages_to_abort:
- try:
- table_commit.abort(commit_messages_to_abort)
- logger.info(
- f"Aborted {len(commit_messages_to_abort)} commit messages "
- f"for table {self._table_name}"
- )
- except Exception as abort_error:
- logger.error(
- f"Error aborting commit messages: {abort_error}",
- exc_info=abort_error
- )
- finally:
- self._pending_commit_messages = []
+ if table_commit is not None:
+ self._pending_commit_messages = []
raise
finally:
if table_commit is not None:
diff --git a/paimon-python/pypaimon/write/table_commit.py b/paimon-python/pypaimon/write/table_commit.py
index 27b78d5dcb..e2b75881ea 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -48,21 +48,17 @@ class TableCommit:
if not non_empty_messages:
return
- try:
- if self.overwrite_partition is not None:
- self.file_store_commit.overwrite(
- overwrite_partition=self.overwrite_partition,
- commit_messages=non_empty_messages,
- commit_identifier=commit_identifier
- )
- else:
- self.file_store_commit.commit(
- commit_messages=non_empty_messages,
- commit_identifier=commit_identifier
- )
- except Exception as e:
- self.file_store_commit.abort(commit_messages)
- raise RuntimeError(f"Failed to commit: {str(e)}") from e
+ if self.overwrite_partition is not None:
+ self.file_store_commit.overwrite(
+ overwrite_partition=self.overwrite_partition,
+ commit_messages=non_empty_messages,
+ commit_identifier=commit_identifier
+ )
+ else:
+ self.file_store_commit.commit(
+ commit_messages=non_empty_messages,
+ commit_identifier=commit_identifier
+ )
def abort(self, commit_messages: List[CommitMessage]):
self.file_store_commit.abort(commit_messages)