This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 134e1b69769b858b153780367f2b0be56f7fe8a9
Author: umi <[email protected]>
AuthorDate: Wed Nov 5 16:00:04 2025 +0800

    [Python] Support multiple prepare_commit calls in the same TableWrite (#6526)
---
 paimon-python/pypaimon/snapshot/snapshot.py        |   2 +
 paimon-python/pypaimon/table/file_store_table.py   |   5 +-
 paimon-python/pypaimon/table/table.py              |   6 +-
 .../pypaimon/tests/py36/ao_simple_test.py          |  56 ++++++++
 .../pypaimon/tests/write/table_write_test.py       | 155 +++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_write.py   |   5 +-
 .../{batch_table_commit.py => table_commit.py}     |  19 ++-
 .../write/{batch_table_write.py => table_write.py} |  29 ++--
 .../{batch_write_builder.py => write_builder.py}   |  35 +++--
 9 files changed, 287 insertions(+), 25 deletions(-)
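
In short: prepare_commit can now be called repeatedly on one TableWrite via a
new streaming path, with each call tagged by an increasing commit identifier.
A minimal sketch of the new API, modeled on the tests in this commit ("table"
is an open Paimon table; "batch1" and "batch2" are placeholder pyarrow Tables
matching its schema):

    # Stream builder instead of the one-shot batch builder.
    write_builder = table.new_stream_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    # Each prepare_commit call carries an increasing identifier.
    table_write.write_arrow(batch1)
    table_write.prepare_commit(0)
    table_write.write_arrow(batch2)
    commit_messages = table_write.prepare_commit(1)

    # Commit under the latest identifier, then release resources.
    table_commit.commit(commit_messages, 1)
    table_write.close()
    table_commit.close()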

diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 96b287ab55..f164f5eb61 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -21,6 +21,8 @@ from typing import Dict, Optional
 
 from pypaimon.common.json_util import json_field
 
+COMMIT_IDENTIFIER = 0x7fffffffffffffff
+
 
 @dataclass
 class Snapshot:
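
For reference, 0x7fffffffffffffff is the maximum signed 64-bit value; this
presumably mirrors the Long.MAX_VALUE COMMIT_IDENTIFIER used by the Java
BatchWriteBuilder for one-shot batch commits, and the batch classes below
fall back to it when no explicit identifier is supplied.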
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index cde282eff0..6132c9bd69 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -28,7 +28,7 @@ from pypaimon.schema.schema_manager import SchemaManager
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.table import Table
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
                                               FixedBucketRowKeyExtractor,
                                               PostponeBucketRowKeyExtractor,
@@ -98,6 +98,9 @@ class FileStoreTable(Table):
     def new_batch_write_builder(self) -> BatchWriteBuilder:
         return BatchWriteBuilder(self)
 
+    def new_stream_write_builder(self) -> StreamWriteBuilder:
+        return StreamWriteBuilder(self)
+
     def create_row_key_extractor(self) -> RowKeyExtractor:
         bucket_mode = self.bucket_mode()
         if bucket_mode == BucketMode.HASH_FIXED:
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 3a1fe2e622..e20784f1fc 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -19,7 +19,7 @@
 from abc import ABC, abstractmethod
 
 from pypaimon.read.read_builder import ReadBuilder
-from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
 
 
 class Table(ABC):
@@ -32,3 +32,7 @@ class Table(ABC):
     @abstractmethod
     def new_batch_write_builder(self) -> BatchWriteBuilder:
         """Returns a builder for building batch table write and table 
commit."""
+
+    @abstractmethod
+    def new_stream_write_builder(self) -> StreamWriteBuilder:
+        """Returns a builder for building stream table write and table 
commit."""
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index efb3189e06..b14fbcf8db 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -421,3 +421,59 @@ class AOSimpleTest(RESTBaseTest):
                                          session_token="TOKEN",
                                          region="cn-hangzhou",
                                          endpoint_override="oss-bucket." + 
props[OssOptions.OSS_ENDPOINT])
+
+    def test_multi_prepare_commit_ao(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_parquet')
+        write_builder = table.new_stream_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        # write 1
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(0)
+        # write 2
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(1)
+        # write 3
+        data3 = {
+            'user_id': [9, 10],
+            'item_id': [1009, 1010],
+            'behavior': ['i', 'j'],
+            'dt': ['p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        cm = table_write.prepare_commit(2)
+        # commit
+        table_commit.commit(cm, 2)
+        table_write.close()
+        table_commit.close()
+        self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_sort_by(table_read.to_arrow(splits), 'user_id')
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
+            'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
+        }, schema=self.pa_schema)
+        self.assertEqual(expected, actual)
diff --git a/paimon-python/pypaimon/tests/write/table_write_test.py b/paimon-python/pypaimon/tests/write/table_write_test.py
new file mode 100644
index 0000000000..21b76731ac
--- /dev/null
+++ b/paimon-python/pypaimon/tests/write/table_write_test.py
@@ -0,0 +1,155 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import os
+import shutil
+
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+import pyarrow as pa
+
+
+class TableWriteTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', True)
+
+        cls.pa_schema = pa.schema([
+            ('user_id', pa.int32()),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            ('dt', pa.string())
+        ])
+        cls.expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010],
+            'behavior': ['a', 'b', 'c', None, 'e', 'f', 'g', 'h', 'i', 'j'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p2', 'p2', 'p1']
+        }, schema=cls.pa_schema)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def test_multi_prepare_commit_ao(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_append_only_parquet', schema, False)
+        table = self.catalog.get_table('default.test_append_only_parquet')
+        write_builder = table.new_stream_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        # write 1
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(0)
+        # write 2
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(1)
+        # write 3
+        data3 = {
+            'user_id': [9, 10],
+            'item_id': [1009, 1010],
+            'behavior': ['i', 'j'],
+            'dt': ['p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        cm = table_write.prepare_commit(2)
+        # commit
+        table_commit.commit(cm, 2)
+        table_write.close()
+        table_commit.close()
+        self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
+
+    def test_multi_prepare_commit_pk(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_primary_key_parquet', schema, False)
+        table = self.catalog.get_table('default.test_primary_key_parquet')
+        write_builder = table.new_stream_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        # write 1
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(0)
+        # write 2
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table = pa.Table.from_pydict(data2, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_write.prepare_commit(1)
+        # write 3
+        data3 = {
+            'user_id': [9, 10],
+            'item_id': [1009, 1010],
+            'behavior': ['i', 'j'],
+            'dt': ['p2', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data3, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        cm = table_write.prepare_commit(2)
+        # commit
+        table_commit.commit(cm, 2)
+        table_write.close()
+        table_commit.close()
+        self.assertEqual(2, table_write.file_store_write.commit_identifier)
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 9ebabf1103..c100b64966 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -37,6 +37,7 @@ class FileStoreWrite:
         self.data_writers: Dict[Tuple, DataWriter] = {}
         self.max_seq_numbers: dict = {}
         self.write_cols = None
+        self.commit_identifier = 0
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
@@ -48,6 +49,7 @@ class FileStoreWrite:
     def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
         def max_seq_number():
             return self._seq_number_stats(partition).get(bucket, 1)
+
         # Check if table has blob columns
         if self._has_blob_columns():
             return DataBlobWriter(
@@ -83,7 +85,8 @@ class FileStoreWrite:
                 return True
         return False
 
-    def prepare_commit(self) -> List[CommitMessage]:
+    def prepare_commit(self, commit_identifier) -> List[CommitMessage]:
+        self.commit_identifier = commit_identifier
         commit_messages = []
         for (partition, bucket), writer in self.data_writers.items():
             committed_files = writer.prepare_commit()
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/table_commit.py
similarity index 86%
rename from paimon-python/pypaimon/write/batch_table_commit.py
rename to paimon-python/pypaimon/write/table_commit.py
index 7f42e1cef1..0dcfabf99e 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -16,14 +16,14 @@
 # limitations under the License.
 ################################################################################
 
-import time
 from typing import List, Optional
 
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
-class BatchTableCommit:
+class TableCommit:
     """Python implementation of BatchTableCommit for batch writing 
scenarios."""
 
     def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
@@ -41,15 +41,13 @@ class BatchTableCommit:
         self.file_store_commit = FileStoreCommit(snapshot_commit, table, commit_user)
         self.batch_committed = False
 
-    def commit(self, commit_messages: List[CommitMessage]):
+    def _commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
         self._check_committed()
 
         non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
         if not non_empty_messages:
             return
 
-        commit_identifier = int(time.time() * 1000)
-
         try:
             if self.overwrite_partition is not None:
                 self.file_store_commit.overwrite(
@@ -76,3 +74,14 @@ class BatchTableCommit:
         if self.batch_committed:
             raise RuntimeError("BatchTableCommit only supports one-time 
committing.")
         self.batch_committed = True
+
+
+class BatchTableCommit(TableCommit):
+    def commit(self, commit_messages: List[CommitMessage]):
+        self._commit(commit_messages, COMMIT_IDENTIFIER)
+
+
+class StreamTableCommit(TableCommit):
+
+    def commit(self, commit_messages: List[CommitMessage], commit_identifier: int = COMMIT_IDENTIFIER):
+        self._commit(commit_messages, commit_identifier)
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/table_write.py
similarity index 89%
rename from paimon-python/pypaimon/write/batch_table_write.py
rename to paimon-python/pypaimon/write/table_write.py
index f8d0660bfa..3ccc078223 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -15,18 +15,18 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 from collections import defaultdict
 from typing import List
 
 import pyarrow as pa
 
 from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.snapshot.snapshot import COMMIT_IDENTIFIER
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_write import FileStoreWrite
 
 
-class BatchTableWrite:
+class TableWrite:
     def __init__(self, table):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -34,7 +34,6 @@ class BatchTableWrite:
         self.table_pyarrow_schema = PyarrowFieldParser.from_paimon_schema(self.table.table_schema.fields)
         self.file_store_write = FileStoreWrite(self.table)
         self.row_key_extractor = self.table.create_row_key_extractor()
-        self.batch_committed = False
 
     def write_arrow(self, table: pa.Table):
         batches_iterator = table.to_batches()
@@ -59,12 +58,6 @@ class BatchTableWrite:
         record_batch = pa.RecordBatch.from_pandas(dataframe, schema=pa_schema)
         return self.write_arrow_batch(record_batch)
 
-    def prepare_commit(self) -> List[CommitMessage]:
-        if self.batch_committed:
-            raise RuntimeError("BatchTableWrite only supports one-time 
committing.")
-        self.batch_committed = True
-        return self.file_store_write.prepare_commit()
-
     def with_write_type(self, write_cols: List[str]):
         for col in write_cols:
             if col not in self.table_pyarrow_schema.names:
@@ -83,3 +76,21 @@ class BatchTableWrite:
                              f"Input schema is: {data_schema} "
                              f"Table schema is: {self.table_pyarrow_schema} "
                              f"Write cols is: 
{self.file_store_write.write_cols}")
+
+
+class BatchTableWrite(TableWrite):
+    def __init__(self, table):
+        super().__init__(table)
+        self.batch_committed = False
+
+    def prepare_commit(self) -> List[CommitMessage]:
+        if self.batch_committed:
+            raise RuntimeError("BatchTableWrite only supports one-time 
committing.")
+        self.batch_committed = True
+        return self.file_store_write.prepare_commit(COMMIT_IDENTIFIER)
+
+
+class StreamTableWrite(TableWrite):
+
+    def prepare_commit(self, commit_identifier: int = COMMIT_IDENTIFIER) -> List[CommitMessage]:
+        return self.file_store_write.prepare_commit(commit_identifier)
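
With this split, the one-shot contract lives only in the batch subclass. A
hedged illustration of the behavioral difference (again assuming "table" is
an open Paimon table):

    # BatchTableWrite keeps the old one-time contract and implicitly
    # commits under COMMIT_IDENTIFIER:
    bw = table.new_batch_write_builder().new_write()
    bw.prepare_commit()       # first call is fine
    # a second bw.prepare_commit() raises RuntimeError

    # StreamTableWrite accepts repeated calls with explicit identifiers:
    sw = table.new_stream_write_builder().new_write()
    sw.prepare_commit(0)
    sw.prepare_commit(1)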
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/write_builder.py
similarity index 73%
rename from paimon-python/pypaimon/write/batch_write_builder.py
rename to paimon-python/pypaimon/write/write_builder.py
index 2380530fbc..8c9ed725f5 100644
--- a/paimon-python/pypaimon/write/batch_write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -17,14 +17,15 @@
 ################################################################################
 
 import uuid
+from abc import ABC
 from typing import Optional
 
 from pypaimon.common.core_options import CoreOptions
-from pypaimon.write.batch_table_commit import BatchTableCommit
-from pypaimon.write.batch_table_write import BatchTableWrite
+from pypaimon.write.table_commit import BatchTableCommit, StreamTableCommit, TableCommit
+from pypaimon.write.table_write import BatchTableWrite, StreamTableWrite, TableWrite
 
 
-class BatchWriteBuilder:
+class WriteBuilder(ABC):
     def __init__(self, table):
         from pypaimon.table.file_store_table import FileStoreTable
 
@@ -36,15 +37,33 @@ class BatchWriteBuilder:
         self.static_partition = static_partition if static_partition is not None else {}
         return self
 
-    def new_write(self) -> BatchTableWrite:
-        return BatchTableWrite(self.table)
+    def new_write(self) -> TableWrite:
+        """Returns a table write."""
 
-    def new_commit(self) -> BatchTableCommit:
-        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
-        return commit
+    def new_commit(self) -> TableCommit:
+        """Returns a table commit."""
 
     def _create_commit_user(self):
         if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
             return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
         else:
             return str(uuid.uuid4())
+
+
+class BatchWriteBuilder(WriteBuilder):
+
+    def new_write(self) -> BatchTableWrite:
+        return BatchTableWrite(self.table)
+
+    def new_commit(self) -> BatchTableCommit:
+        commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
+
+
+class StreamWriteBuilder(WriteBuilder):
+    def new_write(self) -> StreamTableWrite:
+        return StreamTableWrite(self.table)
+
+    def new_commit(self) -> StreamTableCommit:
+        commit = StreamTableCommit(self.table, self.commit_user, self.static_partition)
+        return commit
