This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 40bc087e84 [Python] Support data writer for PyPaimon (#5997)
40bc087e84 is described below
commit 40bc087e8491a21158601d736f84bb5ef9f4cec6
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jul 31 15:25:28 2025 +0800
[Python] Support data writer for PyPaimon (#5997)
---
paimon-python/pypaimon/common/file_io.py | 4 +
.../{common/file_io.py => write/__init__.py} | 36 -----
.../file_io.py => write/writer/__init__.py} | 36 -----
.../writer/append_only_data_writer.py} | 38 +----
paimon-python/pypaimon/write/writer/data_writer.py | 161 +++++++++++++++++++++
.../pypaimon/write/writer/key_value_data_writer.py | 80 ++++++++++
6 files changed, 253 insertions(+), 102 deletions(-)
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f2539ca2c0..b58a936af4 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -51,3 +51,7 @@ class FileIO(ABC):
@abstractmethod
def new_input_stream(self, path: Path):
""""""
+
+ @abstractmethod
+ def get_file_size(self, path: Path):
+ """"""
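
The new get_file_size hook is what the data writer uses to fill in DataFileMeta.file_size after a file is flushed. A minimal sketch of the expected contract for a local filesystem (illustrative only, not part of this commit):

# Hypothetical local-filesystem behaviour for the new hook: return the
# byte size of the file that was just written.
import os
from pathlib import Path


def get_file_size(path: Path) -> int:
    return os.path.getsize(path)
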
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/__init__.py
@@ -15,39 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
-
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def write_file(self, path: Path, content: str, overwrite: bool = False):
- """"""
-
- @abstractmethod
- def delete_quietly(self, path: Path):
- """"""
-
- @abstractmethod
- def new_input_stream(self, path: Path):
- """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/__init__.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/__init__.py
index f2539ca2c0..65b48d4d79 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/__init__.py
@@ -15,39 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
-
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
-
- @abstractmethod
- def list_status(self, path: Path):
- """"""
-
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def write_file(self, path: Path, content: str, overwrite: bool = False):
- """"""
-
- @abstractmethod
- def delete_quietly(self, path: Path):
- """"""
-
- @abstractmethod
- def new_input_stream(self, path: Path):
- """"""
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
similarity index 55%
copy from paimon-python/pypaimon/common/file_io.py
copy to paimon-python/pypaimon/write/writer/append_only_data_writer.py
index f2539ca2c0..c9d4c8f864 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/write/writer/append_only_data_writer.py
@@ -15,39 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
-from pathlib import Path
+import pyarrow as pa
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+from pypaimon.write.writer.data_writer import DataWriter
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+class AppendOnlyDataWriter(DataWriter):
+ """Data writer for append-only tables."""
- @abstractmethod
- def list_status(self, path: Path):
- """"""
+ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ return data
- @abstractmethod
- def mkdirs(self, path: Path) -> bool:
- """"""
-
- @abstractmethod
- def write_file(self, path: Path, content: str, overwrite: bool = False):
- """"""
-
- @abstractmethod
- def delete_quietly(self, path: Path):
- """"""
-
- @abstractmethod
- def new_input_stream(self, path: Path):
- """"""
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+ return pa.concat_tables([existing_data, new_data])
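
The append-only writer leaves incoming batches untouched and merges by concatenation. A standalone PyArrow sketch of that merge on made-up rows (pa.concat_tables operates on pa.Table values, so the batches are wrapped first):

# Standalone sketch of the append-only merge; schema and rows are made up.
import pyarrow as pa

existing = pa.record_batch([pa.array([1, 2]), pa.array(["a", "b"])], names=["id", "name"])
new = pa.record_batch([pa.array([3]), pa.array(["c"])], names=["id", "name"])

# Wrap each batch in a Table before concatenating.
merged = pa.concat_tables([
    pa.Table.from_batches([existing]),
    pa.Table.from_batches([new]),
])
print(merged.num_rows)  # 3
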
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
new file mode 100644
index 0000000000..78de30a459
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -0,0 +1,161 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+import pyarrow as pa
+from typing import Tuple, Optional, List
+from pathlib import Path
+from abc import ABC, abstractmethod
+
+from pypaimon.api.core_options import CoreOptions
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.file_store_table import FileStoreTable
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+class DataWriter(ABC):
+ """Base class for data writers that handle PyArrow tables directly."""
+
+ def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
+ self.table = table
+ self.partition = partition
+ self.bucket = bucket
+
+ self.file_io = self.table.file_io
+        self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
+
+ options = self.table.options
+ self.target_file_size = 256 * 1024 * 1024
+        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+ self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+
+ self.pending_data: Optional[pa.RecordBatch] = None
+ self.committed_files: List[DataFileMeta] = []
+
+ def write(self, data: pa.RecordBatch):
+ processed_data = self._process_data(data)
+
+ if self.pending_data is None:
+ self.pending_data = processed_data
+ else:
+            self.pending_data = self._merge_data(self.pending_data, processed_data)
+
+ self._check_and_roll_if_needed()
+
+ def prepare_commit(self) -> List[DataFileMeta]:
+ if self.pending_data is not None and self.pending_data.num_rows > 0:
+ self._write_data_to_file(self.pending_data)
+ self.pending_data = None
+
+ return self.committed_files.copy()
+
+ def close(self):
+ self.pending_data = None
+ self.committed_files.clear()
+
+ @abstractmethod
+ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+        """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses."""
+
+ @abstractmethod
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+        """Merge existing data with new data. Must be implemented by subclasses."""
+
+ def _check_and_roll_if_needed(self):
+ if self.pending_data is None:
+ return
+
+ current_size = self.pending_data.get_total_buffer_size()
+ if current_size > self.target_file_size:
+            split_row = _find_optimal_split_point(self.pending_data, self.target_file_size)
+ if split_row > 0:
+ data_to_write = self.pending_data.slice(0, split_row)
+ remaining_data = self.pending_data.slice(split_row)
+
+ self._write_data_to_file(data_to_write)
+ self.pending_data = remaining_data
+ self._check_and_roll_if_needed()
+
+ def _write_data_to_file(self, data: pa.RecordBatch):
+ if data.num_rows == 0:
+ return
+ file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+ file_path = self._generate_file_path(file_name)
+ try:
+ if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
+                self.file_io.write_parquet(file_path, data, compression=self.compression)
+ elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
+                self.file_io.write_orc(file_path, data, compression=self.compression)
+ elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
+                self.file_io.write_avro(file_path, data, compression=self.compression)
+ else:
+                raise ValueError(f"Unsupported file format: {self.file_format}")
+
+ key_columns_batch = data.select(self.trimmed_primary_key_fields)
+ min_key_data = key_columns_batch.slice(0, 1).to_pylist()[0]
+            max_key_data = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1).to_pylist()[0]
+ self.committed_files.append(DataFileMeta(
+ file_name=file_name,
+ file_size=self.file_io.get_file_size(file_path),
+ row_count=data.num_rows,
+                min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
+                max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
+ key_stats=None, # TODO
+ value_stats=None,
+ min_sequence_number=0,
+ max_sequence_number=0,
+ schema_id=0,
+ level=0,
+ extra_files=None,
+ file_path=str(file_path),
+ ))
+
+ except Exception as e:
+            raise RuntimeError(f"Failed to write {self.file_format} file {file_path}: {e}") from e
+
+ def _generate_file_path(self, file_name: str) -> Path:
+ path_builder = self.table.table_path
+
+ for i, field_name in enumerate(self.table.partition_keys):
+            path_builder = path_builder / (field_name + "=" + self.partition[i])
+        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+
+ return path_builder
+
+
+def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
+ total_rows = data.num_rows
+ if total_rows <= 1:
+ return 0
+
+ left, right = 1, total_rows
+ best_split = 0
+
+ while left <= right:
+ mid = (left + right) // 2
+ slice_data = data.slice(0, mid)
+ slice_size = slice_data.get_total_buffer_size()
+
+ if slice_size <= target_size:
+ best_split = mid
+ left = mid + 1
+ else:
+ right = mid - 1
+
+ return best_split
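
File rolling relies on a binary search for the largest row prefix of the pending batch that still fits under target_file_size. A self-contained variant of that search on a made-up batch (the committed code measures slices with get_total_buffer_size; this demo uses RecordBatch.nbytes, which accounts for slice offsets, so the measured size shrinks with the prefix):

# Illustrative variant of the rolling split search; batch contents and the
# 16 KiB target below are made up.
import pyarrow as pa


def find_split_point(data: pa.RecordBatch, target_size: int) -> int:
    total_rows = data.num_rows
    if total_rows <= 1:
        return 0
    left, right = 1, total_rows
    best_split = 0
    while left <= right:
        mid = (left + right) // 2
        # nbytes of the slice: bytes actually covered by the first `mid` rows.
        if data.slice(0, mid).nbytes <= target_size:
            best_split = mid
            left = mid + 1
        else:
            right = mid - 1
    return best_split


batch = pa.record_batch([pa.array(range(10_000), type=pa.int64())], names=["id"])
print(find_split_point(batch, target_size=16 * 1024))  # 2048 rows of int64 = 16 KiB
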
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
new file mode 100644
index 0000000000..9b10236e14
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -0,0 +1,80 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+import pyarrow.compute as pc
+from typing import Tuple, Dict
+
+from pypaimon.write.writer.data_writer import DataWriter
+
+
+class KeyValueDataWriter(DataWriter):
+ """Data writer for primary key tables with system fields and sorting."""
+
+    def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
+ target_file_size: int, options: Dict[str, str]):
+        super().__init__(partition, bucket, file_io, table_schema, table_identifier,
+ target_file_size, options)
+ self.sequence_generator = SequenceGenerator()
+        self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+
+ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ enhanced_data = self._add_system_fields(data)
+ return self._sort_by_primary_key(enhanced_data)
+
+    def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
+ combined = pa.concat_tables([existing_data, new_data])
+ return self._sort_by_primary_key(combined)
+
+ def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
+ num_rows = data.num_rows
+ enhanced_table = data
+
+ for pk_key in reversed(self.trimmed_primary_key):
+ if pk_key in data.column_names:
+ key_column = data.column(pk_key)
+                enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
+
+        sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
+
+ # TODO: support real row kind here
+ value_kind_column = pa.repeat(0, num_rows)
+        enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
+ value_kind_column)
+
+ return enhanced_table
+
+ def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ sort_keys = self.trimmed_primary_key
+ if '_SEQUENCE_NUMBER' in data.column_names:
+ sort_keys.append('_SEQUENCE_NUMBER')
+
+ sorted_indices = pc.sort_indices(data, sort_keys=sort_keys)
+ sorted_batch = data.take(sorted_indices)
+ return sorted_batch
+
+
+class SequenceGenerator:
+ def __init__(self, start: int = 0):
+ self.current = start
+
+ def next(self) -> int:
+ self.current += 1
+ return self.current
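
KeyValueDataWriter prepends a copy of each key column, a per-row sequence number, and a value-kind marker before sorting by primary key. A standalone sketch of that column layout on made-up data (using a pa.Table for brevity, while the writer itself operates on RecordBatch):

# Standalone sketch of the system-field layout; the table and the single
# "id" primary key below are made up for illustration.
import itertools
import pyarrow as pa

sequence = itertools.count(1)

data = pa.table({"id": [1, 2], "name": ["a", "b"]})
primary_key = ["id"]

enhanced = data
# Key copies go first, mirroring _add_system_fields.
for pk in reversed(primary_key):
    enhanced = enhanced.add_column(0, f"_KEY_{pk}", data.column(pk))
# Then a monotonically increasing sequence number per row...
seq_col = pa.array([next(sequence) for _ in range(data.num_rows)], type=pa.int64())
enhanced = enhanced.add_column(len(primary_key), "_SEQUENCE_NUMBER", seq_col)
# ...and a value-kind column (0 stands in for INSERT until row kinds are supported).
enhanced = enhanced.add_column(len(primary_key) + 1, "_VALUE_KIND", pa.repeat(0, data.num_rows))

print(enhanced.column_names)
# ['_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'id', 'name']
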