This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b21c249b9 [python] Fix multiple write before once commit (#6241)
5b21c249b9 is described below

commit 5b21c249b9a0bee28bd94b486f1ab19b4b5363ad
Author: umi <[email protected]>
AuthorDate: Thu Sep 11 22:25:55 2025 +0800

    [python] Fix multiple write before once commit (#6241)
---
 paimon-python/pypaimon/common/file_io.py           | 14 +++----
 .../pypaimon/tests/py36/ao_read_write_test.py      | 36 ++++++++++++++++++
 .../pypaimon/tests/reader_append_only_test.py      | 34 +++++++++++++++++
 .../pypaimon/tests/reader_primary_key_test.py      | 44 ++++++++++++++++++++++
 .../write/writer/append_only_data_writer.py        |  6 +--
 paimon-python/pypaimon/write/writer/data_writer.py |  8 ++--
 .../pypaimon/write/writer/key_value_data_writer.py |  6 +--
 7 files changed, 130 insertions(+), 18 deletions(-)
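
Before this change the writers buffered pending data as a single pyarrow.RecordBatch, so a second write_arrow() call before commit could fail when the writer tried to merge the buffered data. The buffer is now a pyarrow.Table merged via pa.concat_tables, and the FileIO write helpers take a Table directly. A minimal sketch of the usage this enables, assuming 'catalog' is an already-initialized pypaimon catalog holding an append-only table 'default.demo' with hypothetical columns user_id/item_id/behavior/dt:

    import pyarrow as pa

    # 'catalog' is assumed to be an already-initialized pypaimon catalog.
    table = catalog.get_table('default.demo')
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    # Two separate writes buffered by the same writer, then a single commit.
    table_write.write_arrow(pa.Table.from_pydict(
        {'user_id': [1], 'item_id': [1001], 'behavior': ['a'], 'dt': ['p1']}))
    table_write.write_arrow(pa.Table.from_pydict(
        {'user_id': [2], 'item_id': [1002], 'behavior': ['b'], 'dt': ['p1']}))

    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()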

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index c98322543e..3ec930b24b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -283,32 +283,30 @@ class FileIO:
 
         return None
 
-    def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+    def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
-            table = pyarrow.Table.from_batches([data])
 
             with self.new_output_stream(path) as output_stream:
-                pq.write_table(table, output_stream, compression=compression, **kwargs)
+                pq.write_table(data, output_stream, compression=compression, **kwargs)
 
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") 
from e
 
-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
         try:
             """Write ORC file using PyArrow ORC writer."""
             import sys
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
 
             with self.new_output_stream(path) as output_stream:
                 # Check Python version - if 3.6, don't use compression parameter
                 if sys.version_info[:2] == (3, 6):
-                    orc.write_table(table, output_stream, **kwargs)
+                    orc.write_table(data, output_stream, **kwargs)
                 else:
                     orc.write_table(
-                        table,
+                        data,
                         output_stream,
                         compression=compression,
                         **kwargs
@@ -318,7 +316,7 @@ class FileIO:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
 
-    def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+    def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
         if avro_schema is None:
             from pypaimon.schema.data_types import PyarrowFieldParser
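
write_parquet, write_orc and write_avro now receive a pyarrow.Table and no longer convert a RecordBatch themselves. A standalone sketch of the Parquet path, assuming a plain local file in place of FileIO.new_output_stream and the default 'snappy' compression:

    import pyarrow as pa
    import pyarrow.parquet as pq

    data = pa.table({'user_id': [1, 2], 'dt': ['p1', 'p2']})  # hypothetical columns
    with open('/tmp/data-example.parquet', 'wb') as output_stream:
        pq.write_table(data, output_stream, compression='snappy')
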
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 9873d6bf3f..ca75d352a1 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -271,6 +271,42 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+            'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+            'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 17acb9a183..ca22c79fbe 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -79,6 +79,40 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_append_only_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+        table = self.catalog.get_table('default.test_append_only_multi_once_commit')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 8b9b853350..73cca1a788 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -104,6 +104,50 @@ class PkReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_pk_multi_write_once_commit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_pk_multi', schema, False)
+        table = self.catalog.get_table('default.test_pk_multi')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 2, 7, 8],
+            'item_id': [1005, 1002, 1007, 1008],
+            'behavior': ['e', 'b-new', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p1', 'p2']
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        # TODO support pk merge feature when multiple write
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
+            'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
+            'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
+            'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+        }, schema=self.pa_schema)
+        self.assertEqual(actual, expected)
+
     def testPkReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema,
                                             partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/write/writer/append_only_data_writer.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
index c9d4c8f864..3bd128d7b4 100644
--- a/paimon-python/pypaimon/write/writer/append_only_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -24,8 +24,8 @@ from pypaimon.write.writer.data_writer import DataWriter
 class AppendOnlyDataWriter(DataWriter):
     """Data writer for append-only tables."""
 
-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        return data
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+        return pa.Table.from_batches([data])
 
-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         return pa.concat_tables([existing_data, new_data])
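
pa.concat_tables only accepts pyarrow.Table objects, which is presumably why _process_data now wraps each incoming RecordBatch with Table.from_batches. A minimal sketch of the merge path, assuming two batches with the same one-column schema:

    import pyarrow as pa

    # _process_data: each incoming RecordBatch becomes a single-batch Table.
    existing_data = pa.Table.from_batches([pa.record_batch({'user_id': [1, 2]})])
    new_data = pa.Table.from_batches([pa.record_batch({'user_id': [3, 4]})])

    # _merge_data: Tables with the same schema concatenate into one Table.
    merged = pa.concat_tables([existing_data, new_data])
    assert merged.num_rows == 4
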
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 787dc70624..e2f778b586 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -53,7 +53,7 @@ class DataWriter(ABC):
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
         self.sequence_generator = SequenceGenerator(max_seq_number)
 
-        self.pending_data: Optional[pa.RecordBatch] = None
+        self.pending_data: Optional[pa.Table] = None
         self.committed_files: List[DataFileMeta] = []
 
     def write(self, data: pa.RecordBatch):
@@ -100,7 +100,7 @@ class DataWriter(ABC):
                 self.pending_data = remaining_data
                 self._check_and_roll_if_needed()
 
-    def _write_data_to_file(self, data: pa.RecordBatch):
+    def _write_data_to_file(self, data: pa.Table):
         if data.num_rows == 0:
             return
         file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
@@ -115,8 +115,8 @@ class DataWriter(ABC):
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
         # min key & max key
-        table = pa.Table.from_batches([data])
-        selected_table = table.select(self.trimmed_primary_key)
+
+        selected_table = data.select(self.trimmed_primary_key)
         key_columns_batch = selected_table.to_batches()[0]
         min_key_row_batch = key_columns_batch.slice(0, 1)
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
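
Since the buffered data is already a Table, the min/max key rows are now selected from it directly, with no intermediate from_batches call. A small sketch of that extraction, assuming a single-chunk Table keyed on a hypothetical 'user_id' column:

    import pyarrow as pa

    data = pa.table({'user_id': [1, 2, 3], 'behavior': ['a', 'b', 'c']})
    selected_table = data.select(['user_id'])           # trimmed primary key columns
    key_columns_batch = selected_table.to_batches()[0]  # single chunk holds all rows here
    min_key_row_batch = key_columns_batch.slice(0, 1)   # first row -> min key
    max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
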
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 99b11a9788..fb929710e8 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -25,11 +25,11 @@ from pypaimon.write.writer.data_writer import DataWriter
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
-    def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+    def _process_data(self, data: pa.RecordBatch) -> pa.Table:
         enhanced_data = self._add_system_fields(data)
-        return self._sort_by_primary_key(enhanced_data)
+        return pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])
 
-    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+    def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
         combined = pa.concat_tables([existing_data, new_data])
         return self._sort_by_primary_key(combined)
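
The key-value writer keeps its contract of sorting by primary key both when a batch first arrives and again after merging buffered data. An illustration of that merge-then-resort idea, with Table.sort_by standing in for the internal _sort_by_primary_key helper (an assumption, not the actual implementation):

    import pyarrow as pa

    existing_data = pa.table({'user_id': [1, 3], 'behavior': ['a', 'c']})
    new_data = pa.table({'user_id': [2], 'behavior': ['b']})

    # Merge the buffered Table with the new one, then restore key order.
    combined = pa.concat_tables([existing_data, new_data]).sort_by('user_id')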
 
