This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5b21c249b9 [python] Fix multiple writes before one commit (#6241)
5b21c249b9 is described below
commit 5b21c249b9a0bee28bd94b486f1ab19b4b5363ad
Author: umi <[email protected]>
AuthorDate: Thu Sep 11 22:25:55 2025 +0800
[python] Fix multiple writes before one commit (#6241)
---
paimon-python/pypaimon/common/file_io.py | 14 +++----
.../pypaimon/tests/py36/ao_read_write_test.py | 36 ++++++++++++++++++
.../pypaimon/tests/reader_append_only_test.py | 34 +++++++++++++++++
.../pypaimon/tests/reader_primary_key_test.py | 44 ++++++++++++++++++++++
.../write/writer/append_only_data_writer.py | 6 +--
paimon-python/pypaimon/write/writer/data_writer.py | 8 ++--
.../pypaimon/write/writer/key_value_data_writer.py | 6 +--
7 files changed, 130 insertions(+), 18 deletions(-)
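For context, a minimal sketch of the usage pattern this commit fixes, adapted from the tests added below; the catalog handle and the table name 'default.demo' are illustrative, not part of the patch:

    import pyarrow as pa

    # Assumes an existing catalog and a table created as in the tests below.
    table = catalog.get_table('default.demo')
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()

    # Two writes buffered by the same writer ...
    table_write.write_arrow(pa.table({
        'user_id': [1, 2], 'item_id': [1001, 1002], 'behavior': ['a', 'b'], 'dt': ['p1', 'p1']}))
    table_write.write_arrow(pa.table({
        'user_id': [3, 4], 'item_id': [1003, 1004], 'behavior': ['c', 'd'], 'dt': ['p2', 'p2']}))

    # ... followed by a single commit of everything prepared so far.
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()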
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index c98322543e..3ec930b24b 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -283,32 +283,30 @@ class FileIO:
return None
- def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+ def write_parquet(self, path: Path, data: pyarrow.Table, compression: str = 'snappy', **kwargs):
try:
import pyarrow.parquet as pq
- table = pyarrow.Table.from_batches([data])
with self.new_output_stream(path) as output_stream:
- pq.write_table(table, output_stream, compression=compression, **kwargs)
+ pq.write_table(data, output_stream, compression=compression, **kwargs)
except Exception as e:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write Parquet file {path}: {e}")
from e
- def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+ def write_orc(self, path: Path, data: pyarrow.Table, compression: str = 'zstd', **kwargs):
try:
"""Write ORC file using PyArrow ORC writer."""
import sys
import pyarrow.orc as orc
- table = pyarrow.Table.from_batches([data])
with self.new_output_stream(path) as output_stream:
# Check Python version - if 3.6, don't use compression parameter
if sys.version_info[:2] == (3, 6):
- orc.write_table(table, output_stream, **kwargs)
+ orc.write_table(data, output_stream, **kwargs)
else:
orc.write_table(
- table,
+ data,
output_stream,
compression=compression,
**kwargs
@@ -318,7 +316,7 @@ class FileIO:
self.delete_quietly(path)
raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
- def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
+ def write_avro(self, path: Path, data: pyarrow.Table, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
import fastavro
if avro_schema is None:
from pypaimon.schema.data_types import PyarrowFieldParser
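The FileIO write helpers above now accept a pyarrow.Table instead of a single RecordBatch. As a rough illustration (plain PyArrow only, not Paimon code), pending record batches can be merged into one Table of the shape these methods now receive:

    import pyarrow as pa

    batch1 = pa.RecordBatch.from_pydict({'user_id': [1, 2], 'dt': ['p1', 'p1']})
    batch2 = pa.RecordBatch.from_pydict({'user_id': [3, 4], 'dt': ['p2', 'p2']})

    # Concatenate batch-backed tables into a single Table, mirroring what the
    # writers below do before calling write_parquet/write_orc/write_avro.
    merged = pa.concat_tables([pa.Table.from_batches([batch1]),
                               pa.Table.from_batches([batch2])])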
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 9873d6bf3f..ca75d352a1 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -271,6 +271,42 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
self.assertEqual(actual, self.expected)
+ def test_append_only_multi_write_once_commit(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_multi_once_commit')
+ write_builder = table.new_batch_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ 'long-dt': ['2024-10-10', '2024-10-10', '2024-10-10', '2024-01-01'],
+ }
+ pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ 'long-dt': ['2024-10-10', '2025-01-23', 'abcdefghijklmnopk', '2025-08-08'],
+ }
+ pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+ table_write.write_arrow(pa_table1)
+ table_write.write_arrow(pa_table2)
+
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ self.assertEqual(actual, self.expected)
+
def testAppendOnlyReaderWithFilter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 17acb9a183..ca22c79fbe 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -79,6 +79,40 @@ class AoReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_append_only_multi_write_once_commit(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_append_only_multi_once_commit', schema, False)
+ table = self.catalog.get_table('default.test_append_only_multi_once_commit')
+ write_builder = table.new_batch_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ }
+ pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+ table_write.write_arrow(pa_table1)
+ table_write.write_arrow(pa_table2)
+
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ self.assertEqual(actual, self.expected)
+
def testAppendOnlyReaderWithFilter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 8b9b853350..73cca1a788 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -104,6 +104,50 @@ class PkReaderTest(unittest.TestCase):
actual = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, self.expected)
+ def test_pk_multi_write_once_commit(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '2'})
+ self.catalog.create_table('default.test_pk_multi', schema, False)
+ table = self.catalog.get_table('default.test_pk_multi')
+ write_builder = table.new_batch_write_builder()
+
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'behavior': ['a', 'b', 'c', None],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+ data2 = {
+ 'user_id': [5, 2, 7, 8],
+ 'item_id': [1005, 1002, 1007, 1008],
+ 'behavior': ['e', 'b-new', 'g', 'h'],
+ 'dt': ['p2', 'p1', 'p1', 'p2']
+ }
+ pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+ table_write.write_arrow(pa_table1)
+ table_write.write_arrow(pa_table2)
+
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ actual = self._read_test_table(read_builder).sort_by('user_id')
+ # TODO: support pk merge for multiple writes before one commit
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 2, 3, 4, 5, 7, 8],
+ 'item_id': [1001, 1002, 1002, 1003, 1004, 1005, 1007, 1008],
+ 'behavior': ['a', 'b', 'b-new', 'c', None, 'e', 'g', 'h'],
+ 'dt': ['p1', 'p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2'],
+ }, schema=self.pa_schema)
+ self.assertEqual(actual, expected)
+
def testPkReaderWithFilter(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
diff --git a/paimon-python/pypaimon/write/writer/append_only_data_writer.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
index c9d4c8f864..3bd128d7b4 100644
--- a/paimon-python/pypaimon/write/writer/append_only_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -24,8 +24,8 @@ from pypaimon.write.writer.data_writer import DataWriter
class AppendOnlyDataWriter(DataWriter):
"""Data writer for append-only tables."""
- def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
- return data
+ def _process_data(self, data: pa.RecordBatch) -> pa.Table:
+ return pa.Table.from_batches([data])
- def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
return pa.concat_tables([existing_data, new_data])
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 787dc70624..e2f778b586 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -53,7 +53,7 @@ class DataWriter(ABC):
self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
self.sequence_generator = SequenceGenerator(max_seq_number)
- self.pending_data: Optional[pa.RecordBatch] = None
+ self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
def write(self, data: pa.RecordBatch):
@@ -100,7 +100,7 @@ class DataWriter(ABC):
self.pending_data = remaining_data
self._check_and_roll_if_needed()
- def _write_data_to_file(self, data: pa.RecordBatch):
+ def _write_data_to_file(self, data: pa.Table):
if data.num_rows == 0:
return
file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
@@ -115,8 +115,8 @@ class DataWriter(ABC):
raise ValueError(f"Unsupported file format: {self.file_format}")
# min key & max key
- table = pa.Table.from_batches([data])
- selected_table = table.select(self.trimmed_primary_key)
+
+ selected_table = data.select(self.trimmed_primary_key)
key_columns_batch = selected_table.to_batches()[0]
min_key_row_batch = key_columns_batch.slice(0, 1)
max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 99b11a9788..fb929710e8 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -25,11 +25,11 @@ from pypaimon.write.writer.data_writer import DataWriter
class KeyValueDataWriter(DataWriter):
"""Data writer for primary key tables with system fields and sorting."""
- def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ def _process_data(self, data: pa.RecordBatch) -> pa.Table:
enhanced_data = self._add_system_fields(data)
- return self._sort_by_primary_key(enhanced_data)
+ return pa.Table.from_batches([self._sort_by_primary_key(enhanced_data)])
- def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
combined = pa.concat_tables([existing_data, new_data])
return self._sort_by_primary_key(combined)
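Taken together, the writer changes keep pending data as one growing pyarrow.Table: _process_data() turns each incoming RecordBatch into a Table, and _merge_data() concatenates it onto whatever is already buffered. A simplified sketch of that accumulation step (not the actual DataWriter.write implementation, which also handles rolling files by size):

    def write(self, data):
        # Each incoming pa.RecordBatch becomes a pa.Table ...
        processed = self._process_data(data)
        # ... and is appended to the buffered Table, if one exists.
        if self.pending_data is None:
            self.pending_data = processed
        else:
            self.pending_data = self._merge_data(self.pending_data, processed)
        self._check_and_roll_if_needed()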