This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4ec5e81a6a [python] Support table write for PyPaimon (#6001)
4ec5e81a6a is described below
commit 4ec5e81a6aa967b8e496c9d6a7bae8b0c70c73b7
Author: ChengHui Chen <[email protected]>
AuthorDate: Fri Aug 1 10:08:11 2025 +0800
[python] Support table write for PyPaimon (#6001)
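    This change adds a native batch write path to PyPaimon (BatchWriteBuilder,
    BatchTableWrite and BatchTableCommit on top of FileStoreWrite/FileStoreCommit),
    a PyArrow-backed FileIO, and row key extractors for fixed-bucket and
    unaware-bucket tables. A minimal usage sketch, mirroring the new
    writer_test.py (catalog and table creation elided; pa_table stands for a
    pyarrow.Table matching the table schema):

        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()
        table_write.write_arrow(pa_table)
        table_commit.commit(table_write.prepare_commit())
        table_write.close()
        table_commit.close()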
---
.github/workflows/paimon-python-checks.yml | 2 +-
paimon-python/pypaimon/api/__init__.py | 2 +-
paimon-python/pypaimon/api/auth.py | 2 +-
paimon-python/pypaimon/api/token_loader.py | 4 +-
paimon-python/pypaimon/catalog/catalog_utils.py | 2 +-
.../pypaimon/catalog/filesystem_catalog.py | 5 +-
.../pypaimon/catalog/rest/rest_catalog.py | 2 +-
paimon-python/pypaimon/{api => common}/config.py | 9 +
.../pypaimon/{api => common}/core_options.py | 2 +
paimon-python/pypaimon/common/file_io.py | 311 +++++++++++++++++++--
.../pypaimon/manifest/manifest_file_manager.py | 3 +-
.../pypaimon/manifest/manifest_list_manager.py | 7 +-
paimon-python/pypaimon/pvfs/__init__.py | 2 +-
paimon-python/pypaimon/read/read_builder.py | 7 +-
paimon-python/pypaimon/read/table_scan.py | 7 +-
paimon-python/pypaimon/schema/table_schema.py | 2 +-
.../pypaimon/snapshot/snapshot_manager.py | 7 +-
paimon-python/pypaimon/table/file_store_table.py | 21 +-
paimon-python/pypaimon/table/table.py | 13 +-
.../pypaimon/tests/filesystem_catalog_test.py | 83 ++++++
paimon-python/pypaimon/tests/writer_test.py | 78 ++++++
paimon-python/pypaimon/write/batch_table_commit.py | 72 +++++
paimon-python/pypaimon/write/batch_table_write.py | 63 +++++
.../pypaimon/write/batch_write_builder.py | 51 ++++
.../core_options.py => write/commit_message.py} | 55 ++--
paimon-python/pypaimon/write/file_store_commit.py | 120 ++++++++
paimon-python/pypaimon/write/file_store_write.py | 75 +++++
paimon-python/pypaimon/write/row_key_extractor.py | 102 +++++++
paimon-python/pypaimon/write/writer/data_writer.py | 9 +-
.../pypaimon/write/writer/key_value_data_writer.py | 10 +-
paimon-python/setup.py | 1 +
31 files changed, 1039 insertions(+), 90 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 2a3a22e1f6..de6c35dac4 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -45,7 +45,7 @@ jobs:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install dependencies
run: |
- python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+ python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==15.0.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
- name: Run lint-python.sh
run: |
chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/api/__init__.py b/paimon-python/pypaimon/api/__init__.py
index 509c4d91a2..4c99ff6ed0 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -31,7 +31,7 @@ from pypaimon.api.api_response import (
)
from pypaimon.api.api_resquest import CreateDatabaseRequest, AlterDatabaseRequest, RenameTableRequest, \
CreateTableRequest
-from pypaimon.api.config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
from pypaimon.api.client import HttpClient
from pypaimon.common.identifier import Identifier
from pypaimon.api.typedef import T
diff --git a/paimon-python/pypaimon/api/auth.py b/paimon-python/pypaimon/api/auth.py
index 393fa15920..eef71e3664 100644
--- a/paimon-python/pypaimon/api/auth.py
+++ b/paimon-python/pypaimon/api/auth.py
@@ -27,7 +27,7 @@ from typing import Optional, Dict
from .token_loader import DLFTokenLoader, DLFToken
from .typedef import RESTAuthParameter
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
class AuthProvider(ABC):
diff --git a/paimon-python/pypaimon/api/token_loader.py b/paimon-python/pypaimon/api/token_loader.py
index 9214345fce..bfc1ef173e 100644
--- a/paimon-python/pypaimon/api/token_loader.py
+++ b/paimon-python/pypaimon/api/token_loader.py
@@ -25,7 +25,7 @@ from requests.adapters import HTTPAdapter
from requests.exceptions import RequestException
from pypaimon.common.rest_json import json_field, JSON
-from .config import CatalogOptions
+from pypaimon.common.config import CatalogOptions
from .client import ExponentialRetry
@@ -59,7 +59,7 @@ class DLFToken:
@classmethod
def from_options(cls, options: Dict[str, str]) -> Optional['DLFToken']:
- from .config import CatalogOptions
+ from pypaimon.common.config import CatalogOptions
if (options.get(CatalogOptions.DLF_ACCESS_KEY_ID) is None
or options.get(CatalogOptions.DLF_ACCESS_KEY_SECRET) is None):
return None
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py b/paimon-python/pypaimon/catalog/catalog_utils.py
index 2bae27aa4e..9b030b9f02 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -18,7 +18,7 @@ limitations under the License.
from pathlib import Path
from typing import Callable, Any
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.identifier import Identifier
from pypaimon.catalog.table_metadata import TableMetadata
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 6651eec317..fa31a93c53 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -22,9 +22,10 @@ from urllib.parse import urlparse
from pypaimon import Catalog, Database, Table
from pypaimon.api import CatalogOptions, Identifier
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.catalog.catalog_exception import TableNotExistException, DatabaseNotExistException, \
TableAlreadyExistException, DatabaseAlreadyExistException
+from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.table.file_store_table import FileStoreTable
@@ -35,7 +36,7 @@ class FileSystemCatalog(Catalog):
raise ValueError(f"Paimon '{CatalogOptions.WAREHOUSE}' path must
be set")
self.warehouse = catalog_options.get(CatalogOptions.WAREHOUSE)
self.catalog_options = catalog_options
- self.file_io = None # FileIO(self.warehouse, self.catalog_options)
+ self.file_io = FileIO(self.warehouse, self.catalog_options)
def get_database(self, name: str) -> Database:
if self.file_io.exists(self.get_database_path(name)):
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index ab08685840..540beb6353 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -21,7 +21,7 @@ from typing import List, Dict, Optional, Union
from pypaimon import Database, Catalog, Schema
from pypaimon.api import RESTApi, CatalogOptions
from pypaimon.api.api_response import PagedList, GetTableResponse
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.identifier import Identifier
from pypaimon.api.options import Options
diff --git a/paimon-python/pypaimon/api/config.py b/paimon-python/pypaimon/common/config.py
similarity index 87%
rename from paimon-python/pypaimon/api/config.py
rename to paimon-python/pypaimon/common/config.py
index 2a33182993..4bfcc418e7 100644
--- a/paimon-python/pypaimon/api/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -20,6 +20,15 @@ class OssOptions:
OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"
OSS_SECURITY_TOKEN = "fs.oss.securityToken"
OSS_ENDPOINT = "fs.oss.endpoint"
+ OSS_REGION = "fs.oss.region"
+
+
+class S3Options:
+ S3_ACCESS_KEY_ID = "fs.s3.accessKeyId"
+ S3_ACCESS_KEY_SECRET = "fs.s3.accessKeySecret"
+ S3_SECURITY_TOKEN = "fs.s3.securityToken"
+ S3_ENDPOINT = "fs.s3.endpoint"
+ S3_REGION = "fs.s3.region"
class CatalogOptions:
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/common/core_options.py
similarity index 96%
copy from paimon-python/pypaimon/api/core_options.py
copy to paimon-python/pypaimon/common/core_options.py
index 18d2fba291..1aa4fc065f 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -44,3 +44,5 @@ class CoreOptions(str, Enum):
FILE_BLOCK_SIZE = "file.block-size"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+ # commit options
+ COMMIT_USER_PREFIX = "commit.user-prefix"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index b58a936af4..c2d579eb53 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -15,43 +15,302 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from abc import ABC, abstractmethod
+
+import logging
+import os
+import subprocess
from pathlib import Path
+from typing import Optional, Dict, Any, List
+from urllib.parse import urlparse, splitport
+import pyarrow
+from pyarrow._fs import FileSystem
-class FileIO(ABC):
- @abstractmethod
- def exists(self, path: Path) -> bool:
- """"""
+from pypaimon.common.config import OssOptions, S3Options
- @abstractmethod
- def read_file_utf8(self, path: Path) -> str:
- """"""
- @abstractmethod
- def try_to_write_atomic(self, path: Path, content: str) -> bool:
- """"""
+class FileIO:
+ def __init__(self, warehouse: str, catalog_options: dict):
+ self.properties = catalog_options
+ self.logger = logging.getLogger(__name__)
+ scheme, netloc, path = self.parse_location(warehouse)
+ if scheme in {"oss"}:
+ self.filesystem = self._initialize_oss_fs()
+ elif scheme in {"s3", "s3a", "s3n"}:
+ self.filesystem = self._initialize_s3_fs()
+ elif scheme in {"hdfs", "viewfs"}:
+ self.filesystem = self._initialize_hdfs_fs(scheme, netloc)
+ elif scheme in {"file"}:
+ self.filesystem = self._initialize_local_fs()
+ else:
+ raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")
+
+ @staticmethod
+ def parse_location(location: str):
+ uri = urlparse(location)
+ if not uri.scheme:
+ return "file", uri.netloc, os.path.abspath(location)
+ elif uri.scheme in ("hdfs", "viewfs"):
+ return uri.scheme, uri.netloc, uri.path
+ else:
+ return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
+
+ def _initialize_oss_fs(self) -> FileSystem:
+ from pyarrow.fs import S3FileSystem
+
+ client_kwargs = {
+ "endpoint_override": self.properties.get(OssOptions.OSS_ENDPOINT),
+ "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
+ "secret_key":
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
+ "session_token":
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
+ "region": self.properties.get(OssOptions.OSS_REGION),
+ "force_virtual_addressing": True,
+ }
+
+ return S3FileSystem(**client_kwargs)
+
+ def _initialize_s3_fs(self) -> FileSystem:
+ from pyarrow.fs import S3FileSystem
+
+ client_kwargs = {
+ "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
+ "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
+ "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
+ "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
+ "region": self.properties.get(S3Options.S3_REGION),
+ "force_virtual_addressing": True,
+ }
+
+ return S3FileSystem(**client_kwargs)
+
+ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
+ from pyarrow.fs import HadoopFileSystem
+
+ if 'HADOOP_HOME' not in os.environ:
+ raise RuntimeError("HADOOP_HOME environment variable is not set.")
+ if 'HADOOP_CONF_DIR' not in os.environ:
+ raise RuntimeError("HADOOP_CONF_DIR environment variable is not
set.")
+
+ hadoop_home = os.environ.get("HADOOP_HOME")
+ native_lib_path = f"{hadoop_home}/lib/native"
+ os.environ['LD_LIBRARY_PATH'] = f"{native_lib_path}:{os.environ.get('LD_LIBRARY_PATH', '')}"
+
+ class_paths = subprocess.run(
+ [f'{hadoop_home}/bin/hadoop', 'classpath', '--glob'],
+ capture_output=True,
+ text=True,
+ check=True
+ )
+ os.environ['CLASSPATH'] = class_paths.stdout.strip()
+
+ host, port_str = splitport(netloc)
+ return HadoopFileSystem(
+ host=host,
+ port=int(port_str),
+ user=os.environ.get('HADOOP_USER_NAME', 'hadoop')
+ )
+
+ def _initialize_local_fs(self) -> FileSystem:
+ from pyarrow.fs import LocalFileSystem
+
+ return LocalFileSystem()
+
+ def new_input_stream(self, path: Path):
+ return self.filesystem.open_input_file(str(path))
+
+ def new_output_stream(self, path: Path):
+ parent_dir = path.parent
+ if str(parent_dir) and not self.exists(parent_dir):
+ self.mkdirs(parent_dir)
+
+ return self.filesystem.open_output_stream(str(path))
+
+ def get_file_status(self, path: Path):
+ file_infos = self.filesystem.get_file_info([str(path)])
+ return file_infos[0]
- @abstractmethod
def list_status(self, path: Path):
- """"""
+ selector = pyarrow.fs.FileSelector(str(path), recursive=False, allow_not_found=True)
+ return self.filesystem.get_file_info(selector)
+
+ def list_directories(self, path: Path):
+ file_infos = self.list_status(path)
+ return [info for info in file_infos if info.type == pyarrow.fs.FileType.Directory]
+
+ def exists(self, path: Path) -> bool:
+ try:
+ file_info = self.filesystem.get_file_info([str(path)])[0]
+ return file_info.type != pyarrow.fs.FileType.NotFound
+ except Exception:
+ return False
+
+ def delete(self, path: Path, recursive: bool = False) -> bool:
+ try:
+ file_info = self.filesystem.get_file_info([str(path)])[0]
+ if file_info.type == pyarrow.fs.FileType.Directory:
+ if recursive:
+ self.filesystem.delete_dir_contents(str(path))
+ else:
+ self.filesystem.delete_dir(str(path))
+ else:
+ self.filesystem.delete_file(str(path))
+ return True
+ except Exception as e:
+ self.logger.warning(f"Failed to delete {path}: {e}")
+ return False
- @abstractmethod
def mkdirs(self, path: Path) -> bool:
- """"""
+ try:
+ self.filesystem.create_dir(str(path), recursive=True)
+ return True
+ except Exception as e:
+ self.logger.warning(f"Failed to create directory {path}: {e}")
+ return False
- @abstractmethod
- def write_file(self, path: Path, content: str, overwrite: bool = False):
- """"""
+ def rename(self, src: Path, dst: Path) -> bool:
+ try:
+ dst_parent = dst.parent
+ if str(dst_parent) and not self.exists(dst_parent):
+ self.mkdirs(dst_parent)
+
+ self.filesystem.move(str(src), str(dst))
+ return True
+ except Exception as e:
+ self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
+ return False
- @abstractmethod
def delete_quietly(self, path: Path):
- """"""
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Ready to delete {path}")
- @abstractmethod
- def new_input_stream(self, path: Path):
- """"""
+ try:
+ if not self.delete(path, False) and self.exists(path):
+ self.logger.warning(f"Failed to delete file {path}")
+ except Exception:
+ self.logger.warning(f"Exception occurs when deleting file {path}",
exc_info=True)
+
+ def delete_files_quietly(self, files: List[Path]):
+ for file_path in files:
+ self.delete_quietly(file_path)
+
+ def delete_directory_quietly(self, directory: Path):
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug(f"Ready to delete {directory}")
+
+ try:
+ if not self.delete(directory, True) and self.exists(directory):
+ self.logger.warning(f"Failed to delete directory {directory}")
+ except Exception:
+ self.logger.warning(f"Exception occurs when deleting directory
{directory}", exc_info=True)
+
+ def get_file_size(self, path: Path) -> int:
+ file_info = self.get_file_status(path)
+ if file_info.size is None:
+ raise ValueError(f"File size not available for {path}")
+ return file_info.size
+
+ def is_dir(self, path: Path) -> bool:
+ file_info = self.get_file_status(path)
+ return file_info.type == pyarrow.fs.FileType.Directory
+
+ def check_or_mkdirs(self, path: Path):
+ if self.exists(path):
+ if not self.is_dir(path):
+ raise ValueError(f"The path '{path}' should be a directory.")
+ else:
+ self.mkdirs(path)
+
+ def read_file_utf8(self, path: Path) -> str:
+ with self.new_input_stream(path) as input_stream:
+ return input_stream.read().decode('utf-8')
+
+ def try_to_write_atomic(self, path: Path, content: str) -> bool:
+ temp_path = path.with_suffix(path.suffix + ".tmp") if path.suffix else Path(str(path) + ".tmp")
+ success = False
+ try:
+ self.write_file(temp_path, content, False)
+ success = self.rename(temp_path, path)
+ finally:
+ if not success:
+ self.delete_quietly(temp_path)
+ return success
+
+ def write_file(self, path: Path, content: str, overwrite: bool = False):
+ with self.new_output_stream(path) as output_stream:
+ output_stream.write(content.encode('utf-8'))
+
+ def overwrite_file_utf8(self, path: Path, content: str):
+ with self.new_output_stream(path) as output_stream:
+ output_stream.write(content.encode('utf-8'))
+
+ def copy_file(self, source_path: Path, target_path: Path, overwrite: bool = False):
+ if not overwrite and self.exists(target_path):
+ raise FileExistsError(f"Target file {target_path} already exists and overwrite=False")
+
+ self.filesystem.copy_file(str(source_path), str(target_path))
+
+ def copy_files(self, source_directory: Path, target_directory: Path, overwrite: bool = False):
+ file_infos = self.list_status(source_directory)
+ for file_info in file_infos:
+ if file_info.type == pyarrow.fs.FileType.File:
+ source_file = Path(file_info.path)
+ target_file = target_directory / source_file.name
+ self.copy_file(source_file, target_file, overwrite)
+
+ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
+ retry_number = 0
+ exception = None
+ while retry_number < 5:
+ try:
+ return self.read_file_utf8(path)
+ except FileNotFoundError:
+ return None
+ except Exception as e:
+ if not self.exists(path):
+ return None
+
+ if (str(type(e).__name__).endswith("RemoteFileChangedException") or
+ (str(e) and "Blocklist for" in str(e) and "has changed" in str(e))):
+ exception = e
+ retry_number += 1
+ else:
+ raise e
+
+ if exception:
+ if isinstance(exception, Exception):
+ raise exception
+ else:
+ raise RuntimeError(exception)
+
+ return None
+
+ def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
+ try:
+ import pyarrow.parquet as pq
+
+ with self.new_output_stream(path) as output_stream:
+ with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
+ pw.write_batch(data)
+
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write Parquet file {path}: {e}")
from e
+
+ def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+ try:
+ import pyarrow.orc as orc
+
+ with self.new_output_stream(path) as output_stream:
+ orc.write_table(
+ data,
+ output_stream,
+ compression=compression,
+ **kwargs
+ )
+
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
- @abstractmethod
- def get_file_size(self, path: Path):
- """"""
+ def write_avro(self, path: Path, table: pyarrow.RecordBatch, schema: Optional[Dict[str, Any]] = None, **kwargs):
+ raise ValueError("Unsupported write_avro yet")
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index a70d0e2d46..6775d24bcd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -23,7 +23,6 @@ from io import BytesIO
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry, MANIFEST_ENTRY_SCHEMA
-from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.row.binary_row import BinaryRowDeserializer, BinaryRowSerializer, BinaryRow
@@ -31,6 +30,8 @@ class ManifestFileManager:
"""Writer for manifest files in Avro format using unified FileIO."""
def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
self.table: FileStoreTable = table
self.manifest_path = table.table_path / "manifest"
self.file_io = table.file_io
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 969c6a05ef..1c58ea5b6a 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -24,14 +24,15 @@ from io import BytesIO
from pypaimon.manifest.schema.manifest_file_meta import MANIFEST_FILE_META_SCHEMA
from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
class ManifestListManager:
"""Manager for manifest list files in Avro format using unified FileIO."""
- def __init__(self, table: FileStoreTable):
- self.table = table
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
self.manifest_path = self.table.table_path / "manifest"
self.file_io = self.table.file_io
diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index 2b425d0251..ec1cb0f05c 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -33,7 +33,7 @@ from fsspec.implementations.local import LocalFileSystem
from pypaimon.api import RESTApi, GetTableTokenResponse, Schema, Identifier, GetTableResponse
from pypaimon.api.client import NoSuchResourceException, AlreadyExistsException
-from pypaimon.api.config import CatalogOptions, OssOptions, PVFSOptions
+from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
PROTOCOL_NAME = "pvfs"
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 604eff9ff1..30f824a698 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -23,14 +23,15 @@ from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
-from pypaimon.table.file_store_table import FileStoreTable
class ReadBuilder:
"""Implementation of ReadBuilder for native Python reading."""
- def __init__(self, table: FileStoreTable):
- self.table = table
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
self._predicate: Optional[Predicate] = None
self._projection: Optional[List[str]] = None
self._limit: Optional[int] = None
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fb56d70a82..27015d3eda 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,15 +29,16 @@ from pypaimon.read.plan import Plan
from pypaimon.read.split import Split
from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.file_store_table import FileStoreTable
class TableScan:
"""Implementation of TableScan for native Python reading."""
- def __init__(self, table: FileStoreTable, predicate: Optional[Predicate], limit: Optional[int],
+ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
read_type: List[DataField]):
- self.table = table
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
self.predicate = predicate
self.predicate = predicate
self.limit = limit
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 635cb3464d..736de902db 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -27,7 +27,7 @@ import pyarrow
from pypaimon import Schema
from pypaimon.common.rest_json import json_field
from pypaimon.schema import data_types
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.schema.data_types import DataField
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 9e500c48d2..6a8a68e73a 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -21,14 +21,15 @@ from typing import Optional
from pypaimon.common.file_io import FileIO
from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.file_store_table import FileStoreTable
class SnapshotManager:
"""Manager for snapshot files using unified FileIO."""
- def __init__(self, table: FileStoreTable):
- self.table = table
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
self.file_io: FileIO = self.table.file_io
self.snapshot_dir = self.table.table_path / "snapshot"
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index e1b8322fe8..225edd1cc4 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -19,12 +19,14 @@
from pathlib import Path
from pypaimon import Table
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.identifier import Identifier
from pypaimon.schema.table_schema import TableSchema
from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
+from pypaimon.write.row_key_extractor import RowKeyExtractor, FixedBucketRowKeyExtractor, UnawareBucketRowKeyExtractor
class FileStoreTable(Table):
@@ -56,3 +58,20 @@ class FileStoreTable(Table):
return BucketMode.BUCKET_UNAWARE
else:
return BucketMode.HASH_FIXED
+
+ def new_read_builder(self) -> 'ReadBuilder':
+ pass
+
+ def new_batch_write_builder(self) -> BatchWriteBuilder:
+ return BatchWriteBuilder(self)
+
+ def create_row_key_extractor(self) -> RowKeyExtractor:
+ bucket_mode = self.bucket_mode()
+ if bucket_mode == BucketMode.HASH_FIXED:
+ return FixedBucketRowKeyExtractor(self.table_schema)
+ elif bucket_mode == BucketMode.BUCKET_UNAWARE:
+ return UnawareBucketRowKeyExtractor(self.table_schema)
+ elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
+ raise ValueError(f"Unsupported bucket mode {bucket_mode} yet")
+ else:
+ raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
diff --git a/paimon-python/pypaimon/table/table.py b/paimon-python/pypaimon/table/table.py
index 6f15b049ae..3a1fe2e622 100644
--- a/paimon-python/pypaimon/table/table.py
+++ b/paimon-python/pypaimon/table/table.py
@@ -16,8 +16,19 @@
# limitations under the License.
#################################################################################
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from pypaimon.read.read_builder import ReadBuilder
+from pypaimon.write.batch_write_builder import BatchWriteBuilder
class Table(ABC):
"""A table provides basic abstraction for table read and write."""
+
+ @abstractmethod
+ def new_read_builder(self) -> ReadBuilder:
+ """Return a builder for building table scan and table read."""
+
+ @abstractmethod
+ def new_batch_write_builder(self) -> BatchWriteBuilder:
+ """Returns a builder for building batch table write and table
commit."""
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
new file mode 100644
index 0000000000..5b0dc81b2f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \
+ DatabaseNotExistException, TableNotExistException
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class FileSystemCatalogTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+ self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+ def test_database(self):
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db", False)
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db"))
+
+ with self.assertRaises(DatabaseAlreadyExistException):
+ catalog.create_database("test_db", False)
+
+ catalog.create_database("test_db", True)
+
+ with self.assertRaises(DatabaseNotExistException):
+ catalog.get_database("test_db_x")
+
+ database = catalog.get_database("test_db")
+ self.assertEqual(database.name, "test_db")
+
+ def test_table(self):
+ fields = [
+ DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+ DataField.from_dict({"id": 2, "name": "f1", "type": "INT"}),
+ DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+ ]
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db", False)
+ catalog.create_table("test_db.test_table", Schema(fields=fields),
False)
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/schema/schema-0"))
+
+ with self.assertRaises(TableAlreadyExistException):
+ catalog.create_table("test_db.test_table", Schema(fields=fields),
False)
+
+ catalog.create_table("test_db.test_table", Schema(fields=fields), True)
+
+ database = catalog.get_database("test_db")
+ self.assertEqual(database.name, "test_db")
+
+ with self.assertRaises(TableNotExistException):
+ catalog.get_table("test_db.test_table_x")
+
+ table = catalog.get_table("test_db.test_table")
+ self.assertTrue(table is not None)
+ self.assertTrue(isinstance(table, FileStoreTable))
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
new file mode 100644
index 0000000000..9a8230b0eb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow
+
+from pypaimon import Schema
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.data_types import DataField
+
+
+class WriterTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.temp_dir = tempfile.mkdtemp(prefix="unittest_")
+ self.warehouse = os.path.join(self.temp_dir, 'test_dir')
+
+ def tearDown(self):
+ shutil.rmtree(self.temp_dir, ignore_errors=True)
+
+ def test_writer(self):
+ pa_schema = pyarrow.schema([
+ ('f0', pyarrow.int32()),
+ ('f1', pyarrow.string()),
+ ('f2', pyarrow.string())
+ ])
+ fields = [
+ DataField.from_dict({"id": 1, "name": "f0", "type": "INT"}),
+ DataField.from_dict({"id": 2, "name": "f1", "type": "STRING"}),
+ DataField.from_dict({"id": 3, "name": "f2", "type": "STRING"}),
+ ]
+ catalog = CatalogFactory.create({
+ "warehouse": self.warehouse
+ })
+ catalog.create_database("test_db", False)
+ catalog.create_table("test_db.test_table", Schema(fields=fields),
False)
+ table = catalog.get_table("test_db.test_table")
+
+ data = {
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c'],
+ 'f2': ['X', 'Y', 'Z']
+ }
+ expect = pyarrow.Table.from_pydict(data, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/LATEST"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest"))
+ self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
+ self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
+ self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
new file mode 100644
index 0000000000..14849e6309
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from typing import List, Optional
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_commit import FileStoreCommit
+
+
+class BatchTableCommit:
+ """Python implementation of BatchTableCommit for batch writing
scenarios."""
+
+ def __init__(self, table, commit_user: str, static_partition: Optional[dict]):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
+ self.commit_user = commit_user
+ self.overwrite_partition = static_partition
+ self.file_store_commit = FileStoreCommit(table, commit_user)
+ self.batch_committed = False
+
+ def commit(self, commit_messages: List[CommitMessage]):
+ self._check_committed()
+
+ non_empty_messages = [msg for msg in commit_messages if not msg.is_empty()]
+ if not non_empty_messages:
+ return
+
+ commit_identifier = int(time.time() * 1000)
+
+ try:
+ if self.overwrite_partition is not None:
+ self.file_store_commit.overwrite(
+ partition=self.overwrite_partition,
+ commit_messages=non_empty_messages,
+ commit_identifier=commit_identifier
+ )
+ else:
+ self.file_store_commit.commit(
+ commit_messages=non_empty_messages,
+ commit_identifier=commit_identifier
+ )
+ except Exception as e:
+ self.file_store_commit.abort(commit_messages)
+ raise RuntimeError(f"Failed to commit: {str(e)}") from e
+
+ def abort(self, commit_messages: List[CommitMessage]):
+ self.file_store_commit.abort(commit_messages)
+
+ def close(self):
+ self.file_store_commit.close()
+
+ def _check_committed(self):
+ if self.batch_committed:
+ raise RuntimeError("BatchTableCommit only supports one-time
committing.")
+ self.batch_committed = True
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
new file mode 100644
index 0000000000..921de1151f
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from collections import defaultdict
+
+from typing import List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.file_store_write import FileStoreWrite
+
+
+class BatchTableWrite:
+ def __init__(self, table):
+ self.file_store_write = FileStoreWrite(table)
+ self.row_key_extractor = table.create_row_key_extractor()
+ self.batch_committed = False
+
+ def write_arrow(self, table: pa.Table, row_kind: List[int] = None):
+ # TODO: support row_kind
+ batches_iterator = table.to_batches()
+ for batch in batches_iterator:
+ self.write_arrow_batch(batch)
+
+ def write_arrow_batch(self, data: pa.RecordBatch, row_kind: List[int] = None):
+ # TODO: support row_kind
+ partitions, buckets = self.row_key_extractor.extract_partition_bucket_batch(data)
+
+ partition_bucket_groups = defaultdict(list)
+ for i in range(data.num_rows):
+ partition_bucket_groups[(tuple(partitions[i]), buckets[i])].append(i)
+
+ for (partition, bucket), row_indices in partition_bucket_groups.items():
+ indices_array = pa.array(row_indices, type=pa.int64())
+ sub_table = pa.compute.take(data, indices_array)
+ self.file_store_write.write(partition, bucket, sub_table)
+
+ def write_pandas(self, dataframe):
+ raise ValueError("Not implemented yet")
+
+ def prepare_commit(self) -> List[CommitMessage]:
+ if self.batch_committed:
+ raise RuntimeError("BatchTableWrite only supports one-time
committing.")
+ self.batch_committed = True
+ return self.file_store_write.prepare_commit()
+
+ def close(self):
+ self.file_store_write.close()
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py
new file mode 100644
index 0000000000..e23455f3fc
--- /dev/null
+++ b/paimon-python/pypaimon/write/batch_write_builder.py
@@ -0,0 +1,51 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import uuid
+
+from typing import Optional
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.write.batch_table_commit import BatchTableCommit
+from pypaimon.write.batch_table_write import BatchTableWrite
+
+
+class BatchWriteBuilder:
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
+ self.commit_user = self._create_commit_user()
+ self.static_partition = None
+
+ def overwrite(self, static_partition: Optional[dict] = None):
+ self.static_partition = static_partition
+ return self
+
+ def new_write(self) -> BatchTableWrite:
+ return BatchTableWrite(self.table)
+
+ def new_commit(self) -> BatchTableCommit:
+ commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
+ return commit
+
+ def _create_commit_user(self):
+ if CoreOptions.COMMIT_USER_PREFIX in self.table.options:
+ return f"{self.table.options.get(CoreOptions.COMMIT_USER_PREFIX)}_{uuid.uuid4()}"
+ else:
+ return str(uuid.uuid4())
diff --git a/paimon-python/pypaimon/api/core_options.py b/paimon-python/pypaimon/write/commit_message.py
similarity index 55%
rename from paimon-python/pypaimon/api/core_options.py
rename to paimon-python/pypaimon/write/commit_message.py
index 18d2fba291..fc36f13e5b 100644
--- a/paimon-python/pypaimon/api/core_options.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -16,31 +16,30 @@
# limitations under the License.
################################################################################
-from enum import Enum
-
-
-class CoreOptions(str, Enum):
- """Core options for paimon."""
-
- def __str__(self):
- return self.value
-
- # Basic options
- AUTO_CREATE = "auto-create"
- PATH = "path"
- TYPE = "type"
- BRANCH = "branch"
- BUCKET = "bucket"
- BUCKET_KEY = "bucket-key"
- WAREHOUSE = "warehouse"
- # File format options
- FILE_FORMAT = "file.format"
- FILE_FORMAT_ORC = "orc"
- FILE_FORMAT_AVRO = "avro"
- FILE_FORMAT_PARQUET = "parquet"
- FILE_COMPRESSION = "file.compression"
- FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
- FILE_FORMAT_PER_LEVEL = "file.format.per.level"
- FILE_BLOCK_SIZE = "file.block-size"
- # Scan options
- SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+from typing import Tuple, List
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class CommitMessage:
+ """Python implementation of CommitMessage"""
+
+ def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]):
+ self._partition = partition
+ self._bucket = bucket
+ self._new_files = new_files or []
+
+ def partition(self) -> Tuple:
+ """Get the partition of this commit message."""
+ return self._partition
+
+ def bucket(self) -> int:
+ """Get the bucket of this commit message."""
+ return self._bucket
+
+ def new_files(self) -> List[DataFileMeta]:
+ """Get the list of new files."""
+ return self._new_files
+
+ def is_empty(self):
+ return not self._new_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
new file mode 100644
index 0000000000..4928167907
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -0,0 +1,120 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import time
+from pathlib import Path
+from typing import List
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.write.commit_message import CommitMessage
+
+
+class FileStoreCommit:
+ """Core commit logic for file store operations."""
+
+ def __init__(self, table, commit_user: str):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
+ self.commit_user = commit_user
+
+ self.snapshot_manager = SnapshotManager(table)
+ self.manifest_file_manager = ManifestFileManager(table)
+ self.manifest_list_manager = ManifestListManager(table)
+
+ self.manifest_target_size = 8 * 1024 * 1024
+ self.manifest_merge_min_count = 30
+
+ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int):
+ """Commit the given commit messages in normal append mode."""
+ if not commit_messages:
+ return
+
+ new_manifest_files = self.manifest_file_manager.write(commit_messages)
+ if not new_manifest_files:
+ return
+ latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+ existing_manifest_files = []
+ if latest_snapshot:
+ existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+ new_manifest_files.extend(existing_manifest_files)
+ manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+ new_snapshot_id = self._generate_snapshot_id()
+ snapshot_data = Snapshot(
+ version=3,
+ id=new_snapshot_id,
+ schema_id=0,
+ base_manifest_list=manifest_list,
+ delta_manifest_list=manifest_list,
+ commit_user=self.commit_user,
+ commit_identifier=commit_identifier,
+ commit_kind="APPEND",
+ time_millis=int(time.time() * 1000),
+ log_offsets={},
+ )
+ self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+ def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
+ if not commit_messages:
+ return
+
+ new_manifest_files = self.manifest_file_manager.write(commit_messages)
+ if not new_manifest_files:
+ return
+
+ # In overwrite mode, we don't merge with existing manifests
+ manifest_list = self.manifest_list_manager.write(new_manifest_files)
+
+ new_snapshot_id = self._generate_snapshot_id()
+ snapshot_data = Snapshot(
+ version=3,
+ id=new_snapshot_id,
+ schema_id=0,
+ base_manifest_list=manifest_list,
+ delta_manifest_list=manifest_list,
+ commit_user=self.commit_user,
+ commit_identifier=commit_identifier,
+ commit_kind="OVERWRITE",
+ time_millis=int(time.time() * 1000),
+ log_offsets={},
+ )
+ self.snapshot_manager.commit_snapshot(new_snapshot_id, snapshot_data)
+
+ def abort(self, commit_messages: List[CommitMessage]):
+ for message in commit_messages:
+ for file in message.new_files():
+ try:
+ file_path_obj = Path(file.file_path)
+ if file_path_obj.exists():
+ file_path_obj.unlink()
+ except Exception as e:
+ print(f"Warning: Failed to clean up file {file.file_path}:
{e}")
+
+ def close(self):
+ pass
+
+ def _generate_snapshot_id(self) -> int:
+ latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+ if latest_snapshot:
+ return latest_snapshot.id + 1
+ else:
+ return 1
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
new file mode 100644
index 0000000000..d1002bc094
--- /dev/null
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Dict, Tuple, List
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.data_writer import DataWriter
+from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
+
+
+class FileStoreWrite:
+ """Base class for file store write operations."""
+
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
+ self.data_writers: Dict[Tuple, DataWriter] = {}
+
+ def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
+ key = (partition, bucket)
+ if key not in self.data_writers:
+ self.data_writers[key] = self._create_data_writer(partition, bucket)
+ writer = self.data_writers[key]
+ writer.write(data)
+
+ def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
+ if self.table.is_primary_key_table:
+ return KeyValueDataWriter(
+ table=self.table,
+ partition=partition,
+ bucket=bucket,
+ )
+ else:
+ return AppendOnlyDataWriter(
+ table=self.table,
+ partition=partition,
+ bucket=bucket,
+ )
+
+ def prepare_commit(self) -> List[CommitMessage]:
+ commit_messages = []
+ for (partition, bucket), writer in self.data_writers.items():
+ committed_files = writer.prepare_commit()
+ if committed_files:
+ commit_message = CommitMessage(
+ partition=partition,
+ bucket=bucket,
+ new_files=committed_files
+ )
+ commit_messages.append(commit_message)
+ return commit_messages
+
+ def close(self):
+ """Close all data writers and clean up resources."""
+ for writer in self.data_writers.values():
+ writer.close()
+ self.data_writers.clear()
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
new file mode 100644
index 0000000000..cda3ad07ba
--- /dev/null
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -0,0 +1,102 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from typing import Tuple, List
+from abc import ABC, abstractmethod
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.schema.table_schema import TableSchema
+
+
+class RowKeyExtractor(ABC):
+ """Base class for extracting partition and bucket information from PyArrow
data."""
+
+ def __init__(self, table_schema: TableSchema):
+ self.table_schema = table_schema
+ self.partition_indices = self._get_field_indices(table_schema.partition_keys)
+
+ def extract_partition_bucket_batch(self, data: pa.RecordBatch) -> Tuple[List[Tuple], List[int]]:
+ partitions = self._extract_partitions_batch(data)
+ buckets = self._extract_buckets_batch(data)
+ return partitions, buckets
+
+ def _get_field_indices(self, field_names: List[str]) -> List[int]:
+ if not field_names:
+ return []
+ field_map = {field.name: i for i, field in enumerate(self.table_schema.fields)}
+ return [field_map[name] for name in field_names if name in field_map]
+
+ def _extract_partitions_batch(self, data: pa.RecordBatch) -> List[Tuple]:
+ if not self.partition_indices:
+ return [() for _ in range(data.num_rows)]
+
+ partition_columns = [data.column(i) for i in self.partition_indices]
+
+ partitions = []
+ for row_idx in range(data.num_rows):
+ partition_values = tuple(col[row_idx].as_py() for col in partition_columns)
+ partitions.append(partition_values)
+
+ return partitions
+
+ @abstractmethod
+ def _extract_buckets_batch(self, table: pa.RecordBatch) -> List[int]:
+ """Extract bucket numbers for all rows. Must be implemented by
subclasses."""
+ pass
+
+
+class FixedBucketRowKeyExtractor(RowKeyExtractor):
+ """Fixed bucket mode extractor with configurable number of buckets."""
+
+ def __init__(self, table_schema: TableSchema):
+ super().__init__(table_schema)
+ self.num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+ if self.num_buckets <= 0:
+ raise ValueError(f"Fixed bucket mode requires bucket > 0, got
{self.num_buckets}")
+
+ bucket_key_option = table_schema.options.get(CoreOptions.BUCKET_KEY, '')
+ if bucket_key_option.strip():
+ self.bucket_keys = [k.strip() for k in bucket_key_option.split(',')]
+ else:
+ self.bucket_keys = [pk for pk in table_schema.primary_keys
+ if pk not in table_schema.partition_keys]
+
+ self.bucket_key_indices = self._get_field_indices(self.bucket_keys)
+
+ def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+ columns = [data.column(i) for i in self.bucket_key_indices]
+ hashes = []
+ for row_idx in range(data.num_rows):
+ row_values = tuple(col[row_idx].as_py() for col in columns)
+ hashes.append(hash(row_values))
+ return [abs(hash_val) % self.num_buckets for hash_val in hashes]
+
+
+class UnawareBucketRowKeyExtractor(RowKeyExtractor):
+ """Extractor for unaware bucket mode (bucket = -1, no primary keys)."""
+
+ def __init__(self, table_schema: TableSchema):
+ super().__init__(table_schema)
+ num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+
+ if num_buckets != -1:
+ raise ValueError(f"Unaware bucket mode requires bucket = -1, got
{num_buckets}")
+
+ def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+ return [0] * data.num_rows
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 78de30a459..a661bd7543 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -23,17 +23,18 @@ from typing import Tuple, Optional, List
from pathlib import Path
from abc import ABC, abstractmethod
-from pypaimon.api.core_options import CoreOptions
+from pypaimon.common.core_options import CoreOptions
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
-from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.table.row.binary_row import BinaryRow
class DataWriter(ABC):
"""Base class for data writers that handle PyArrow tables directly."""
- def __init__(self, table: FileStoreTable, partition: Tuple, bucket: int):
- self.table = table
+ def __init__(self, table, partition: Tuple, bucket: int):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
self.partition = partition
self.bucket = bucket
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 9b10236e14..d4b93ee987 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -18,7 +18,7 @@
import pyarrow as pa
import pyarrow.compute as pc
-from typing import Tuple, Dict
+from typing import Tuple
from pypaimon.write.writer.data_writer import DataWriter
@@ -26,12 +26,10 @@ from pypaimon.write.writer.data_writer import DataWriter
class KeyValueDataWriter(DataWriter):
"""Data writer for primary key tables with system fields and sorting."""
- def __init__(self, partition: Tuple, bucket: int, file_io, table_schema, table_identifier,
- target_file_size: int, options: Dict[str, str]):
- super().__init__(partition, bucket, file_io, table_schema, table_identifier,
- target_file_size, options)
+ def __init__(self, table, partition: Tuple, bucket: int):
+ super().__init__(table, partition, bucket)
self.sequence_generator = SequenceGenerator()
- self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+ self.trimmed_primary_key = [field.name for field in self.trimmed_primary_key_fields]
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
enhanced_data = self._add_system_fields(data)
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 499c6ab8dc..8fa06e11bc 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -27,6 +27,7 @@ install_requires = [
'cachetools==5.3.3',
'ossfs==2023.12.0'
'fastavro==1.11.1'
+ 'pyarrow==15.0.2'
]
long_description = "See Apache Paimon Python API \