This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 32cb6854db [Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
32cb6854db is described below
commit 32cb6854dbe33af86cf85ca47ecb11309dd032fd
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Aug 27 11:36:21 2025 +0800
[Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
---
.github/workflows/paimon-python-checks.yml | 2 +-
.../pypaimon/catalog/filesystem_catalog.py | 4 +-
.../pypaimon/catalog/renaming_snapshot_commit.py | 2 +-
.../pypaimon/manifest/manifest_file_manager.py | 92 +++--
.../pypaimon/manifest/manifest_list_manager.py | 73 ++--
.../pypaimon/manifest/schema/data_file_meta.py | 36 +-
.../pypaimon/manifest/schema/manifest_entry.py | 1 +
.../pypaimon/manifest/schema/manifest_file_meta.py | 5 +-
.../pypaimon/manifest/schema/simple_stats.py | 17 +-
.../read/reader/data_file_record_reader.py | 16 +-
paimon-python/pypaimon/read/split_read.py | 6 +-
paimon-python/pypaimon/read/table_scan.py | 6 +-
paimon-python/pypaimon/schema/data_types.py | 13 +-
paimon-python/pypaimon/schema/schema_manager.py | 11 +-
paimon-python/pypaimon/schema/table_schema.py | 37 +-
paimon-python/pypaimon/table/file_store_table.py | 5 +-
paimon-python/pypaimon/table/row/binary_row.py | 145 +++-----
.../pypaimon/tests/java_python_hybrid_test.py | 132 +++++++
.../py4j_impl/__init__.py} | 22 --
.../py4j_impl/constants.py} | 29 +-
.../pypaimon/tests/py4j_impl/gateway_factory.py | 135 ++++++++
.../pypaimon/tests/py4j_impl/gateway_server.py | 122 +++++++
.../tests/py4j_impl/java_implementation.py | 383 +++++++++++++++++++++
.../pypaimon/tests/py4j_impl/java_utils.py | 132 +++++++
.../flink-shaded-hadoop-2-uber-2.8.3-10.0.jar | Bin 0 -> 43317025 bytes
.../paimon-python-java-bridge-0.9-SNAPSHOT.jar | Bin 0 -> 43384210 bytes
.../pypaimon/tests/reader_primary_key_test.py | 4 +-
.../pypaimon/tests/rest_catalog_base_test.py | 2 +-
paimon-python/pypaimon/tests/rest_server.py | 2 +-
paimon-python/pypaimon/tests/rest_table_test.py | 2 +-
paimon-python/pypaimon/tests/schema_test.py | 3 +-
paimon-python/pypaimon/tests/writer_test.py | 2 +-
paimon-python/pypaimon/write/commit_message.py | 26 +-
paimon-python/pypaimon/write/file_store_commit.py | 114 +++---
paimon-python/pypaimon/write/file_store_write.py | 34 +-
paimon-python/pypaimon/write/writer/data_writer.py | 132 +++++--
.../pypaimon/write/writer/key_value_data_writer.py | 16 +-
paimon-python/setup.py | 3 +-
38 files changed, 1334 insertions(+), 432 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml
b/.github/workflows/paimon-python-checks.yml
index 9a38e9b5c2..6f6a8a23a8 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -46,7 +46,7 @@ jobs:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install dependencies
run: |
-        python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+        python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
- name: Run lint-python.sh
run: |
chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py
b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 68d1e438c8..1aed6e8137 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
#################################################################################
from pathlib import Path
-from typing import Optional, Union, List
+from typing import List, Optional, Union
from urllib.parse import urlparse
from pypaimon.catalog.catalog import Catalog
@@ -67,7 +67,7 @@ class FileSystemCatalog(Catalog):
if not isinstance(identifier, Identifier):
identifier = Identifier.from_string(identifier)
if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
- raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+ raise ValueError(f"Unsupported CoreOption
{CoreOptions.SCAN_FALLBACK_BRANCH}")
table_path = self.get_table_path(identifier)
table_schema = self.get_table_schema(identifier)
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
index 4a2e70e4e2..b85c4200a1 100644
--- a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -63,7 +63,7 @@ class RenamingSnapshotCommit(SnapshotCommit):
if not self.file_io.exists(new_snapshot_path):
"""Internal function to perform the actual commit."""
# Try to write atomically using the file IO
-            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot, indent=2))
if committed:
# Update the latest hint
self._commit_latest_hint(snapshot.id)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index ef674e5b88..7c46b368d2 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import uuid
from io import BytesIO
from typing import List
@@ -24,8 +23,10 @@ import fastavro
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
BinaryRowSerializer)
+from pypaimon.write.commit_message import CommitMessage
class ManifestFileManager:
@@ -51,20 +52,40 @@ class ManifestFileManager:
reader = fastavro.reader(buffer)
for record in reader:
-            file_info = dict(record['_FILE'])
+            file_dict = dict(record['_FILE'])
+            key_dict = dict(file_dict['_KEY_STATS'])
+            key_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                max_value=BinaryRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                null_count=key_dict['_NULL_COUNTS'],
+            )
+            value_dict = dict(file_dict['_VALUE_STATS'])
+            value_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'],
+                                                           self.table.table_schema.fields),
+                max_value=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'],
+                                                           self.table.table_schema.fields),
+                null_count=value_dict['_NULL_COUNTS'],
+            )
             file_meta = DataFileMeta(
-                file_name=file_info['_FILE_NAME'],
-                file_size=file_info['_FILE_SIZE'],
-                row_count=file_info['_ROW_COUNT'],
-                min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
-                max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
-                key_stats=None,  # TODO
-                value_stats=None,  # TODO
-                min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
-                max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
-                schema_id=file_info['_SCHEMA_ID'],
-                level=file_info['_LEVEL'],
-                extra_files=None,  # TODO
+                file_name=file_dict['_FILE_NAME'],
+                file_size=file_dict['_FILE_SIZE'],
+                row_count=file_dict['_ROW_COUNT'],
+                min_key=BinaryRowDeserializer.from_bytes(file_dict['_MIN_KEY'],
+                                                         self.trimmed_primary_key_fields),
+                max_key=BinaryRowDeserializer.from_bytes(file_dict['_MAX_KEY'],
+                                                         self.trimmed_primary_key_fields),
+                key_stats=key_stats,
+                value_stats=value_stats,
+                min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
+                max_sequence_number=file_dict['_MAX_SEQUENCE_NUMBER'],
+                schema_id=file_dict['_SCHEMA_ID'],
+                level=file_dict['_LEVEL'],
+                extra_files=file_dict['_EXTRA_FILES'],
+                creation_time=file_dict['_CREATION_TIME'],
+                delete_row_count=file_dict['_DELETE_ROW_COUNT'],
+                embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
+                file_source=file_dict['_FILE_SOURCE'],
             )
entry = ManifestEntry(
kind=record['_KIND'],
@@ -73,22 +94,23 @@ class ManifestFileManager:
total_buckets=record['_TOTAL_BUCKETS'],
file=file_meta
)
- if not shard_filter(entry):
+ if shard_filter is not None and not shard_filter(entry):
continue
entries.append(entry)
return entries
- def write(self, commit_messages: List['CommitMessage']) -> List[str]:
+ def write(self, file_name, commit_messages: List[CommitMessage]):
avro_records = []
for message in commit_messages:
            partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition()), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files():
+                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
+            for file in message.new_files:
avro_record = {
+ "_VERSION": 2,
"_KIND": 0,
"_PARTITION": partition_bytes,
- "_BUCKET": message.bucket(),
- "_TOTAL_BUCKETS": -1, # TODO
+ "_BUCKET": message.bucket,
+ "_TOTAL_BUCKETS": self.table.total_buckets,
"_FILE": {
"_FILE_NAME": file.file_name,
"_FILE_SIZE": file.file_size,
@@ -96,33 +118,35 @@ class ManifestFileManager:
"_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
"_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
"_KEY_STATS": {
- "_MIN_VALUES": None,
- "_MAX_VALUES": None,
- "_NULL_COUNTS": 0,
+ "_MIN_VALUES":
BinaryRowSerializer.to_bytes(file.key_stats.min_value),
+ "_MAX_VALUES":
BinaryRowSerializer.to_bytes(file.key_stats.max_value),
+ "_NULL_COUNTS": file.key_stats.null_count,
},
"_VALUE_STATS": {
- "_MIN_VALUES": None,
- "_MAX_VALUES": None,
- "_NULL_COUNTS": 0,
+ "_MIN_VALUES":
BinaryRowSerializer.to_bytes(file.value_stats.min_value),
+ "_MAX_VALUES":
BinaryRowSerializer.to_bytes(file.value_stats.max_value),
+ "_NULL_COUNTS": file.value_stats.null_count,
},
- "_MIN_SEQUENCE_NUMBER": 0,
- "_MAX_SEQUENCE_NUMBER": 0,
- "_SCHEMA_ID": 0,
- "_LEVEL": 0,
- "_EXTRA_FILES": [],
+ "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
+ "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
+ "_SCHEMA_ID": file.schema_id,
+ "_LEVEL": file.level,
+ "_EXTRA_FILES": file.extra_files,
+ "_CREATION_TIME": file.creation_time,
+ "_DELETE_ROW_COUNT": file.delete_row_count,
+ "_EMBEDDED_FILE_INDEX": file.embedded_index,
+ "_FILE_SOURCE": file.file_source,
}
}
avro_records.append(avro_record)
- manifest_filename = f"manifest-{str(uuid.uuid4())}.avro"
- manifest_path = self.manifest_path / manifest_filename
+ manifest_path = self.manifest_path / file_name
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
avro_bytes = buffer.getvalue()
            with self.file_io.new_output_stream(manifest_path) as output_stream:
output_stream.write(avro_bytes)
- return [str(manifest_filename)]
except Exception as e:
self.file_io.delete_quietly(manifest_path)
raise RuntimeError(f"Failed to write manifest file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index a338441465..65fd2b21ac 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -16,15 +16,17 @@
# limitations under the License.
################################################################################
-import uuid
from io import BytesIO
-from typing import List, Optional
+from typing import List
import fastavro
-from pypaimon.manifest.schema.manifest_file_meta import \
- MANIFEST_FILE_META_SCHEMA
+from pypaimon.manifest.schema.manifest_file_meta import (
+ MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
+ BinaryRowSerializer)
class ManifestListManager:
@@ -37,57 +39,72 @@ class ManifestListManager:
self.manifest_path = self.table.table_path / "manifest"
self.file_io = self.table.file_io
- def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]:
+ def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
manifest_files = []
base_manifests = self.read(snapshot.base_manifest_list)
manifest_files.extend(base_manifests)
delta_manifests = self.read(snapshot.delta_manifest_list)
manifest_files.extend(delta_manifests)
- return list(set(manifest_files))
+ return manifest_files
- def read(self, manifest_list_name: str) -> List[str]:
- manifest_list_path = self.manifest_path / manifest_list_name
- manifest_paths = []
+ def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
+ manifest_files = []
+ manifest_list_path = self.manifest_path / manifest_list_name
with self.file_io.new_input_stream(manifest_list_path) as input_stream:
avro_bytes = input_stream.read()
buffer = BytesIO(avro_bytes)
reader = fastavro.reader(buffer)
for record in reader:
- file_name = record['_FILE_NAME']
- manifest_paths.append(file_name)
-
- return manifest_paths
+ stats_dict = dict(record['_PARTITION_STATS'])
+ partition_stats = SimpleStats(
+ min_value=BinaryRowDeserializer.from_bytes(
+ stats_dict['_MIN_VALUES'],
+ self.table.table_schema.get_partition_key_fields()
+ ),
+ max_value=BinaryRowDeserializer.from_bytes(
+ stats_dict['_MAX_VALUES'],
+ self.table.table_schema.get_partition_key_fields()
+ ),
+ null_count=stats_dict['_NULL_COUNTS'],
+ )
+ manifest_file_meta = ManifestFileMeta(
+ file_name=record['_FILE_NAME'],
+ file_size=record['_FILE_SIZE'],
+ num_added_files=record['_NUM_ADDED_FILES'],
+ num_deleted_files=record['_NUM_DELETED_FILES'],
+ partition_stats=partition_stats,
+ schema_id=record['_SCHEMA_ID'],
+ )
+ manifest_files.append(manifest_file_meta)
- def write(self, manifest_file_names: List[str]) -> Optional[str]:
- if not manifest_file_names:
- return None
+ return manifest_files
+ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]):
avro_records = []
- for manifest_file_name in manifest_file_names:
+ for meta in manifest_file_metas:
avro_record = {
- "_FILE_NAME": manifest_file_name,
- "_FILE_SIZE": 0, # TODO
- "_NUM_ADDED_FILES": 0,
- "_NUM_DELETED_FILES": 0,
+ "_VERSION": 2,
+ "_FILE_NAME": meta.file_name,
+ "_FILE_SIZE": meta.file_size,
+ "_NUM_ADDED_FILES": meta.num_added_files,
+ "_NUM_DELETED_FILES": meta.num_deleted_files,
"_PARTITION_STATS": {
- "_MIN_VALUES": None,
- "_MAX_VALUES": None,
- "_NULL_COUNTS": 0,
+ "_MIN_VALUES":
BinaryRowSerializer.to_bytes(meta.partition_stats.min_value),
+ "_MAX_VALUES":
BinaryRowSerializer.to_bytes(meta.partition_stats.max_value),
+ "_NULL_COUNTS": meta.partition_stats.null_count,
},
- "_SCHEMA_ID": 0,
+ "_SCHEMA_ID": meta.schema_id,
}
avro_records.append(avro_record)
- list_filename = f"manifest-list-{str(uuid.uuid4())}.avro"
- list_path = self.manifest_path / list_filename
+ list_path = self.manifest_path / file_name
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
avro_bytes = buffer.getvalue()
with self.file_io.new_output_stream(list_path) as output_stream:
output_stream.write(avro_bytes)
- return list_filename
except Exception as e:
self.file_io.delete_quietly(list_path)
raise RuntimeError(f"Failed to write manifest list file: {e}")
from e
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 02a7179218..e1f60bf2e1 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -19,7 +19,7 @@
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
-from typing import List, Optional
+from typing import List
from pypaimon.manifest.schema.simple_stats import (SIMPLE_STATS_SCHEMA,
SimpleStats)
@@ -31,34 +31,32 @@ class DataFileMeta:
file_name: str
file_size: int
row_count: int
- min_key: Optional[BinaryRow]
- max_key: Optional[BinaryRow]
- key_stats: Optional[SimpleStats]
- value_stats: Optional[SimpleStats]
+ min_key: BinaryRow
+ max_key: BinaryRow
+ key_stats: SimpleStats
+ value_stats: SimpleStats
min_sequence_number: int
max_sequence_number: int
schema_id: int
level: int
- extra_files: Optional[List[str]]
+ extra_files: List[str]
- creation_time: Optional[datetime] = None
- delete_row_count: Optional[int] = None
- embedded_index: Optional[bytes] = None
- file_source: Optional[str] = None
- value_stats_cols: Optional[List[str]] = None
- external_path: Optional[str] = None
+ creation_time: datetime | None = None
+ delete_row_count: int | None = None
+ embedded_index: bytes | None = None
+ file_source: str | None = None
+ value_stats_cols: List[str] | None = None
+ external_path: str | None = None
+ # not a schema field, just for internal usage
file_path: str = None
    def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int):
path_builder = table_path
-
partition_dict = partition.to_dict()
for field_name, field_value in partition_dict.items():
path_builder = path_builder / (field_name + "=" + str(field_value))
-
        path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
-
self.file_path = str(path_builder)
@@ -78,11 +76,13 @@ DATA_FILE_META_SCHEMA = {
{"name": "_SCHEMA_ID", "type": "long"},
{"name": "_LEVEL", "type": "int"},
{"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}},
- {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None},
+ {"name": "_CREATION_TIME",
+ "type": [
+ "null",
+ {"type": "long", "logicalType": "timestamp-millis"}],
+ "default": None},
{"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default":
None},
{"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default":
None},
{"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
- {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array",
"items": "string"}], "default": None},
- {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default":
None},
]
}
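For context on the "_CREATION_TIME" change above: the field now declares the Avro timestamp-millis logical type inside a nullable union, which lets fastavro store a Python datetime directly while still allowing null. A minimal, self-contained sketch (illustrative record name and values, not part of the commit):

import io
from datetime import datetime, timezone

import fastavro

# Only the creation-time field, to keep the example small.
schema = fastavro.parse_schema({
    "type": "record",
    "name": "CreationTimeOnly",
    "fields": [
        {"name": "_CREATION_TIME",
         "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
         "default": None},
    ],
})

buf = io.BytesIO()
fastavro.writer(buf, schema, [{"_CREATION_TIME": datetime.now(timezone.utc)},
                              {"_CREATION_TIME": None}])
buf.seek(0)
for rec in fastavro.reader(buf):
    print(rec)  # the datetime round-trips at millisecond precision; None stays None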
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 1a80553188..75a51f30c5 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -36,6 +36,7 @@ MANIFEST_ENTRY_SCHEMA = {
"type": "record",
"name": "ManifestEntry",
"fields": [
+ {"name": "_VERSION", "type": "int"},
{"name": "_KIND", "type": "int"},
{"name": "_PARTITION", "type": "bytes"},
{"name": "_BUCKET", "type": "int"},
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index a001fca4c2..443c6a0944 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -30,16 +30,13 @@ class ManifestFileMeta:
num_deleted_files: int
partition_stats: SimpleStats
schema_id: int
- min_bucket: int
- max_bucket: int
- min_level: int
- max_level: int
MANIFEST_FILE_META_SCHEMA = {
"type": "record",
"name": "ManifestFileMeta",
"fields": [
+ {"name": "_VERSION", "type": "int"},
{"name": "_FILE_NAME", "type": "string"},
{"name": "_FILE_SIZE", "type": "long"},
{"name": "_NUM_ADDED_FILES", "type": "long"},
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py
b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index b291c12852..dd6924fb2e 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -17,6 +17,7 @@
################################################################################
from dataclasses import dataclass
+from typing import List
from pypaimon.table.row.binary_row import BinaryRow
@@ -25,15 +26,23 @@ from pypaimon.table.row.binary_row import BinaryRow
class SimpleStats:
min_value: BinaryRow
max_value: BinaryRow
- null_count: int
+ null_count: List[int] | None
SIMPLE_STATS_SCHEMA = {
"type": "record",
"name": "SimpleStats",
"fields": [
- {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
+ {"name": "_MIN_VALUES", "type": "bytes"},
+ {"name": "_MAX_VALUES", "type": "bytes"},
+ {"name": "_NULL_COUNTS",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": ["null", "long"]
+ }
+ ],
+ "default": None},
]
}
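For context on the schema change above: _MIN_VALUES/_MAX_VALUES are now required bytes (a serialized BinaryRow) and _NULL_COUNTS becomes a nullable array of nullable longs, one count per field. A minimal sketch, assuming only fastavro and using illustrative byte values in place of real BinaryRowSerializer output, of how such a record round-trips:

import io

import fastavro

SIMPLE_STATS_SCHEMA = {
    "type": "record",
    "name": "SimpleStats",
    "fields": [
        {"name": "_MIN_VALUES", "type": "bytes"},
        {"name": "_MAX_VALUES", "type": "bytes"},
        {"name": "_NULL_COUNTS",
         "type": ["null", {"type": "array", "items": ["null", "long"]}],
         "default": None},
    ],
}

# Illustrative values: in the real code the bytes come from BinaryRowSerializer.to_bytes(...).
record = {"_MIN_VALUES": b"\x01", "_MAX_VALUES": b"\x02", "_NULL_COUNTS": [0, None, 3]}

buf = io.BytesIO()
fastavro.writer(buf, fastavro.parse_schema(SIMPLE_STATS_SCHEMA), [record])
buf.seek(0)
print(next(fastavro.reader(buf)))  # per-field null counts survive, including the None entry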
diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py
b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
index 41c1bf7517..c83f1ce152 100644
--- a/paimon-python/pypaimon/read/reader/data_file_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
@@ -23,6 +23,7 @@ from pyarrow import RecordBatch
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
class DataFileBatchReader(RecordBatchReader):
@@ -31,11 +32,12 @@ class DataFileBatchReader(RecordBatchReader):
"""
     def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
-                 system_primary_key: Optional[List[str]]):
+                 system_primary_key: Optional[List[str]], fields: List[DataField]):
self.format_reader = format_reader
self.index_mapping = index_mapping
self.partition_info = partition_info
self.system_primary_key = system_primary_key
+        self.schema_map = {field.name: field for field in PyarrowFieldParser.from_paimon_schema(fields)}
def read_arrow_batch(self) -> Optional[RecordBatch]:
record_batch = self.format_reader.read_arrow_batch()
@@ -85,7 +87,17 @@ class DataFileBatchReader(RecordBatchReader):
inter_arrays = mapped_arrays
inter_names = mapped_names
- return pa.RecordBatch.from_arrays(inter_arrays, names=inter_names)
+ # to contains 'not null' property
+ final_fields = []
+ for i, name in enumerate(inter_names):
+ array = inter_arrays[i]
+ target_field = self.schema_map.get(name)
+ if not target_field:
+ target_field = pa.field(name, array.type)
+ final_fields.append(target_field)
+ final_schema = pa.schema(final_fields)
+
+ return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
def close(self) -> None:
self.format_reader.close()
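The hunk above rebuilds the output batch with an explicit schema so that "not null" from the Paimon field definitions survives, instead of letting pyarrow infer everything as nullable. A standalone sketch of the same idea, using only pyarrow and illustrative column names:

import pyarrow as pa

# Arrays as they come back from a format reader: names are known, nullability is not.
arrays = [pa.array([1, 2, 3], type=pa.int32()), pa.array(["a", "b", None])]
names = ["user_id", "behavior"]

# Fields recovered from the table schema; unknown columns fall back to a plain field.
schema_map = {"user_id": pa.field("user_id", pa.int32(), nullable=False)}
fields = [schema_map.get(name, pa.field(name, arr.type)) for name, arr in zip(names, arrays)]

batch = pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields))
print(batch.schema)  # user_id: int32 not null, behavior: string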
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index ea37593fdc..f085bac444 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -83,9 +83,11 @@ class SplitRead(ABC):
index_mapping = self.create_index_mapping()
partition_info = self.create_partition_info()
if for_merge_read:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key,
+                                       self.table.table_schema.fields)
         else:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, None)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
+                                       self.table.table_schema.fields)
@abstractmethod
def _get_all_data_fields(self):
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
index 2745b1d1b3..d89ddd0bcb 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -63,11 +63,11 @@ class TableScan:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if not latest_snapshot:
return Plan([])
-        manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+        manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
         file_entries = []
-        for manifest_file_path in manifest_files:
-            manifest_entries = self.manifest_file_manager.read(manifest_file_path,
+        for manifest_file in manifest_files:
+            manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
for entry in manifest_entries:
if entry.kind == 0:
diff --git a/paimon-python/pypaimon/schema/data_types.py
b/paimon-python/pypaimon/schema/data_types.py
index 8a22e7f012..a5186cc56c 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -279,13 +279,16 @@ class DataTypeParser:
if "(" in type_upper:
base_type = type_upper.split("(")[0]
+ elif " " in type_upper:
+ base_type = type_upper.split(" ")[0]
+ type_upper = base_type
else:
base_type = type_upper
try:
Keyword(base_type)
return AtomicType(
- type_string, DataTypeParser.parse_nullability(type_string)
+ type_upper, DataTypeParser.parse_nullability(type_string)
)
except ValueError:
raise Exception(f"Unknown type: {base_type}")
@@ -345,11 +348,7 @@ class DataTypeParser:
def parse_data_field(
json_data: Dict[str, Any], field_id: Optional[AtomicInteger] = None
) -> DataField:
-
- if (
- DataField.FIELD_ID in json_data
- and json_data[DataField.FIELD_ID] is not None
- ):
+        if DataField.FIELD_ID in json_data and json_data[DataField.FIELD_ID] is not None:
if field_id is not None and field_id.get() != -1:
raise ValueError("Partial field id is not allowed.")
field_id_value = int(json_data["id"])
@@ -486,7 +485,7 @@ class PyarrowFieldParser:
return MapType(nullable, key_type, value_type)
else:
raise ValueError(f"Unknown type: {type_name}")
- return AtomicType(type_name)
+ return AtomicType(type_name, nullable)
@staticmethod
def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField:
diff --git a/paimon-python/pypaimon/schema/schema_manager.py
b/paimon-python/pypaimon/schema/schema_manager.py
index f0a108befb..f03b9e111b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -19,6 +19,7 @@ from pathlib import Path
from typing import Optional
from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
from pypaimon.schema.schema import Schema
from pypaimon.schema.table_schema import TableSchema
@@ -42,15 +43,11 @@ class SchemaManager:
except Exception as e:
raise RuntimeError(f"Failed to load schema from path:
{self.schema_path}") from e
-    def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema:
+ def create_table(self, schema: Schema) -> TableSchema:
while True:
latest = self.latest()
if latest is not None:
-                if external_table:
-                    self._check_schema_for_external_table(latest.to_schema(), schema)
-                    return latest
-                else:
-                    raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
+                raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
success = self.commit(table_schema)
@@ -60,7 +57,7 @@ class SchemaManager:
def commit(self, new_schema: TableSchema) -> bool:
schema_path = self._to_schema_path(new_schema.id)
try:
-            return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json())
+            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
except Exception as e:
raise RuntimeError(f"Failed to commit schema: {e}") from e
diff --git a/paimon-python/pypaimon/schema/table_schema.py
b/paimon-python/pypaimon/schema/table_schema.py
index 80b787eb6e..dc1872deb6 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -57,22 +57,6 @@ class TableSchema:
comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
time_millis: int = json_field("timeMillis", default_factory=lambda:
int(time.time() * 1000))
-    def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int,
-                 partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
-                 comment: Optional[str] = None, time_millis: Optional[int] = None):
- self.version = version
- self.id = id
- self.fields = fields
- self.highest_field_id = highest_field_id
- self.partition_keys = partition_keys or []
- self.primary_keys = primary_keys or []
- self.options = options or {}
- self.comment = comment
-        self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
- self.get_trimmed_primary_key_fields()
-
- from typing import List
-
def cross_partition_update(self) -> bool:
if not self.primary_keys or not self.partition_keys:
return False
@@ -96,7 +80,7 @@ class TableSchema:
partition_keys: List[str] = schema.partition_keys
primary_keys: List[str] = schema.primary_keys
options: Dict[str, str] = schema.options
- highest_field_id: int = -1 # max(field.id for field in fields)
+ highest_field_id: int = max(field.id for field in fields)
return TableSchema(
TableSchema.CURRENT_VERSION,
@@ -106,8 +90,7 @@ class TableSchema:
partition_keys,
primary_keys,
options,
- schema.comment,
- int(time.time())
+ schema.comment
)
@staticmethod
@@ -150,22 +133,6 @@ class TableSchema:
except Exception as e:
raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e
- def to_json(self) -> str:
- data = {
- TableSchema.FIELD_VERSION: self.version,
- TableSchema.FIELD_ID: self.id,
-            TableSchema.FIELD_FIELDS: [field.to_dict() for field in self.fields],
- TableSchema.FIELD_HIGHEST_FIELD_ID: self.highest_field_id,
- TableSchema.FIELD_PARTITION_KEYS: self.partition_keys,
- TableSchema.FIELD_PRIMARY_KEYS: self.primary_keys,
- TableSchema.FIELD_OPTIONS: self.options,
- TableSchema.FIELD_COMMENT: self.comment,
- TableSchema.FIELD_TIME_MILLIS: self.time_millis
- }
- if self.comment is not None:
- data["comment"] = self.comment
- return json.dumps(data, indent=2, ensure_ascii=False)
-
    def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
return TableSchema(
version=self.version,
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 1d131a492d..eb763d2938 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -49,10 +49,11 @@ class FileStoreTable(Table):
self.primary_keys = table_schema.primary_keys
self.partition_keys = table_schema.partition_keys
self.options = table_schema.options
+        self.cross_partition_update = self.table_schema.cross_partition_update()
+        self.is_primary_key_table = bool(self.primary_keys)
+        self.total_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
         self.schema_manager = SchemaManager(file_io, table_path)
-        self.is_primary_key_table = bool(self.primary_keys)
-        self.cross_partition_update = self.table_schema.cross_partition_update()
def current_branch(self) -> str:
"""Get the current branch name from options."""
diff --git a/paimon-python/pypaimon/table/row/binary_row.py
b/paimon-python/pypaimon/table/row/binary_row.py
index 2e44ff417e..f1f8e740df 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -193,7 +193,7 @@ class BinaryRowDeserializer:
                return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length]
else:
length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
- return bytes_data[field_offset + 1:field_offset + 1 + length]
+ return bytes_data[field_offset:field_offset + length]
@classmethod
    def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
@@ -238,8 +238,6 @@ class BinaryRowDeserializer:
class BinaryRowSerializer:
HEADER_SIZE_IN_BITS = 8
MAX_FIX_PART_DATA_SIZE = 7
- HIGHEST_FIRST_BIT = 0x80 << 56
- HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
@classmethod
def to_bytes(cls, binary_row: BinaryRow) -> bytes:
@@ -252,79 +250,49 @@ class BinaryRowSerializer:
fixed_part = bytearray(fixed_part_size)
fixed_part[0] = binary_row.row_kind.value
- for i, value in enumerate(binary_row.values):
- if value is None:
- cls._set_null_bit(fixed_part, 0, i)
-
- variable_data = []
- variable_offsets = []
- current_offset = fixed_part_size
+ variable_part_data = []
+ current_variable_offset = 0
for i, (value, field) in enumerate(zip(binary_row.values,
binary_row.fields)):
+ field_fixed_offset = null_bits_size_in_bytes + i * 8
+
if value is None:
-                struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, 0)
-                variable_data.append(b'')
-                variable_offsets.append(0)
+                cls._set_null_bit(fixed_part, 0, i)
+                struct.pack_into('<q', fixed_part, field_fixed_offset, 0)
                 continue
-            field_offset = null_bits_size_in_bytes + i * 8
             if not isinstance(field.type, AtomicType):
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}")
-            if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
-                if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']:
-                    if isinstance(value, str):
-                        value_bytes = value.encode('utf-8')
-                    else:
-                        value_bytes = bytes(value)
+
+            type_name = field.type.type.upper()
+            if type_name in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
+                if type_name in ['VARCHAR', 'STRING', 'CHAR']:
+                    value_bytes = str(value).encode('utf-8')
                 else:
-                    if isinstance(value, bytes):
-                        value_bytes = value
-                    else:
-                        value_bytes = bytes(value)
+                    value_bytes = bytes(value)
                 length = len(value_bytes)
                 if length <= cls.MAX_FIX_PART_DATA_SIZE:
-                    fixed_part[field_offset:field_offset + length] = value_bytes
-                    for j in range(length, 8):
-                        fixed_part[field_offset + j] = 0
-                    packed_long = struct.unpack_from('<q', fixed_part, field_offset)[0]
-
-                    offset_and_len = packed_long | (length << 56) | cls.HIGHEST_FIRST_BIT
-                    if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-                        offset_and_len = offset_and_len - 0x10000000000000000
-                    struct.pack_into('<q', fixed_part, field_offset, offset_and_len)
-                    variable_data.append(b'')
-                    variable_offsets.append(0)
+                    fixed_part[field_fixed_offset: field_fixed_offset + length] = value_bytes
+                    for j in range(length, 7):
+                        fixed_part[field_fixed_offset + j] = 0
+                    header_byte = 0x80 | length
+                    fixed_part[field_fixed_offset + 7] = header_byte
                 else:
-                    variable_data.append(value_bytes)
-                    variable_offsets.append(current_offset)
-                    current_offset += len(value_bytes)
-                    offset_and_len = (variable_offsets[i] << 32) | len(variable_data[i])
-                    struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, offset_and_len)
-            else:
-                if field.type.type.upper() in ['BOOLEAN', 'BOOL']:
-                    struct.pack_into('<b', fixed_part, field_offset, 1 if value else 0)
-                elif field.type.type.upper() in ['TINYINT', 'BYTE']:
-                    struct.pack_into('<b', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['SMALLINT', 'SHORT']:
-                    struct.pack_into('<h', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['INT', 'INTEGER']:
-                    struct.pack_into('<i', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['BIGINT', 'LONG']:
-                    struct.pack_into('<q', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['FLOAT', 'REAL']:
-                    struct.pack_into('<f', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['DOUBLE']:
-                    struct.pack_into('<d', fixed_part, field_offset, value)
-                else:
-                    field_bytes = cls._serialize_field_value(value, field.type)
-                    fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes
+                    offset_in_variable_part = current_variable_offset
+                    variable_part_data.append(value_bytes)
+                    current_variable_offset += length
-                    variable_data.append(b'')
-                    variable_offsets.append(0)
+                    absolute_offset = fixed_part_size + offset_in_variable_part
+                    offset_and_len = (absolute_offset << 32) | length
+                    struct.pack_into('<q', fixed_part, field_fixed_offset, offset_and_len)
+            else:
+                field_bytes = cls._serialize_field_value(value, field.type)
+                fixed_part[field_fixed_offset: field_fixed_offset + len(field_bytes)] = field_bytes
-        result = bytes(fixed_part) + b''.join(variable_data)
-        return result
+        row_data = bytes(fixed_part) + b''.join(variable_part_data)
+        arity_prefix = struct.pack('>i', arity)
+        return arity_prefix + row_data
@classmethod
def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
@@ -342,33 +310,29 @@ class BinaryRowSerializer:
type_name = data_type.type.upper()
if type_name in ['BOOLEAN', 'BOOL']:
- return cls._serialize_boolean(value)
+ return cls._serialize_boolean(value) + b'\x00' * 7
elif type_name in ['TINYINT', 'BYTE']:
- return cls._serialize_byte(value)
+ return cls._serialize_byte(value) + b'\x00' * 7
elif type_name in ['SMALLINT', 'SHORT']:
- return cls._serialize_short(value)
+ return cls._serialize_short(value) + b'\x00' * 6
elif type_name in ['INT', 'INTEGER']:
- return cls._serialize_int(value)
+ return cls._serialize_int(value) + b'\x00' * 4
elif type_name in ['BIGINT', 'LONG']:
return cls._serialize_long(value)
elif type_name in ['FLOAT', 'REAL']:
- return cls._serialize_float(value)
+ return cls._serialize_float(value) + b'\x00' * 4
elif type_name in ['DOUBLE']:
return cls._serialize_double(value)
- elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
- return cls._serialize_string(value)
- elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
- return cls._serialize_binary(value)
elif type_name in ['DECIMAL', 'NUMERIC']:
return cls._serialize_decimal(value, data_type)
elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
return cls._serialize_timestamp(value)
elif type_name in ['DATE']:
- return cls._serialize_date(value)
+ return cls._serialize_date(value) + b'\x00' * 4
elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
- return cls._serialize_time(value)
+ return cls._serialize_time(value) + b'\x00' * 4
else:
- return cls._serialize_string(str(value))
+ raise TypeError(f"Unsupported type for serialization: {type_name}")
@classmethod
def _serialize_boolean(cls, value: bool) -> bytes:
@@ -398,32 +362,6 @@ class BinaryRowSerializer:
def _serialize_double(cls, value: float) -> bytes:
return struct.pack('<d', value)
- @classmethod
- def _serialize_string(cls, value) -> bytes:
- if isinstance(value, str):
- value_bytes = value.encode('utf-8')
- else:
- value_bytes = bytes(value)
-
- length = len(value_bytes)
-
- offset_and_len = (0x80 << 56) | (length << 56)
- if offset_and_len > 0x7FFFFFFFFFFFFFFF:
- offset_and_len = offset_and_len - 0x10000000000000000
- return struct.pack('<q', offset_and_len)
-
- @classmethod
- def _serialize_binary(cls, value: bytes) -> bytes:
- if isinstance(value, bytes):
- data_bytes = value
- else:
- data_bytes = bytes(value)
- length = len(data_bytes)
- offset_and_len = (0x80 << 56) | (length << 56)
- if offset_and_len > 0x7FFFFFFFFFFFFFFF:
- offset_and_len = offset_and_len - 0x10000000000000000
- return struct.pack('<q', offset_and_len)
-
@classmethod
def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
type_str = str(data_type)
@@ -452,11 +390,10 @@ class BinaryRowSerializer:
@classmethod
def _serialize_date(cls, value: datetime) -> bytes:
if isinstance(value, datetime):
- epoch = datetime(1970, 1, 1)
- days = (value - epoch).days
+ epoch = datetime(1970, 1, 1).date()
+ days = (value.date() - epoch).days
else:
- epoch = datetime(1970, 1, 1)
- days = (value - epoch).days
+ raise RuntimeError("date should be datatime")
return struct.pack('<i', days)
@classmethod
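The serializer rewrite above uses two layouts for variable-length fields: values of at most 7 bytes are inlined into the 8-byte fixed slot with a 0x80|length marker in the last byte, while longer values get an (offset << 32) | length pointer into the variable part, and the whole row is prefixed with a big-endian arity. A standalone sketch of just that packing, under those assumptions (helper names here are illustrative):

import struct

def pack_long_value_pointer(absolute_offset: int, length: int) -> bytes:
    # Long form: high 32 bits = offset of the data from the row start,
    # low 32 bits = byte length, stored as a little-endian signed long.
    return struct.pack('<q', (absolute_offset << 32) | length)

def pack_small_value_inline(value_bytes: bytes) -> bytes:
    # Short form (<= 7 bytes): data lives directly in the 8-byte slot,
    # the last byte carries the 0x80 "inlined" flag plus the length.
    assert len(value_bytes) <= 7
    slot = bytearray(8)
    slot[:len(value_bytes)] = value_bytes
    slot[7] = 0x80 | len(value_bytes)
    return bytes(slot)

print(pack_small_value_inline(b"abc").hex())   # '6162630000000083'
print(pack_long_value_pointer(32, 11).hex())   # '0b00000020000000'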
diff --git a/paimon-python/pypaimon/tests/java_python_hybrid_test.py
b/paimon-python/pypaimon/tests/java_python_hybrid_test.py
new file mode 100644
index 0000000000..0b36236ba5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/java_python_hybrid_test.py
@@ -0,0 +1,132 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.py4j_impl import constants
+from pypaimon.tests.py4j_impl.java_implementation import CatalogPy4j
+
+
+class AlternativeWriteTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # for py4j env
+ this_dir = os.path.abspath(os.path.dirname(__file__))
+ project_dir = os.path.dirname(this_dir)
+ deps_dir = os.path.join(project_dir, "tests/py4j_impl/test_deps/*")
+ print(f"py4j deps_dir: {deps_dir}")
+ for file in glob.glob(deps_dir):
+ print(f"py4j deps_dir file: {file}")
+ os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = deps_dir
+ os.environ[constants.PYPAIMON4J_TEST_MODE] = 'true'
+
+ # for default catalog
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.option = {
+ 'warehouse': cls.warehouse
+ }
+ cls.py_catalog = CatalogFactory.create(cls.option)
+ cls.j_catalog = CatalogPy4j.create(cls.option)
+
+ cls.pa_schema = pa.schema([
+ pa.field('user_id', pa.int32(), nullable=False),
+ ('item_id', pa.int64()),
+ ('behavior', pa.string()),
+ pa.field('dt', pa.string(), nullable=False)
+ ])
+ cls.expected_data = {
+ 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
+            'item_id': [1001, 2001, 3001, 4007, 5007, 6007, 7007, 8007, 9007, 1006, 1106, 1206],
+            'behavior': ['l', 'k', 'j', 'i-new', None, 'g-new', 'f-new', 'e-new', 'd-new', 'c-new', 'b-new', 'a-new'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
+        }
+        cls.expected_result = pa.Table.from_pydict(cls.expected_data, schema=cls.pa_schema)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def testAlternativeWrite(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['user_id', 'dt'],
+ options={'bucket': '1'}
+ )
+ self.py_catalog.create_database('default', False)
+        self.py_catalog.create_table('default.test_alternative_write', schema, False)
+
+        self.py_table = self.py_catalog.get_table('default.test_alternative_write')
+        self.j_table = self.j_catalog.get_table('default.test_alternative_write')
+
+ self._write_data({
+ 'user_id': self.expected_data.get('user_id')[0:6],
+ 'item_id': [1001, 2001, 3001, 4001, 5001, 6001],
+ 'behavior': ['l', 'k', 'j', 'i', None, 'g'],
+ 'dt': self.expected_data.get('dt')[0:6]
+ }, 1)
+ data1 = {
+ 'user_id': self.expected_data.get('user_id')[6:12],
+ 'item_id': [7001, 8001, 9001, 1001, 1101, 1201],
+ 'behavior': ['f-new', 'e-new', 'd-new', 'c-new', 'b-new', 'a-new'],
+ 'dt': self.expected_data.get('dt')[6:12]
+ }
+ data2 = {
+ 'user_id': self.expected_data.get('user_id')[3:9],
+ 'item_id': [4001, 5001, 6001, 7001, 8001, 9001],
+ 'behavior': ['i-new', None, 'g-new', 'f-new', 'e-new', 'd-new'],
+ 'dt': self.expected_data.get('dt')[3:9]
+ }
+ datas = [data1, data2]
+        for idx, using_py in enumerate([0, 0, 1, 1, 0, 0]):  # TODO: this can be a random list
+ data1['item_id'] = [item + 1 for item in data1['item_id']]
+ data2['item_id'] = [item + 1 for item in data2['item_id']]
+ data = datas[idx % 2]
+ self._write_data(data, using_py)
+
+ j_read_builder = self.j_table.new_read_builder()
+ j_table_read = j_read_builder.new_read()
+ j_splits = j_read_builder.new_scan().plan().splits()
+ j_actual = j_table_read.to_arrow(j_splits).sort_by('user_id')
+ self.assertEqual(j_actual, self.expected_result)
+
+ py_read_builder = self.py_table.new_read_builder()
+ py_table_read = py_read_builder.new_read()
+ py_splits = py_read_builder.new_scan().plan().splits()
+ py_actual = py_table_read.to_arrow(py_splits).sort_by('user_id')
+ self.assertEqual(py_actual, self.expected_result)
+
+ def _write_data(self, data, using_py_table: int):
+ if using_py_table == 1:
+ write_builder = self.py_table.new_batch_write_builder()
+ else:
+ write_builder = self.j_table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py
b/paimon-python/pypaimon/tests/py4j_impl/__init__.py
similarity index 64%
copy from paimon-python/pypaimon/manifest/schema/simple_stats.py
copy to paimon-python/pypaimon/tests/py4j_impl/__init__.py
index b291c12852..65b48d4d79 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/tests/py4j_impl/__init__.py
@@ -15,25 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
-from dataclasses import dataclass
-
-from pypaimon.table.row.binary_row import BinaryRow
-
-
-@dataclass
-class SimpleStats:
- min_value: BinaryRow
- max_value: BinaryRow
- null_count: int
-
-
-SIMPLE_STATS_SCHEMA = {
- "type": "record",
- "name": "SimpleStats",
- "fields": [
- {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
- ]
-}
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py
b/paimon-python/pypaimon/tests/py4j_impl/constants.py
similarity index 64%
copy from paimon-python/pypaimon/manifest/schema/simple_stats.py
copy to paimon-python/pypaimon/tests/py4j_impl/constants.py
index b291c12852..af7d1c8165 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/tests/py4j_impl/constants.py
@@ -16,24 +16,13 @@
# limitations under the License.
################################################################################
-from dataclasses import dataclass
+# ---------------------------- for env var ----------------------------
+PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
+PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
+PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
+PYPAIMON_HADOOP_CLASSPATH = '_PYPAIMON_HADOOP_CLASSPATH'
+PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
+PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
-from pypaimon.table.row.binary_row import BinaryRow
-
-
-@dataclass
-class SimpleStats:
- min_value: BinaryRow
- max_value: BinaryRow
- null_count: int
-
-
-SIMPLE_STATS_SCHEMA = {
- "type": "record",
- "name": "SimpleStats",
- "fields": [
- {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
- {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
- ]
-}
+# ------------------ for tests (Please don't use it) ------------------
+PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
diff --git a/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py
b/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py
new file mode 100644
index 0000000000..4f306e374b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py
@@ -0,0 +1,135 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import struct
+import tempfile
+import time
+from logging import WARN
+from threading import RLock
+
+from py4j.java_gateway import (CallbackServerParameters, GatewayParameters,
+ JavaGateway, JavaPackage, java_import, logger)
+
+from pypaimon.tests.py4j_impl import constants
+from pypaimon.tests.py4j_impl.gateway_server import \
+ launch_gateway_server_process
+
+_gateway = None
+_lock = RLock()
+
+
+def get_gateway():
+ # type: () -> JavaGateway
+ global _gateway
+ with _lock:
+ if _gateway is None:
+ # Set the level to WARN to mute the noisy INFO level logs
+ logger.level = WARN
+ _gateway = launch_gateway()
+
+ callback_server = _gateway.get_callback_server()
+            callback_server_listening_address = callback_server.get_listening_address()
+            callback_server_listening_port = callback_server.get_listening_port()
+            _gateway.jvm.org.apache.paimon.python.PythonEnvUtils.resetCallbackClient(
+                _gateway.java_gateway_server,
+                callback_server_listening_address,
+                callback_server_listening_port)
+ import_paimon_view(_gateway)
+ install_py4j_hooks()
+ _gateway.entry_point.put("Watchdog", Watchdog())
+ return _gateway
+
+
+def launch_gateway():
+ # type: () -> JavaGateway
+ """
+ launch jvm gateway
+ """
+
+    # Create a temporary directory where the gateway server should write the connection information.
+ conn_info_dir = tempfile.mkdtemp()
+ try:
+ fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
+ os.close(fd)
+ os.unlink(conn_info_file)
+
+ env = dict(os.environ)
+ env[constants.PYPAIMON_CONN_INFO_PATH] = conn_info_file
+
+ p = launch_gateway_server_process(env)
+
+ while not p.poll() and not os.path.isfile(conn_info_file):
+ time.sleep(0.1)
+
+ if not os.path.isfile(conn_info_file):
+ stderr_info = p.stderr.read().decode('utf-8')
+ raise RuntimeError(
+ "Java gateway process exited before sending its port
number.\nStderr:\n"
+ + stderr_info
+ )
+
+ with open(conn_info_file, "rb") as info:
+ gateway_port = struct.unpack("!I", info.read(4))[0]
+ finally:
+ shutil.rmtree(conn_info_dir)
+
+ # Connect to the gateway
+ gateway = JavaGateway(
+        gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
+ callback_server_parameters=CallbackServerParameters(
+ port=0, daemonize=True, daemonize_connections=True))
+
+ return gateway
+
+
+def import_paimon_view(gateway):
+ java_import(gateway.jvm, "org.apache.paimon.table.*")
+ java_import(gateway.jvm, "org.apache.paimon.options.Options")
+ java_import(gateway.jvm, "org.apache.paimon.catalog.*")
+ java_import(gateway.jvm, "org.apache.paimon.schema.Schema*")
+ java_import(gateway.jvm, 'org.apache.paimon.types.*')
+ java_import(gateway.jvm, 'org.apache.paimon.python.*')
+ java_import(gateway.jvm, "org.apache.paimon.data.*")
+ java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")
+
+
+def install_py4j_hooks():
+ """
+    Hook the classes such as JavaPackage, etc of Py4j to improve the exception message.
+    """
+    def wrapped_call(self, *args, **kwargs):
+        raise TypeError(
+            "Could not found the Java class '%s'. The Java dependencies could be specified via "
+            "command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
+
+ setattr(JavaPackage, '__call__', wrapped_call)
+
+
+class Watchdog(object):
+ """
+ Used to provide to Java side to check whether its parent process is alive.
+ """
+
+ def ping(self):
+ time.sleep(10)
+ return True
+
+ class Java:
+ implements = ["org.apache.paimon.python.PythonGatewayServer$Watchdog"]
diff --git a/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py
b/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py
new file mode 100644
index 0000000000..5de2bcd154
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py
@@ -0,0 +1,122 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import importlib.resources
+import os
+import platform
+import signal
+from subprocess import PIPE, Popen
+
+from pypaimon.tests.py4j_impl import constants
+
+
+def on_windows():
+ return platform.system() == "Windows"
+
+
+def find_java_executable():
+ java_executable = "java.exe" if on_windows() else "java"
+ java_home = None
+
+ if java_home is None and "JAVA_HOME" in os.environ:
+ java_home = os.environ["JAVA_HOME"]
+
+ if java_home is not None:
+ java_executable = os.path.join(java_home, "bin", java_executable)
+
+ return java_executable
+
+
+def launch_gateway_server_process(env):
+ java_executable = find_java_executable()
+ log_settings = []
+ jvm_args = env.get(constants.PYPAIMON_JVM_ARGS, '').split()
+ classpath = _get_classpath(env)
+ print(f"py4j classpath: {classpath}")
+ main_args = env.get(constants.PYPAIMON_MAIN_ARGS, '').split()
+ command = [
+ java_executable,
+ *jvm_args,
+ # default jvm args
+ "-XX:+IgnoreUnrecognizedVMOptions",
+ "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ *log_settings,
+ "-cp",
+ classpath,
+ "-c",
+ constants.PYPAIMON_MAIN_CLASS,
+ *main_args
+ ]
+
+ preexec_fn = None
+ if not on_windows():
+ def preexec_func():
+ # ignore ctrl-c / SIGINT
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ preexec_fn = preexec_func
+ return Popen(list(filter(lambda c: len(c) != 0, command)),
+ stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
+
+
+_JAVA_DEPS_PACKAGE = 'pypaimon.jars'
+
+
+def _get_classpath(env):
+ classpath = []
+
+ # note that jars are not packaged in test
+ test_mode = os.environ.get(constants.PYPAIMON4J_TEST_MODE)
+ if not test_mode or test_mode.lower() != "true":
+ jars = importlib.resources.files(_JAVA_DEPS_PACKAGE)
+ one_jar = next(iter(jars.iterdir()), None)
+ if not one_jar:
+ raise ValueError("Haven't found necessary python-java-bridge jar,
this is unexpected.")
+ builtin_java_classpath = os.path.join(os.path.dirname(str(one_jar)),
'*')
+ classpath.append(builtin_java_classpath)
+
+ # user defined
+ if constants.PYPAIMON_JAVA_CLASSPATH in env:
+ classpath.append(env[constants.PYPAIMON_JAVA_CLASSPATH])
+
+ # hadoop
+ hadoop_classpath = _get_hadoop_classpath(env)
+ if hadoop_classpath is not None:
+ classpath.append(hadoop_classpath)
+
+ return os.pathsep.join(classpath)
+
+
+_HADOOP_DEPS_PACKAGE = 'pypaimon.hadoop-deps'
+
+
+def _get_hadoop_classpath(env):
+ if constants.PYPAIMON_HADOOP_CLASSPATH in env:
+ return env[constants.PYPAIMON_HADOOP_CLASSPATH]
+ elif 'HADOOP_CLASSPATH' in env:
+ return env['HADOOP_CLASSPATH']
+ else:
+ # use built-in hadoop
+ jars = importlib.resources.files(_HADOOP_DEPS_PACKAGE)
+ one_jar = next(iter(jars.iterdir()), None)
+ if not one_jar:
+ raise EnvironmentError(f"The built-in Hadoop environment has been
broken, this \
+ is unexpected. You can set one of
'{constants.PYPAIMON_HADOOP_CLASSPATH}' or \
+ 'HADOOP_CLASSPATH' to continue.")
+ return os.path.join(os.path.dirname(str(one_jar)), '*')
diff --git a/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py
b/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py
new file mode 100644
index 0000000000..3001519bd7
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py
@@ -0,0 +1,383 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# pypaimon.api implementation based on Java code & py4j lib
+
+from typing import Any, Iterator, List, Optional
+
+import pyarrow as pa
+
+from pypaimon.tests.py4j_impl import java_utils
+from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+from pypaimon.tests.py4j_impl.java_utils import (deserialize_java_object,
+ serialize_java_object)
+
+
+class SchemaPy4j:
+ """Schema of a table."""
+
+ def __init__(self,
+ pa_schema: pa.Schema,
+ partition_keys: Optional[List[str]] = None,
+ primary_keys: Optional[List[str]] = None,
+ options: Optional[dict] = None,
+ comment: Optional[str] = None):
+ self.pa_schema = pa_schema
+ self.partition_keys = partition_keys
+ self.primary_keys = primary_keys
+ self.options = options
+ self.comment = comment
+
+
+class CatalogPy4j:
+
+ def __init__(self, j_catalog, catalog_options: dict):
+ self._j_catalog = j_catalog
+ self._catalog_options = catalog_options
+
+ @staticmethod
+ def create(catalog_options: dict) -> 'CatalogPy4j':
+ j_catalog_context = java_utils.to_j_catalog_context(catalog_options)
+ gateway = get_gateway()
+ j_catalog = gateway.jvm.CatalogFactory.createCatalog(j_catalog_context)
+ return CatalogPy4j(j_catalog, catalog_options)
+
+ def get_table(self, identifier: str) -> 'TablePy4j':
+ j_identifier = java_utils.to_j_identifier(identifier)
+ j_table = self._j_catalog.getTable(j_identifier)
+ return TablePy4j(j_table, self._catalog_options)
+
+ def create_database(self, name: str, ignore_if_exists: bool, properties:
Optional[dict] = None):
+ if properties is None:
+ properties = {}
+ self._j_catalog.createDatabase(name, ignore_if_exists, properties)
+
+ def create_table(self, identifier: str, schema: SchemaPy4j,
ignore_if_exists: bool):
+ j_identifier = java_utils.to_j_identifier(identifier)
+ j_schema = java_utils.to_paimon_schema(schema)
+ self._j_catalog.createTable(j_identifier, j_schema, ignore_if_exists)
+
+
+class TablePy4j:
+
+ def __init__(self, j_table, catalog_options: dict):
+ self._j_table = j_table
+ self._catalog_options = catalog_options
+
+ def new_read_builder(self) -> 'ReadBuilderPy4j':
+ j_read_builder =
get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
+ return ReadBuilderPy4j(j_read_builder, self._j_table.rowType(),
self._catalog_options)
+
+ def new_batch_write_builder(self) -> 'BatchWriteBuilderPy4j':
+ java_utils.check_batch_write(self._j_table)
+ j_batch_write_builder =
get_gateway().jvm.InvocationUtil.getBatchWriteBuilder(self._j_table)
+ return BatchWriteBuilderPy4j(j_batch_write_builder)
+
+
+class ReadBuilderPy4j:
+
+ def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
+ self._j_read_builder = j_read_builder
+ self._j_row_type = j_row_type
+ self._catalog_options = catalog_options
+
+ def with_filter(self, predicate: 'PredicatePy4j'):
+ self._j_read_builder.withFilter(predicate.to_j_predicate())
+ return self
+
+ def with_projection(self, projection: List[str]) -> 'ReadBuilderPy4j':
+ field_names = list(map(lambda field: field.name(),
self._j_row_type.getFields()))
+ int_projection = list(map(lambda p: field_names.index(p), projection))
+ gateway = get_gateway()
+ int_projection_arr = gateway.new_array(gateway.jvm.int,
len(projection))
+ for i in range(len(projection)):
+ int_projection_arr[i] = int_projection[i]
+ self._j_read_builder.withProjection(int_projection_arr)
+ return self
+
+ def with_limit(self, limit: int) -> 'ReadBuilderPy4j':
+ self._j_read_builder.withLimit(limit)
+ return self
+
+ def new_scan(self) -> 'TableScanPy4j':
+ j_table_scan = self._j_read_builder.newScan()
+ return TableScanPy4j(j_table_scan)
+
+ def new_read(self) -> 'TableReadPy4j':
+ j_table_read = self._j_read_builder.newRead().executeFilter()
+ return TableReadPy4j(j_table_read, self._j_read_builder.readType(),
self._catalog_options)
+
+ def new_predicate_builder(self) -> 'PredicateBuilderPy4j':
+ return PredicateBuilderPy4j(self._j_row_type)
+
+ def read_type(self) -> 'RowTypePy4j':
+ return RowTypePy4j(self._j_read_builder.readType())
+
+
+class RowTypePy4j:
+
+ def __init__(self, j_row_type):
+ self._j_row_type = j_row_type
+
+ def as_arrow(self) -> "pa.Schema":
+ return java_utils.to_arrow_schema(self._j_row_type)
+
+
+class TableScanPy4j:
+
+ def __init__(self, j_table_scan):
+ self._j_table_scan = j_table_scan
+
+ def plan(self) -> 'PlanPy4j':
+ j_plan = self._j_table_scan.plan()
+ j_splits = j_plan.splits()
+ return PlanPy4j(j_splits)
+
+
+class PlanPy4j:
+
+ def __init__(self, j_splits):
+ self._j_splits = j_splits
+
+ def splits(self) -> List['SplitPy4j']:
+ return list(map(lambda s: self._build_single_split(s), self._j_splits))
+
+ def _build_single_split(self, j_split) -> 'SplitPy4j':
+ j_split_bytes = serialize_java_object(j_split)
+ row_count = j_split.rowCount()
+ files_optional = j_split.convertToRawFiles()
+ if not files_optional.isPresent():
+ file_size = 0
+ file_paths = []
+ else:
+ files = files_optional.get()
+ file_size = sum(file.length() for file in files)
+ file_paths = [file.path() for file in files]
+ return SplitPy4j(j_split_bytes, row_count, file_size, file_paths)
+
+
+class SplitPy4j:
+
+ def __init__(self, j_split_bytes, row_count: int, file_size: int,
file_paths: List[str]):
+ self._j_split_bytes = j_split_bytes
+ self._row_count = row_count
+ self._file_size = file_size
+ self._file_paths = file_paths
+
+ def to_j_split(self):
+ return deserialize_java_object(self._j_split_bytes)
+
+ def row_count(self) -> int:
+ return self._row_count
+
+ def file_size(self) -> int:
+ return self._file_size
+
+ def file_paths(self) -> List[str]:
+ return self._file_paths
+
+
+class TableReadPy4j:
+
+ def __init__(self, j_table_read, j_read_type, catalog_options):
+ self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
+ self._j_bytes_reader =
get_gateway().jvm.InvocationUtil.createParallelBytesReader(
+ j_table_read, j_read_type, 1)
+
+ def to_arrow(self, splits):
+ record_batch_reader = self.to_arrow_batch_reader(splits)
+ return pa.Table.from_batches(record_batch_reader,
schema=self._arrow_schema)
+
+ def to_arrow_batch_reader(self, splits):
+ j_splits = list(map(lambda s: s.to_j_split(), splits))
+ self._j_bytes_reader.setSplits(j_splits)
+ batch_iterator = self._batch_generator()
+ return pa.RecordBatchReader.from_batches(self._arrow_schema,
batch_iterator)
+
+ def _batch_generator(self) -> Iterator[pa.RecordBatch]:
+ while True:
+ next_bytes = self._j_bytes_reader.next()
+ if next_bytes is None:
+ break
+ else:
+ stream_reader =
pa.RecordBatchStreamReader(pa.BufferReader(next_bytes))
+ yield from stream_reader
+
+
+class BatchWriteBuilderPy4j:
+
+ def __init__(self, j_batch_write_builder):
+ self._j_batch_write_builder = j_batch_write_builder
+
+ def overwrite(self, static_partition: Optional[dict] = None) ->
'BatchWriteBuilderPy4j':
+ if static_partition is None:
+ static_partition = {}
+ self._j_batch_write_builder.withOverwrite(static_partition)
+ return self
+
+ def new_write(self) -> 'BatchTableWritePy4j':
+ j_batch_table_write = self._j_batch_write_builder.newWrite()
+ return BatchTableWritePy4j(j_batch_table_write,
self._j_batch_write_builder.rowType())
+
+ def new_commit(self) -> 'BatchTableCommitPy4j':
+ j_batch_table_commit = self._j_batch_write_builder.newCommit()
+ return BatchTableCommitPy4j(j_batch_table_commit)
+
+
+class BatchTableWritePy4j:
+
+ def __init__(self, j_batch_table_write, j_row_type):
+ self._j_batch_table_write = j_batch_table_write
+ self._j_bytes_writer =
get_gateway().jvm.InvocationUtil.createBytesWriter(
+ j_batch_table_write, j_row_type)
+ self._arrow_schema = java_utils.to_arrow_schema(j_row_type)
+
+ def write_arrow(self, table):
+ for record_batch in table.to_reader():
+ self._write_arrow_batch(record_batch)
+
+ def write_arrow_batch(self, record_batch):
+ self._write_arrow_batch(record_batch)
+
+ def _write_arrow_batch(self, record_batch):
+ stream = pa.BufferOutputStream()
+ with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer:
+ writer.write(record_batch)
+ arrow_bytes = stream.getvalue().to_pybytes()
+ self._j_bytes_writer.write(arrow_bytes)
+
+ def prepare_commit(self) -> List['CommitMessagePy4j']:
+ j_commit_messages = self._j_batch_table_write.prepareCommit()
+ return list(map(lambda cm: CommitMessagePy4j(cm), j_commit_messages))
+
+ def close(self):
+ self._j_batch_table_write.close()
+ self._j_bytes_writer.close()
+
+
+class CommitMessagePy4j:
+
+ def __init__(self, j_commit_message):
+ self._j_commit_message = j_commit_message
+
+ def to_j_commit_message(self):
+ return self._j_commit_message
+
+
+class BatchTableCommitPy4j:
+
+ def __init__(self, j_batch_table_commit):
+ self._j_batch_table_commit = j_batch_table_commit
+
+ def commit(self, commit_messages: List[CommitMessagePy4j]):
+ j_commit_messages = list(map(lambda cm: cm.to_j_commit_message(),
commit_messages))
+ self._j_batch_table_commit.commit(j_commit_messages)
+
+ def close(self):
+ self._j_batch_table_commit.close()
+
+
+class PredicatePy4j:
+
+ def __init__(self, j_predicate_bytes):
+ self._j_predicate_bytes = j_predicate_bytes
+
+ def to_j_predicate(self):
+ return deserialize_java_object(self._j_predicate_bytes)
+
+
+class PredicateBuilderPy4j:
+
+ def __init__(self, j_row_type):
+ self._field_names = j_row_type.getFieldNames()
+ self._j_row_type = j_row_type
+ self._j_predicate_builder =
get_gateway().jvm.PredicateBuilder(j_row_type)
+
+    def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
+        error = ValueError(f'The field {field} is not in field list {self._field_names}.')
+ try:
+ index = self._field_names.index(field)
+ if index == -1:
+ raise error
+ except ValueError:
+ raise error
+
+ if literals is None:
+ literals = []
+
+ j_predicate = get_gateway().jvm.PredicationUtil.build(
+ self._j_row_type,
+ self._j_predicate_builder,
+ method,
+ index,
+ literals
+ )
+ return PredicatePy4j(serialize_java_object(j_predicate))
+
+ def equal(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('equal', field, [literal])
+
+ def not_equal(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('notEqual', field, [literal])
+
+ def less_than(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('lessThan', field, [literal])
+
+ def less_or_equal(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('lessOrEqual', field, [literal])
+
+ def greater_than(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('greaterThan', field, [literal])
+
+ def greater_or_equal(self, field: str, literal: Any) -> PredicatePy4j:
+ return self._build('greaterOrEqual', field, [literal])
+
+ def is_null(self, field: str) -> PredicatePy4j:
+ return self._build('isNull', field)
+
+ def is_not_null(self, field: str) -> PredicatePy4j:
+ return self._build('isNotNull', field)
+
+ def startswith(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+ return self._build('startsWith', field, [pattern_literal])
+
+ def endswith(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+ return self._build('endsWith', field, [pattern_literal])
+
+ def contains(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+ return self._build('contains', field, [pattern_literal])
+
+ def is_in(self, field: str, literals: List[Any]) -> PredicatePy4j:
+ return self._build('in', field, literals)
+
+ def is_not_in(self, field: str, literals: List[Any]) -> PredicatePy4j:
+ return self._build('notIn', field, literals)
+
+ def between(self, field: str, included_lower_bound: Any,
included_upper_bound: Any) \
+ -> PredicatePy4j:
+ return self._build('between', field, [included_lower_bound,
included_upper_bound])
+
+ def and_predicates(self, predicates: List[PredicatePy4j]) -> PredicatePy4j:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(predicates)
+ return PredicatePy4j(serialize_java_object(j_predicate))
+
+ def or_predicates(self, predicates: List[PredicatePy4j]) -> PredicatePy4j:
+ predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+ j_predicate = get_gateway().jvm.PredicationUtil.buildOr(predicates)
+ return PredicatePy4j(serialize_java_object(j_predicate))
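Taken together, these wrappers mirror the native Python API. A rough end-to-end sketch, assuming the bridge jar is resolvable and using an illustrative warehouse path, table name and schema:

    import pyarrow as pa

    from pypaimon.tests.py4j_impl.java_implementation import CatalogPy4j, SchemaPy4j

    catalog = CatalogPy4j.create({"warehouse": "/tmp/paimon-warehouse"})
    catalog.create_database("default", ignore_if_exists=True)

    pa_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    catalog.create_table("default.demo", SchemaPy4j(pa_schema), ignore_if_exists=True)
    table = catalog.get_table("default.demo")

    # Write an Arrow table through the Java writer, then commit.
    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    writer.write_arrow(pa.Table.from_pydict({"id": [1, 2], "name": ["a", "b"]}, schema=pa_schema))
    commit = write_builder.new_commit()
    commit.commit(writer.prepare_commit())
    writer.close()
    commit.close()

    # Read it back through the Java reader.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    print(read_builder.new_read().to_arrow(splits).num_rows)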
diff --git a/paimon-python/pypaimon/tests/py4j_impl/java_utils.py
b/paimon-python/pypaimon/tests/py4j_impl/java_utils.py
new file mode 100644
index 0000000000..1329c2cd7c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/java_utils.py
@@ -0,0 +1,132 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+
+
+def to_j_catalog_context(catalog_options: dict):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ gateway = get_gateway()
+ j_options = gateway.jvm.Options(catalog_options)
+ return gateway.jvm.CatalogContext.create(j_options)
+
+
+def to_j_identifier(identifier: str):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ return get_gateway().jvm.Identifier.fromString(identifier)
+
+
+def to_paimon_schema(schema: 'SchemaPy4j'):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ j_schema_builder = get_gateway().jvm.Schema.newBuilder()
+
+ if schema.partition_keys is not None:
+ j_schema_builder.partitionKeys(schema.partition_keys)
+
+ if schema.primary_keys is not None:
+ j_schema_builder.primaryKey(schema.primary_keys)
+
+ if schema.options is not None:
+ j_schema_builder.options(schema.options)
+
+ j_schema_builder.comment(schema.comment)
+
+ for field in schema.pa_schema:
+ column_name = field.name
+ column_type = _to_j_type(column_name, field.type)
+ j_schema_builder.column(column_name, column_type)
+ return j_schema_builder.build()
+
+
+def check_batch_write(j_table):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ gateway = get_gateway()
+ bucket_mode = j_table.bucketMode()
+ if bucket_mode == gateway.jvm.BucketMode.HASH_DYNAMIC \
+ or bucket_mode == gateway.jvm.BucketMode.CROSS_PARTITION:
+        raise TypeError("Doesn't support writing dynamic bucket or cross partition table.")
+
+
+def _to_j_type(name, pa_type):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ jvm = get_gateway().jvm
+ # int
+ if pa.types.is_int8(pa_type):
+ return jvm.DataTypes.TINYINT()
+ elif pa.types.is_int16(pa_type):
+ return jvm.DataTypes.SMALLINT()
+ elif pa.types.is_int32(pa_type):
+ return jvm.DataTypes.INT()
+ elif pa.types.is_int64(pa_type):
+ return jvm.DataTypes.BIGINT()
+ # float
+ elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
+ return jvm.DataTypes.FLOAT()
+ elif pa.types.is_float64(pa_type):
+ return jvm.DataTypes.DOUBLE()
+ # string
+ elif pa.types.is_string(pa_type):
+ return jvm.DataTypes.STRING()
+ # bool
+ elif pa.types.is_boolean(pa_type):
+ return jvm.DataTypes.BOOLEAN()
+ elif pa.types.is_null(pa_type):
+ print(f"WARN: The type of column '{name}' is null, "
+ "and it will be converted to string type by default. "
+ "Please check if the original type is string. "
+ f"If not, please manually specify the type of '{name}'.")
+ return jvm.DataTypes.STRING()
+ else:
+        raise ValueError(f'Found unsupported data type {str(pa_type)} for field {name}.')
+
+
+def to_arrow_schema(j_row_type):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ # init arrow schema
+ schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_row_type)
+ schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes))
+ arrow_schema = schema_reader.schema
+ schema_reader.close()
+ return arrow_schema
+
+
+def serialize_java_object(java_obj) -> bytes:
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ gateway = get_gateway()
+ util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
+ try:
+ java_bytes = util.serializeObject(java_obj)
+ return bytes(java_bytes)
+ except Exception as e:
+ raise RuntimeError(f"Java serialization failed: {e}")
+
+
+def deserialize_java_object(bytes_data):
+ from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+ gateway = get_gateway()
+    cl = gateway.jvm.Thread.currentThread().getContextClassLoader()
+ util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
+ return util.deserializeObject(bytes_data, cl)
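These helpers are thin bridges to the Java side; for instance, the serialization pair can round-trip a serializable JVM object once a gateway is up. A sketch that assumes the py4j bridge has been started:

    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
    from pypaimon.tests.py4j_impl.java_utils import (deserialize_java_object,
                                                     serialize_java_object)

    gateway = get_gateway()
    j_identifier = gateway.jvm.Identifier.fromString("default.demo")

    # Serialize on the Java side, carry the bytes through Python, rebuild the object.
    payload = serialize_java_object(j_identifier)
    restored = deserialize_java_object(payload)
    assert restored.equals(j_identifier)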
diff --git
a/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
b/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
new file mode 100644
index 0000000000..a7b50b073a
Binary files /dev/null and
b/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
differ
diff --git
a/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar
b/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar
new file mode 100644
index 0000000000..dfbe9d7d0b
Binary files /dev/null and
b/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar
differ
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 4671e32996..b9b115dfa4 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -38,10 +38,10 @@ class PkReaderTest(unittest.TestCase):
cls.catalog.create_database('default', False)
cls.pa_schema = pa.schema([
- ('user_id', pa.int32()),
+ pa.field('user_id', pa.int32(), nullable=False),
('item_id', pa.int64()),
('behavior', pa.string()),
- ('dt', pa.string())
+ pa.field('dt', pa.string(), nullable=False)
])
cls.expected = pa.Table.from_pydict({
'user_id': [1, 2, 3, 4, 5, 7, 8],
diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py
b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
index 773d94abed..c035957ccc 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
@@ -186,7 +186,7 @@ class RESTCatalogBaseTest(unittest.TestCase):
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_table/snapshot/snapshot-1"))
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_table/manifest"))
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_table/dt=p1"))
- self.assertEqual(len(glob.glob(self.warehouse +
"/default/test_table/manifest/*.avro")), 2)
+ self.assertEqual(len(glob.glob(self.warehouse +
"/default/test_table/manifest/*")), 3)
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/tests/rest_server.py
b/paimon-python/pypaimon/tests/rest_server.py
index d0486ec02c..c326f3525d 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -495,8 +495,8 @@ class RESTCatalogServer:
def _write_snapshot_files(self, identifier: Identifier, snapshot,
statistics):
"""Write snapshot and related files to the file system"""
- import os
import json
+ import os
import uuid
# Construct table path: {warehouse}/{database}/{table}
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py
b/paimon-python/pypaimon/tests/rest_table_test.py
index 9c64c17003..b905fa593b 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -168,7 +168,7 @@ class RESTTableTest(RESTCatalogBaseTest):
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_postpone/snapshot/LATEST"))
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_postpone/snapshot/snapshot-1"))
self.assertTrue(os.path.exists(self.warehouse +
"/default/test_postpone/manifest"))
- self.assertEqual(len(glob.glob(self.warehouse +
"/default/test_postpone/manifest/*.avro")), 2)
+ self.assertEqual(len(glob.glob(self.warehouse +
"/default/test_postpone/manifest/*")), 3)
self.assertEqual(len(glob.glob(self.warehouse +
"/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
def test_postpone_read_write(self):
diff --git a/paimon-python/pypaimon/tests/schema_test.py
b/paimon-python/pypaimon/tests/schema_test.py
index daa9442e50..766676e95f 100644
--- a/paimon-python/pypaimon/tests/schema_test.py
+++ b/paimon-python/pypaimon/tests/schema_test.py
@@ -32,7 +32,8 @@ class SchemaTestCase(unittest.TestCase):
DataField(0, "name", AtomicType('INT'), 'desc name'),
DataField(1, "arr", ArrayType(True, AtomicType('INT')), 'desc
arr1'),
DataField(2, "map1",
- MapType(False, AtomicType('INT'), MapType(False,
AtomicType('INT'), AtomicType('INT'))),
+ MapType(False, AtomicType('INT', False),
+ MapType(False, AtomicType('INT', False),
AtomicType('INT', False))),
'desc map1'),
]
table_schema = TableSchema(TableSchema.CURRENT_VERSION,
len(data_fields), data_fields,
diff --git a/paimon-python/pypaimon/tests/writer_test.py
b/paimon-python/pypaimon/tests/writer_test.py
index c0c242d2c4..8bf38f72e0 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -71,7 +71,7 @@ class WriterTest(unittest.TestCase):
self.assertTrue(os.path.exists(self.warehouse +
"/test_db.db/test_table/snapshot/snapshot-1"))
self.assertTrue(os.path.exists(self.warehouse +
"/test_db.db/test_table/manifest"))
self.assertTrue(os.path.exists(self.warehouse +
"/test_db.db/test_table/bucket-0"))
- self.assertEqual(len(glob.glob(self.warehouse +
"/test_db.db/test_table/manifest/*.avro")), 2)
+ self.assertEqual(len(glob.glob(self.warehouse +
"/test_db.db/test_table/manifest/*")), 3)
self.assertEqual(len(glob.glob(self.warehouse +
"/test_db.db/test_table/bucket-0/*.parquet")), 1)
with open(self.warehouse +
'/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
diff --git a/paimon-python/pypaimon/write/commit_message.py
b/paimon-python/pypaimon/write/commit_message.py
index 1a2177abb1..4e4c0f0b48 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -15,31 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
+from dataclasses import dataclass
from typing import List, Tuple
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+@dataclass
class CommitMessage:
- """Python implementation of CommitMessage"""
-
- def __init__(self, partition: Tuple, bucket: int, new_files:
List[DataFileMeta]):
- self._partition = partition
- self._bucket = bucket
- self._new_files = new_files or []
-
- def partition(self) -> Tuple:
- """Get the partition of this commit message."""
- return self._partition
-
- def bucket(self) -> int:
- """Get the bucket of this commit message."""
- return self._bucket
-
- def new_files(self) -> List[DataFileMeta]:
- """Get the list of new files."""
- return self._new_files
+ partition: Tuple
+ bucket: int
+ new_files: List[DataFileMeta]
def is_empty(self):
- return not self._new_files
+ return not self.new_files
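With the dataclass form, a CommitMessage is constructed and inspected directly; a minimal sketch:

    from pypaimon.write.commit_message import CommitMessage

    # A commit message for partition ('2024-01-01',), bucket 0, with no new files yet.
    msg = CommitMessage(partition=("2024-01-01",), bucket=0, new_files=[])
    assert msg.is_empty()
    assert msg.partition == ("2024-01-01",)  # attribute access replaces the old partition() accessor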
diff --git a/paimon-python/pypaimon/write/file_store_commit.py
b/paimon-python/pypaimon/write/file_store_commit.py
index e98cb8610f..ca26981333 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -17,14 +17,18 @@
################################################################################
import time
+import uuid
from pathlib import Path
from typing import List
from pypaimon.catalog.snapshot_commit import PartitionStatistics,
SnapshotCommit
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.binary_row import BinaryRow
from pypaimon.write.commit_message import CommitMessage
@@ -55,39 +59,69 @@ class FileStoreCommit:
if not commit_messages:
return
- new_manifest_files = self.manifest_file_manager.write(commit_messages)
- if not new_manifest_files:
- return
+ unique_id = uuid.uuid4()
+ base_manifest_list = f"manifest-list-{unique_id}-0"
+ delta_manifest_list = f"manifest-list-{unique_id}-1"
+
+ # process new_manifest
+ new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
+ self.manifest_file_manager.write(new_manifest_file, commit_messages)
+
+        partition_columns = list(zip(*(msg.partition for msg in commit_messages)))
+        partition_null_counts = [sum(value is None for value in col) for col in partition_columns]
+        if not all(count == 0 for count in partition_null_counts):
+            raise RuntimeError("Partition value should not be null")
+        partition_min_stats = [min(col) for col in partition_columns]
+        partition_max_stats = [max(col) for col in partition_columns]
+
+ new_manifest_list = ManifestFileMeta(
+ file_name=new_manifest_file,
+            file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
+ num_added_files=sum(len(msg.new_files) for msg in commit_messages),
+ num_deleted_files=0,
+ partition_stats=SimpleStats(
+ min_value=BinaryRow(
+ values=partition_min_stats,
+ fields=self.table.table_schema.get_partition_key_fields(),
+ ),
+ max_value=BinaryRow(
+ values=partition_max_stats,
+ fields=self.table.table_schema.get_partition_key_fields(),
+ ),
+ null_count=partition_null_counts,
+ ),
+ schema_id=self.table.table_schema.id,
+ )
+        self.manifest_list_manager.write(delta_manifest_list, [new_manifest_list])
+ # process existing_manifest
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
- existing_manifest_files = []
- record_count_add = self._generate_record_count_add(commit_messages)
- total_record_count = record_count_add
-
+ total_record_count = 0
if latest_snapshot:
- existing_manifest_files =
self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+ existing_manifest_files =
self.manifest_list_manager.read_all(latest_snapshot)
previous_record_count = latest_snapshot.total_record_count
if previous_record_count:
total_record_count += previous_record_count
+ else:
+ existing_manifest_files = []
+ self.manifest_list_manager.write(base_manifest_list,
existing_manifest_files)
- new_manifest_files.extend(existing_manifest_files)
- manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
+ # process snapshot
new_snapshot_id = self._generate_snapshot_id()
+ record_count_add = self._generate_record_count_add(commit_messages)
+ total_record_count += record_count_add
snapshot_data = Snapshot(
- version=3,
+ version=1,
id=new_snapshot_id,
- schema_id=0,
- base_manifest_list=manifest_list,
- delta_manifest_list=manifest_list,
+ schema_id=self.table.table_schema.id,
+ base_manifest_list=base_manifest_list,
+ delta_manifest_list=delta_manifest_list,
total_record_count=total_record_count,
delta_record_count=record_count_add,
commit_user=self.commit_user,
commit_identifier=commit_identifier,
commit_kind="APPEND",
time_millis=int(time.time() * 1000),
- log_offsets={},
)
# Generate partition statistics for the commit
@@ -101,46 +135,11 @@ class FileStoreCommit:
def overwrite(self, partition, commit_messages: List[CommitMessage],
commit_identifier: int):
"""Commit the given commit messages in overwrite mode."""
- if not commit_messages:
- return
-
- new_manifest_files = self.manifest_file_manager.write(commit_messages)
- if not new_manifest_files:
- return
-
- # In overwrite mode, we don't merge with existing manifests
- manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
- record_count_add = self._generate_record_count_add(commit_messages)
-
- new_snapshot_id = self._generate_snapshot_id()
- snapshot_data = Snapshot(
- version=3,
- id=new_snapshot_id,
- schema_id=0,
- base_manifest_list=manifest_list,
- delta_manifest_list=manifest_list,
- total_record_count=record_count_add,
- delta_record_count=record_count_add,
- commit_user=self.commit_user,
- commit_identifier=commit_identifier,
- commit_kind="OVERWRITE",
- time_millis=int(time.time() * 1000),
- log_offsets={},
- )
-
- # Generate partition statistics for the commit
- statistics = self._generate_partition_statistics(commit_messages)
-
- # Use SnapshotCommit for atomic commit
- with self.snapshot_commit:
- success = self.snapshot_commit.commit(snapshot_data,
self.table.current_branch(), statistics)
- if not success:
- raise RuntimeError(f"Failed to commit snapshot
{new_snapshot_id}")
+ raise RuntimeError("overwrite unsupported yet")
def abort(self, commit_messages: List[CommitMessage]):
for message in commit_messages:
- for file in message.new_files():
+ for file in message.new_files:
try:
file_path_obj = Path(file.file_path)
if file_path_obj.exists():
@@ -179,7 +178,7 @@ class FileStoreCommit:
for message in commit_messages:
# Convert partition tuple to dictionary for PartitionStatistics
- partition_value = message.partition() # Call the method to get
partition value
+            partition_value = message.partition  # partition is now a plain attribute
if partition_value:
# Assuming partition is a tuple and we need to convert it to a
dict
# This may need adjustment based on actual partition format
@@ -213,8 +212,7 @@ class FileStoreCommit:
# Process each file in the commit message
# Following Java implementation: PartitionEntry.fromDataFile()
- new_files = message.new_files()
- for file_meta in new_files:
+ for file_meta in message.new_files:
# Extract actual file metadata (following Java DataFileMeta
pattern)
record_count = file_meta.row_count
file_size_in_bytes = file_meta.file_size
@@ -266,7 +264,7 @@ class FileStoreCommit:
record_count = 0
for message in commit_messages:
- new_files = message.new_files()
+ new_files = message.new_files
for file_meta in new_files:
record_count += file_meta.row_count
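The new per-commit partition statistics are derived column-wise from the partition tuples of the commit messages. A standalone sketch of that aggregation, with a stand-in class instead of the real CommitMessage:

    from dataclasses import dataclass
    from typing import Tuple

    @dataclass
    class Msg:  # stand-in carrying only the partition tuple
        partition: Tuple

    messages = [Msg(("2024-01-01", 1)), Msg(("2024-01-02", 3)), Msg(("2024-01-01", 2))]

    # One column per partition field, then min/max/null-count per column.
    partition_columns = list(zip(*(m.partition for m in messages)))
    min_stats = [min(col) for col in partition_columns]
    max_stats = [max(col) for col in partition_columns]
    null_counts = [sum(v is None for v in col) for col in partition_columns]

    print(min_stats)    # ['2024-01-01', 1]
    print(max_stats)    # ['2024-01-02', 3]
    print(null_counts)  # [0, 0]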
diff --git a/paimon-python/pypaimon/write/file_store_write.py
b/paimon-python/pypaimon/write/file_store_write.py
index 3bf6150b03..bcef10a4c3 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
from typing import Dict, List, Tuple
import pyarrow as pa
@@ -34,6 +33,7 @@ class FileStoreWrite:
self.table: FileStoreTable = table
self.data_writers: Dict[Tuple, DataWriter] = {}
+        self.max_seq_numbers = self._seq_number_stats()  # TODO: build this on demand instead of for all buckets up front
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
key = (partition, bucket)
@@ -48,12 +48,14 @@ class FileStoreWrite:
table=self.table,
partition=partition,
bucket=bucket,
+ max_seq_number=self.max_seq_numbers.get((partition, bucket),
1),
)
else:
return AppendOnlyDataWriter(
table=self.table,
partition=partition,
bucket=bucket,
+ max_seq_number=self.max_seq_numbers.get((partition, bucket),
1),
)
def prepare_commit(self) -> List[CommitMessage]:
@@ -74,3 +76,33 @@ class FileStoreWrite:
for writer in self.data_writers.values():
writer.close()
self.data_writers.clear()
+
+ def _seq_number_stats(self) -> dict:
+ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+ from pypaimon.manifest.manifest_list_manager import ManifestListManager
+ from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+ snapshot_manager = SnapshotManager(self.table)
+ manifest_list_manager = ManifestListManager(self.table)
+ manifest_file_manager = ManifestFileManager(self.table)
+
+ latest_snapshot = snapshot_manager.get_latest_snapshot()
+ if not latest_snapshot:
+ return {}
+ manifest_files = manifest_list_manager.read_all(latest_snapshot)
+
+ file_entries = []
+ for manifest_file in manifest_files:
+ manifest_entries =
manifest_file_manager.read(manifest_file.file_name)
+ for entry in manifest_entries:
+ if entry.kind == 0:
+ file_entries.append(entry)
+
+ max_seq_numbers = {}
+ for entry in file_entries:
+ partition_key = (tuple(entry.partition.values), entry.bucket)
+ current_seq_num = entry.file.max_sequence_number
+ existing_max = max_seq_numbers.get(partition_key, -1)
+ if current_seq_num > existing_max:
+ max_seq_numbers[partition_key] = current_seq_num
+ return max_seq_numbers
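_seq_number_stats boils down to a max-per-(partition, bucket) reduction over ADD manifest entries. A standalone sketch of that reduction over illustrative tuples:

    # (partition values, bucket, max_sequence_number) taken from ADD entries
    entries = [
        (("p1",), 0, 5),
        (("p1",), 0, 9),
        (("p2",), 1, 3),
    ]

    max_seq_numbers = {}
    for partition, bucket, max_seq in entries:
        key = (partition, bucket)
        if max_seq > max_seq_numbers.get(key, -1):
            max_seq_numbers[key] = max_seq

    print(max_seq_numbers)  # {(('p1',), 0): 9, (('p2',), 1): 3}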
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py
b/paimon-python/pypaimon/write/writer/data_writer.py
index c11d991b84..5d9641718c 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -15,16 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
import uuid
from abc import ABC, abstractmethod
+from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
import pyarrow as pa
+import pyarrow.compute as pc
from pypaimon.common.core_options import CoreOptions
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.row.binary_row import BinaryRow
@@ -32,7 +34,7 @@ from pypaimon.table.row.binary_row import BinaryRow
class DataWriter(ABC):
"""Base class for data writers that handle PyArrow tables directly."""
- def __init__(self, table, partition: Tuple, bucket: int):
+ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number:
int):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -50,6 +52,7 @@ class DataWriter(ABC):
if self.bucket !=
BucketMode.POSTPONE_BUCKET.value
else CoreOptions.FILE_FORMAT_AVRO)
self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+ self.sequence_generator = SequenceGenerator(max_seq_number)
self.pending_data: Optional[pa.RecordBatch] = None
self.committed_files: List[DataFileMeta] = []
@@ -89,7 +92,7 @@ class DataWriter(ABC):
current_size = self.pending_data.get_total_buffer_size()
if current_size > self.target_file_size:
- split_row = _find_optimal_split_point(self.pending_data,
self.target_file_size)
+ split_row = self._find_optimal_split_point(self.pending_data,
self.target_file_size)
if split_row > 0:
data_to_write = self.pending_data.slice(0, split_row)
remaining_data = self.pending_data.slice(split_row)
@@ -101,7 +104,7 @@ class DataWriter(ABC):
def _write_data_to_file(self, data: pa.RecordBatch):
if data.num_rows == 0:
return
- file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+ file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
file_path = self._generate_file_path(file_name)
if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
self.file_io.write_parquet(file_path, data,
compression=self.compression)
@@ -112,24 +115,55 @@ class DataWriter(ABC):
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
+ # min key & max key
key_columns_batch = data.select(self.trimmed_primary_key)
min_key_row_batch = key_columns_batch.slice(0, 1)
- min_key_data = [col.to_pylist()[0] for col in
min_key_row_batch.columns]
max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows
- 1, 1)
- max_key_data = [col.to_pylist()[0] for col in
max_key_row_batch.columns]
+ min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
+ max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
+
+ # key stats & value stats
+ column_stats = {
+ field.name: self._get_column_stats(data, field.name)
+ for field in self.table.table_schema.fields
+ }
+ all_fields = self.table.table_schema.fields
+ min_value_stats = [column_stats[field.name]['min_value'] for field in
all_fields]
+ max_value_stats = [column_stats[field.name]['max_value'] for field in
all_fields]
+ value_null_counts = [column_stats[field.name]['null_count'] for field
in all_fields]
+ key_fields = self.trimmed_primary_key_fields
+ min_key_stats = [column_stats[field.name]['min_value'] for field in
key_fields]
+ max_key_stats = [column_stats[field.name]['max_value'] for field in
key_fields]
+ key_null_counts = [column_stats[field.name]['null_count'] for field in
key_fields]
+ if not all(count == 0 for count in key_null_counts):
+ raise RuntimeError("Primary key should not be null")
+
+ min_seq = self.sequence_generator.start
+ max_seq = self.sequence_generator.current
+ self.sequence_generator.start = self.sequence_generator.current
self.committed_files.append(DataFileMeta(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
row_count=data.num_rows,
- min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
- max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
- key_stats=None, # TODO
- value_stats=None,
- min_sequence_number=0,
- max_sequence_number=0,
- schema_id=0,
+ min_key=BinaryRow(min_key, self.trimmed_primary_key_fields),
+ max_key=BinaryRow(max_key, self.trimmed_primary_key_fields),
+ key_stats=SimpleStats(
+ BinaryRow(min_key_stats, self.trimmed_primary_key_fields),
+ BinaryRow(max_key_stats, self.trimmed_primary_key_fields),
+ key_null_counts,
+ ),
+ value_stats=SimpleStats(
+ BinaryRow(min_value_stats, self.table.table_schema.fields),
+ BinaryRow(max_value_stats, self.table.table_schema.fields),
+ value_null_counts,
+ ),
+ min_sequence_number=min_seq,
+ max_sequence_number=max_seq,
+ schema_id=self.table.table_schema.id,
level=0,
- extra_files=None,
+ extra_files=[],
+ creation_time=datetime.now(),
+ delete_row_count=0,
file_path=str(file_path),
))
@@ -146,24 +180,52 @@ class DataWriter(ABC):
return path_builder
-
-def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
- total_rows = data.num_rows
- if total_rows <= 1:
- return 0
-
- left, right = 1, total_rows
- best_split = 0
-
- while left <= right:
- mid = (left + right) // 2
- slice_data = data.slice(0, mid)
- slice_size = slice_data.get_total_buffer_size()
-
- if slice_size <= target_size:
- best_split = mid
- left = mid + 1
- else:
- right = mid - 1
-
- return best_split
+ @staticmethod
+ def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) ->
int:
+ total_rows = data.num_rows
+ if total_rows <= 1:
+ return 0
+
+ left, right = 1, total_rows
+ best_split = 0
+
+ while left <= right:
+ mid = (left + right) // 2
+ slice_data = data.slice(0, mid)
+ slice_size = slice_data.get_total_buffer_size()
+
+ if slice_size <= target_size:
+ best_split = mid
+ left = mid + 1
+ else:
+ right = mid - 1
+
+ return best_split
+
+ @staticmethod
+ def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) ->
dict:
+ column_array = record_batch.column(column_name)
+ if column_array.null_count == len(column_array):
+ return {
+ "min_value": None,
+ "max_value": None,
+ "null_count": column_array.null_count,
+ }
+ min_value = pc.min(column_array).as_py()
+ max_value = pc.max(column_array).as_py()
+ null_count = column_array.null_count
+ return {
+ "min_value": min_value,
+ "max_value": max_value,
+ "null_count": null_count,
+ }
+
+
+class SequenceGenerator:
+ def __init__(self, start: int = 0):
+ self.start = start
+ self.current = start
+
+ def next(self) -> int:
+ self.current += 1
+ return self.current
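The per-column statistics come straight from pyarrow.compute; a self-contained sketch of what _get_column_stats returns for one column:

    import pyarrow as pa
    import pyarrow.compute as pc

    batch = pa.RecordBatch.from_pydict({"user_id": [3, 1, None, 7]})
    column = batch.column("user_id")

    stats = {
        "min_value": pc.min(column).as_py(),   # nulls are skipped
        "max_value": pc.max(column).as_py(),
        "null_count": column.null_count,
    }
    print(stats)  # {'min_value': 1, 'max_value': 7, 'null_count': 1}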
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index dbf47496c9..99b11a9788 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -22,18 +22,6 @@ import pyarrow.compute as pc
from pypaimon.write.writer.data_writer import DataWriter
-class SequenceGenerator:
- def __init__(self, start: int = 0):
- self.current = start
-
- def next(self) -> int:
- self.current += 1
- return self.current
-
-
-sequence_generator = SequenceGenerator()
-
-
class KeyValueDataWriter(DataWriter):
"""Data writer for primary key tables with system fields and sorting."""
@@ -55,11 +43,11 @@ class KeyValueDataWriter(DataWriter):
key_column = data.column(pk_key)
enhanced_table = enhanced_table.add_column(0,
f'_KEY_{pk_key}', key_column)
- sequence_column = pa.array([sequence_generator.next() for _ in
range(num_rows)], type=pa.int64())
+ sequence_column = pa.array([self.sequence_generator.next() for _ in
range(num_rows)], type=pa.int64())
enhanced_table =
enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER',
sequence_column)
# TODO: support real row kind here
- value_kind_column = pa.repeat(0, num_rows)
+ value_kind_column = pa.array([0] * num_rows, type=pa.int32())
enhanced_table =
enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
value_kind_column)
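For reference, the system columns added for a primary-key write can be reproduced with plain pyarrow; a sketch using pa.Table in place of the writer's record batch and a single illustrative key column:

    import pyarrow as pa

    data = pa.table({"user_id": [1, 2], "behavior": ["a", "b"]})

    enhanced = data.add_column(0, "_KEY_user_id", data.column("user_id"))
    enhanced = enhanced.add_column(1, "_SEQUENCE_NUMBER", pa.array([1, 2], type=pa.int64()))
    enhanced = enhanced.add_column(2, "_VALUE_KIND", pa.array([0, 0], type=pa.int32()))  # 0 = INSERT

    print(enhanced.schema.names)
    # ['_KEY_user_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'user_id', 'behavior']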
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 507844685c..a00a2ac487 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -28,7 +28,8 @@ install_requires = [
'ossfs==2023.12.0',
'pyarrow==16.0.0',
'polars==1.32.0',
- 'fastavro==1.11.1'
+ 'fastavro==1.11.1',
+ 'zstandard==0.24.0'
]
long_description = "See Apache Paimon Python API \