This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a66f87d16fcd40fd7f3fda9fda2afeb6d010bd57
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Aug 27 11:36:21 2025 +0800

    [Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
---
 .github/workflows/paimon-python-checks.yml          |   2 +-
 .../pypaimon/catalog/filesystem_catalog.py          |   4 +-
 .../pypaimon/catalog/renaming_snapshot_commit.py    |   2 +-
 .../pypaimon/manifest/manifest_file_manager.py      |  92 ++++++++-----
 .../pypaimon/manifest/manifest_list_manager.py      |  73 +++++++----
 .../pypaimon/manifest/schema/data_file_meta.py      |  36 ++---
 .../pypaimon/manifest/schema/manifest_entry.py      |   1 +
 .../pypaimon/manifest/schema/manifest_file_meta.py  |   5 +-
 .../pypaimon/manifest/schema/simple_stats.py        |  17 ++-
 .../read/reader/data_file_record_reader.py          |  16 ++-
 paimon-python/pypaimon/read/split_read.py           |   6 +-
 paimon-python/pypaimon/read/table_scan.py           |   6 +-
 paimon-python/pypaimon/schema/data_types.py         |  13 +-
 paimon-python/pypaimon/schema/schema_manager.py     |  11 +-
 paimon-python/pypaimon/schema/table_schema.py       |  37 +-----
 paimon-python/pypaimon/table/file_store_table.py    |   5 +-
 paimon-python/pypaimon/table/row/binary_row.py      | 145 ++++++---------------
 .../pypaimon/tests/reader_primary_key_test.py       |   4 +-
 .../pypaimon/tests/rest_catalog_base_test.py        |   2 +-
 paimon-python/pypaimon/tests/rest_server.py         |   2 +-
 paimon-python/pypaimon/tests/rest_table_test.py     |   2 +-
 paimon-python/pypaimon/tests/schema_test.py         |   3 +-
 paimon-python/pypaimon/tests/writer_test.py         |   2 +-
 paimon-python/pypaimon/write/commit_message.py      |  26 +---
 paimon-python/pypaimon/write/file_store_commit.py   | 114 ++++++++--------
 paimon-python/pypaimon/write/file_store_write.py    |  34 ++++-
 paimon-python/pypaimon/write/writer/data_writer.py  | 132 ++++++++++++++-----
 .../pypaimon/write/writer/key_value_data_writer.py  |  16 +--
 paimon-python/setup.py                              |   3 +-
 29 files changed, 421 insertions(+), 390 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 9a38e9b5c2..6f6a8a23a8 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -46,7 +46,7 @@ jobs:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install dependencies
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
       - name: Run lint-python.sh
         run: |
           chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 68d1e438c8..1aed6e8137 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
 ################################################################################
 
 from pathlib import Path
-from typing import Optional, Union, List
+from typing import List, Optional, Union
 from urllib.parse import urlparse
 
 from pypaimon.catalog.catalog import Catalog
@@ -67,7 +67,7 @@ class FileSystemCatalog(Catalog):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
         if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
-            raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+            raise ValueError(f"Unsupported CoreOption {CoreOptions.SCAN_FALLBACK_BRANCH}")
         table_path = self.get_table_path(identifier)
         table_schema = self.get_table_schema(identifier)
 
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
index 4a2e70e4e2..b85c4200a1 100644
--- a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -63,7 +63,7 @@ class RenamingSnapshotCommit(SnapshotCommit):
         if not self.file_io.exists(new_snapshot_path):
             """Internal function to perform the actual commit."""
             # Try to write atomically using the file IO
-            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot, indent=2))
             if committed:
                 # Update the latest hint
                 self._commit_latest_hint(snapshot.id)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index ef674e5b88..7c46b368d2 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import uuid
 from io import BytesIO
 from typing import List
 
@@ -24,8 +23,10 @@
 import fastavro
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
                                            BinaryRowSerializer)
+from pypaimon.write.commit_message import CommitMessage
 
 class ManifestFileManager:
@@ -51,20 +52,40 @@ class ManifestFileManager:
             reader = fastavro.reader(buffer)
 
             for record in reader:
-                file_info = dict(record['_FILE'])
+                file_dict = dict(record['_FILE'])
+                key_dict = dict(file_dict['_KEY_STATS'])
+                key_stats = SimpleStats(
+                    min_value=BinaryRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
+                                                               self.trimmed_primary_key_fields),
+                    max_value=BinaryRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
+                                                               self.trimmed_primary_key_fields),
+                    null_count=key_dict['_NULL_COUNTS'],
+                )
+                value_dict = dict(file_dict['_VALUE_STATS'])
+                value_stats = SimpleStats(
+                    min_value=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'],
+                                                               self.table.table_schema.fields),
+                    max_value=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'],
+                                                               self.table.table_schema.fields),
+                    null_count=value_dict['_NULL_COUNTS'],
+                )
                 file_meta = DataFileMeta(
-                    file_name=file_info['_FILE_NAME'],
-                    file_size=file_info['_FILE_SIZE'],
-                    row_count=file_info['_ROW_COUNT'],
-                    min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
-                    max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
-                    key_stats=None,  # TODO
-                    value_stats=None,  # TODO
-                    min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
-                    max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
-                    schema_id=file_info['_SCHEMA_ID'],
-                    level=file_info['_LEVEL'],
-                    extra_files=None,  # TODO
+                    file_name=file_dict['_FILE_NAME'],
+ file_size=file_dict['_FILE_SIZE'], + row_count=file_dict['_ROW_COUNT'], + min_key=BinaryRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields), + max_key=BinaryRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields), + key_stats=key_stats, + value_stats=value_stats, + min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'], + max_sequence_number=file_dict['_MAX_SEQUENCE_NUMBER'], + schema_id=file_dict['_SCHEMA_ID'], + level=file_dict['_LEVEL'], + extra_files=file_dict['_EXTRA_FILES'], + creation_time=file_dict['_CREATION_TIME'], + delete_row_count=file_dict['_DELETE_ROW_COUNT'], + embedded_index=file_dict['_EMBEDDED_FILE_INDEX'], + file_source=file_dict['_FILE_SOURCE'], ) entry = ManifestEntry( kind=record['_KIND'], @@ -73,22 +94,23 @@ class ManifestFileManager: total_buckets=record['_TOTAL_BUCKETS'], file=file_meta ) - if not shard_filter(entry): + if shard_filter is not None and not shard_filter(entry): continue entries.append(entry) return entries - def write(self, commit_messages: List['CommitMessage']) -> List[str]: + def write(self, file_name, commit_messages: List[CommitMessage]): avro_records = [] for message in commit_messages: partition_bytes = BinaryRowSerializer.to_bytes( - BinaryRow(list(message.partition()), self.table.table_schema.get_partition_key_fields())) - for file in message.new_files(): + BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields())) + for file in message.new_files: avro_record = { + "_VERSION": 2, "_KIND": 0, "_PARTITION": partition_bytes, - "_BUCKET": message.bucket(), - "_TOTAL_BUCKETS": -1, # TODO + "_BUCKET": message.bucket, + "_TOTAL_BUCKETS": self.table.total_buckets, "_FILE": { "_FILE_NAME": file.file_name, "_FILE_SIZE": file.file_size, @@ -96,33 +118,35 @@ class ManifestFileManager: "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key), "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key), "_KEY_STATS": { - "_MIN_VALUES": None, - "_MAX_VALUES": None, - "_NULL_COUNTS": 0, + "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_value), + "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_value), + "_NULL_COUNTS": file.key_stats.null_count, }, "_VALUE_STATS": { - "_MIN_VALUES": None, - "_MAX_VALUES": None, - "_NULL_COUNTS": 0, + "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_value), + "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_value), + "_NULL_COUNTS": file.value_stats.null_count, }, - "_MIN_SEQUENCE_NUMBER": 0, - "_MAX_SEQUENCE_NUMBER": 0, - "_SCHEMA_ID": 0, - "_LEVEL": 0, - "_EXTRA_FILES": [], + "_MIN_SEQUENCE_NUMBER": file.min_sequence_number, + "_MAX_SEQUENCE_NUMBER": file.max_sequence_number, + "_SCHEMA_ID": file.schema_id, + "_LEVEL": file.level, + "_EXTRA_FILES": file.extra_files, + "_CREATION_TIME": file.creation_time, + "_DELETE_ROW_COUNT": file.delete_row_count, + "_EMBEDDED_FILE_INDEX": file.embedded_index, + "_FILE_SOURCE": file.file_source, } } avro_records.append(avro_record) - manifest_filename = f"manifest-{str(uuid.uuid4())}.avro" - manifest_path = self.manifest_path / manifest_filename + manifest_path = self.manifest_path / file_name try: buffer = BytesIO() fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) - return [str(manifest_filename)] except Exception as e: self.file_io.delete_quietly(manifest_path) raise RuntimeError(f"Failed to write 
manifest file: {e}") from e diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index a338441465..65fd2b21ac 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -16,15 +16,17 @@ # limitations under the License. ################################################################################ -import uuid from io import BytesIO -from typing import List, Optional +from typing import List import fastavro -from pypaimon.manifest.schema.manifest_file_meta import \ - MANIFEST_FILE_META_SCHEMA +from pypaimon.manifest.schema.manifest_file_meta import ( + MANIFEST_FILE_META_SCHEMA, ManifestFileMeta) +from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.snapshot.snapshot import Snapshot +from pypaimon.table.row.binary_row import (BinaryRowDeserializer, + BinaryRowSerializer) class ManifestListManager: @@ -37,57 +39,72 @@ class ManifestListManager: self.manifest_path = self.table.table_path / "manifest" self.file_io = self.table.file_io - def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]: + def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]: manifest_files = [] base_manifests = self.read(snapshot.base_manifest_list) manifest_files.extend(base_manifests) delta_manifests = self.read(snapshot.delta_manifest_list) manifest_files.extend(delta_manifests) - return list(set(manifest_files)) + return manifest_files - def read(self, manifest_list_name: str) -> List[str]: - manifest_list_path = self.manifest_path / manifest_list_name - manifest_paths = [] + def read(self, manifest_list_name: str) -> List[ManifestFileMeta]: + manifest_files = [] + manifest_list_path = self.manifest_path / manifest_list_name with self.file_io.new_input_stream(manifest_list_path) as input_stream: avro_bytes = input_stream.read() buffer = BytesIO(avro_bytes) reader = fastavro.reader(buffer) for record in reader: - file_name = record['_FILE_NAME'] - manifest_paths.append(file_name) - - return manifest_paths + stats_dict = dict(record['_PARTITION_STATS']) + partition_stats = SimpleStats( + min_value=BinaryRowDeserializer.from_bytes( + stats_dict['_MIN_VALUES'], + self.table.table_schema.get_partition_key_fields() + ), + max_value=BinaryRowDeserializer.from_bytes( + stats_dict['_MAX_VALUES'], + self.table.table_schema.get_partition_key_fields() + ), + null_count=stats_dict['_NULL_COUNTS'], + ) + manifest_file_meta = ManifestFileMeta( + file_name=record['_FILE_NAME'], + file_size=record['_FILE_SIZE'], + num_added_files=record['_NUM_ADDED_FILES'], + num_deleted_files=record['_NUM_DELETED_FILES'], + partition_stats=partition_stats, + schema_id=record['_SCHEMA_ID'], + ) + manifest_files.append(manifest_file_meta) - def write(self, manifest_file_names: List[str]) -> Optional[str]: - if not manifest_file_names: - return None + return manifest_files + def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]): avro_records = [] - for manifest_file_name in manifest_file_names: + for meta in manifest_file_metas: avro_record = { - "_FILE_NAME": manifest_file_name, - "_FILE_SIZE": 0, # TODO - "_NUM_ADDED_FILES": 0, - "_NUM_DELETED_FILES": 0, + "_VERSION": 2, + "_FILE_NAME": meta.file_name, + "_FILE_SIZE": meta.file_size, + "_NUM_ADDED_FILES": meta.num_added_files, + "_NUM_DELETED_FILES": meta.num_deleted_files, "_PARTITION_STATS": { - "_MIN_VALUES": None, - "_MAX_VALUES": None, - "_NULL_COUNTS": 0, + "_MIN_VALUES": 
BinaryRowSerializer.to_bytes(meta.partition_stats.min_value), + "_MAX_VALUES": BinaryRowSerializer.to_bytes(meta.partition_stats.max_value), + "_NULL_COUNTS": meta.partition_stats.null_count, }, - "_SCHEMA_ID": 0, + "_SCHEMA_ID": meta.schema_id, } avro_records.append(avro_record) - list_filename = f"manifest-list-{str(uuid.uuid4())}.avro" - list_path = self.manifest_path / list_filename + list_path = self.manifest_path / file_name try: buffer = BytesIO() fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(list_path) as output_stream: output_stream.write(avro_bytes) - return list_filename except Exception as e: self.file_io.delete_quietly(list_path) raise RuntimeError(f"Failed to write manifest list file: {e}") from e diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py index 02a7179218..e1f60bf2e1 100644 --- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py @@ -19,7 +19,7 @@ from dataclasses import dataclass from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import List from pypaimon.manifest.schema.simple_stats import (SIMPLE_STATS_SCHEMA, SimpleStats) @@ -31,34 +31,32 @@ class DataFileMeta: file_name: str file_size: int row_count: int - min_key: Optional[BinaryRow] - max_key: Optional[BinaryRow] - key_stats: Optional[SimpleStats] - value_stats: Optional[SimpleStats] + min_key: BinaryRow + max_key: BinaryRow + key_stats: SimpleStats + value_stats: SimpleStats min_sequence_number: int max_sequence_number: int schema_id: int level: int - extra_files: Optional[List[str]] + extra_files: List[str] - creation_time: Optional[datetime] = None - delete_row_count: Optional[int] = None - embedded_index: Optional[bytes] = None - file_source: Optional[str] = None - value_stats_cols: Optional[List[str]] = None - external_path: Optional[str] = None + creation_time: datetime | None = None + delete_row_count: int | None = None + embedded_index: bytes | None = None + file_source: str | None = None + value_stats_cols: List[str] | None = None + external_path: str | None = None + # not a schema field, just for internal usage file_path: str = None def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int): path_builder = table_path - partition_dict = partition.to_dict() for field_name, field_value in partition_dict.items(): path_builder = path_builder / (field_name + "=" + str(field_value)) - path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name - self.file_path = str(path_builder) @@ -78,11 +76,13 @@ DATA_FILE_META_SCHEMA = { {"name": "_SCHEMA_ID", "type": "long"}, {"name": "_LEVEL", "type": "int"}, {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}}, - {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None}, + {"name": "_CREATION_TIME", + "type": [ + "null", + {"type": "long", "logicalType": "timestamp-millis"}], + "default": None}, {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None}, {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None}, {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None}, - {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": None}, - {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None}, ] } diff --git 
a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py index 1a80553188..75a51f30c5 100644 --- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py +++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py @@ -36,6 +36,7 @@ MANIFEST_ENTRY_SCHEMA = { "type": "record", "name": "ManifestEntry", "fields": [ + {"name": "_VERSION", "type": "int"}, {"name": "_KIND", "type": "int"}, {"name": "_PARTITION", "type": "bytes"}, {"name": "_BUCKET", "type": "int"}, diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py index a001fca4c2..443c6a0944 100644 --- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py @@ -30,16 +30,13 @@ class ManifestFileMeta: num_deleted_files: int partition_stats: SimpleStats schema_id: int - min_bucket: int - max_bucket: int - min_level: int - max_level: int MANIFEST_FILE_META_SCHEMA = { "type": "record", "name": "ManifestFileMeta", "fields": [ + {"name": "_VERSION", "type": "int"}, {"name": "_FILE_NAME", "type": "string"}, {"name": "_FILE_SIZE", "type": "long"}, {"name": "_NUM_ADDED_FILES", "type": "long"}, diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py index b291c12852..dd6924fb2e 100644 --- a/paimon-python/pypaimon/manifest/schema/simple_stats.py +++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py @@ -17,6 +17,7 @@ ################################################################################ from dataclasses import dataclass +from typing import List from pypaimon.table.row.binary_row import BinaryRow @@ -25,15 +26,23 @@ from pypaimon.table.row.binary_row import BinaryRow class SimpleStats: min_value: BinaryRow max_value: BinaryRow - null_count: int + null_count: List[int] | None SIMPLE_STATS_SCHEMA = { "type": "record", "name": "SimpleStats", "fields": [ - {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None}, - {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None}, - {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None}, + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", + "type": [ + "null", + { + "type": "array", + "items": ["null", "long"] + } + ], + "default": None}, ] } diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_record_reader.py index 41c1bf7517..c83f1ce152 100644 --- a/paimon-python/pypaimon/read/reader/data_file_record_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_record_reader.py @@ -23,6 +23,7 @@ from pyarrow import RecordBatch from pypaimon.read.partition_info import PartitionInfo from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.schema.data_types import DataField, PyarrowFieldParser class DataFileBatchReader(RecordBatchReader): @@ -31,11 +32,12 @@ class DataFileBatchReader(RecordBatchReader): """ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo, - system_primary_key: Optional[List[str]]): + system_primary_key: Optional[List[str]], fields: List[DataField]): self.format_reader = format_reader self.index_mapping = index_mapping self.partition_info = partition_info self.system_primary_key = system_primary_key + self.schema_map = {field.name: field for 
field in PyarrowFieldParser.from_paimon_schema(fields)} def read_arrow_batch(self) -> Optional[RecordBatch]: record_batch = self.format_reader.read_arrow_batch() @@ -85,7 +87,17 @@ class DataFileBatchReader(RecordBatchReader): inter_arrays = mapped_arrays inter_names = mapped_names - return pa.RecordBatch.from_arrays(inter_arrays, names=inter_names) + # to contains 'not null' property + final_fields = [] + for i, name in enumerate(inter_names): + array = inter_arrays[i] + target_field = self.schema_map.get(name) + if not target_field: + target_field = pa.field(name, array.type) + final_fields.append(target_field) + final_schema = pa.schema(final_fields) + + return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema) def close(self) -> None: self.format_reader.close() diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index ea37593fdc..f085bac444 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -83,9 +83,11 @@ class SplitRead(ABC): index_mapping = self.create_index_mapping() partition_info = self.create_partition_info() if for_merge_read: - return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key) + return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key, + self.table.table_schema.fields) else: - return DataFileBatchReader(format_reader, index_mapping, partition_info, None) + return DataFileBatchReader(format_reader, index_mapping, partition_info, None, + self.table.table_schema.fields) @abstractmethod def _get_all_data_fields(self): diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 2745b1d1b3..d89ddd0bcb 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -63,11 +63,11 @@ class TableScan: latest_snapshot = self.snapshot_manager.get_latest_snapshot() if not latest_snapshot: return Plan([]) - manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot) + manifest_files = self.manifest_list_manager.read_all(latest_snapshot) file_entries = [] - for manifest_file_path in manifest_files: - manifest_entries = self.manifest_file_manager.read(manifest_file_path, + for manifest_file in manifest_files: + manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, lambda row: self._bucket_filter(row)) for entry in manifest_entries: if entry.kind == 0: diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 8a22e7f012..a5186cc56c 100644 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -279,13 +279,16 @@ class DataTypeParser: if "(" in type_upper: base_type = type_upper.split("(")[0] + elif " " in type_upper: + base_type = type_upper.split(" ")[0] + type_upper = base_type else: base_type = type_upper try: Keyword(base_type) return AtomicType( - type_string, DataTypeParser.parse_nullability(type_string) + type_upper, DataTypeParser.parse_nullability(type_string) ) except ValueError: raise Exception(f"Unknown type: {base_type}") @@ -345,11 +348,7 @@ class DataTypeParser: def parse_data_field( json_data: Dict[str, Any], field_id: Optional[AtomicInteger] = None ) -> DataField: - - if ( - DataField.FIELD_ID in json_data - and json_data[DataField.FIELD_ID] is not None - ): + if DataField.FIELD_ID in json_data and json_data[DataField.FIELD_ID] is not None: if 
field_id is not None and field_id.get() != -1: raise ValueError("Partial field id is not allowed.") field_id_value = int(json_data["id"]) @@ -486,7 +485,7 @@ class PyarrowFieldParser: return MapType(nullable, key_type, value_type) else: raise ValueError(f"Unknown type: {type_name}") - return AtomicType(type_name) + return AtomicType(type_name, nullable) @staticmethod def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField: diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index f0a108befb..f03b9e111b 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -19,6 +19,7 @@ from pathlib import Path from typing import Optional from pypaimon.common.file_io import FileIO +from pypaimon.common.rest_json import JSON from pypaimon.schema.schema import Schema from pypaimon.schema.table_schema import TableSchema @@ -42,15 +43,11 @@ class SchemaManager: except Exception as e: raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e - def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema: + def create_table(self, schema: Schema) -> TableSchema: while True: latest = self.latest() if latest is not None: - if external_table: - self._check_schema_for_external_table(latest.to_schema(), schema) - return latest - else: - raise RuntimeError("Schema in filesystem exists, creation is not allowed.") + raise RuntimeError("Schema in filesystem exists, creation is not allowed.") table_schema = TableSchema.from_schema(schema_id=0, schema=schema) success = self.commit(table_schema) @@ -60,7 +57,7 @@ class SchemaManager: def commit(self, new_schema: TableSchema) -> bool: schema_path = self._to_schema_path(new_schema.id) try: - return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json()) + return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2)) except Exception as e: raise RuntimeError(f"Failed to commit schema: {e}") from e diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index 80b787eb6e..dc1872deb6 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -57,22 +57,6 @@ class TableSchema: comment: Optional[str] = json_field(FIELD_COMMENT, default=None) time_millis: int = json_field("timeMillis", default_factory=lambda: int(time.time() * 1000)) - def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int, - partition_keys: List[str], primary_keys: List[str], options: Dict[str, str], - comment: Optional[str] = None, time_millis: Optional[int] = None): - self.version = version - self.id = id - self.fields = fields - self.highest_field_id = highest_field_id - self.partition_keys = partition_keys or [] - self.primary_keys = primary_keys or [] - self.options = options or {} - self.comment = comment - self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000) - self.get_trimmed_primary_key_fields() - - from typing import List - def cross_partition_update(self) -> bool: if not self.primary_keys or not self.partition_keys: return False @@ -96,7 +80,7 @@ class TableSchema: partition_keys: List[str] = schema.partition_keys primary_keys: List[str] = schema.primary_keys options: Dict[str, str] = schema.options - highest_field_id: int = -1 # max(field.id for field in fields) + highest_field_id: int = max(field.id for field 
in fields) return TableSchema( TableSchema.CURRENT_VERSION, @@ -106,8 +90,7 @@ class TableSchema: partition_keys, primary_keys, options, - schema.comment, - int(time.time()) + schema.comment ) @staticmethod @@ -150,22 +133,6 @@ class TableSchema: except Exception as e: raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e - def to_json(self) -> str: - data = { - TableSchema.FIELD_VERSION: self.version, - TableSchema.FIELD_ID: self.id, - TableSchema.FIELD_FIELDS: [field.to_dict() for field in self.fields], - TableSchema.FIELD_HIGHEST_FIELD_ID: self.highest_field_id, - TableSchema.FIELD_PARTITION_KEYS: self.partition_keys, - TableSchema.FIELD_PRIMARY_KEYS: self.primary_keys, - TableSchema.FIELD_OPTIONS: self.options, - TableSchema.FIELD_COMMENT: self.comment, - TableSchema.FIELD_TIME_MILLIS: self.time_millis - } - if self.comment is not None: - data["comment"] = self.comment - return json.dumps(data, indent=2, ensure_ascii=False) - def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema": return TableSchema( version=self.version, diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 1d131a492d..eb763d2938 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -49,10 +49,11 @@ class FileStoreTable(Table): self.primary_keys = table_schema.primary_keys self.partition_keys = table_schema.partition_keys self.options = table_schema.options + self.cross_partition_update = self.table_schema.cross_partition_update() + self.is_primary_key_table = bool(self.primary_keys) + self.total_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1)) self.schema_manager = SchemaManager(file_io, table_path) - self.is_primary_key_table = bool(self.primary_keys) - self.cross_partition_update = self.table_schema.cross_partition_update() def current_branch(self) -> str: """Get the current branch name from options.""" diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py index 2e44ff417e..f1f8e740df 100644 --- a/paimon-python/pypaimon/table/row/binary_row.py +++ b/paimon-python/pypaimon/table/row/binary_row.py @@ -193,7 +193,7 @@ class BinaryRowDeserializer: return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length] else: length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56 - return bytes_data[field_offset + 1:field_offset + 1 + length] + return bytes_data[field_offset:field_offset + length] @classmethod def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: @@ -238,8 +238,6 @@ class BinaryRowDeserializer: class BinaryRowSerializer: HEADER_SIZE_IN_BITS = 8 MAX_FIX_PART_DATA_SIZE = 7 - HIGHEST_FIRST_BIT = 0x80 << 56 - HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56 @classmethod def to_bytes(cls, binary_row: BinaryRow) -> bytes: @@ -252,79 +250,49 @@ class BinaryRowSerializer: fixed_part = bytearray(fixed_part_size) fixed_part[0] = binary_row.row_kind.value - for i, value in enumerate(binary_row.values): - if value is None: - cls._set_null_bit(fixed_part, 0, i) - - variable_data = [] - variable_offsets = [] - current_offset = fixed_part_size + variable_part_data = [] + current_variable_offset = 0 for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)): + field_fixed_offset = null_bits_size_in_bytes + i * 8 + if value is None: - struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 
8, 0) - variable_data.append(b'') - variable_offsets.append(0) + cls._set_null_bit(fixed_part, 0, i) + struct.pack_into('<q', fixed_part, field_fixed_offset, 0) continue - field_offset = null_bits_size_in_bytes + i * 8 if not isinstance(field.type, AtomicType): raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}") - if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']: - if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']: - if isinstance(value, str): - value_bytes = value.encode('utf-8') - else: - value_bytes = bytes(value) + + type_name = field.type.type.upper() + if type_name in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']: + if type_name in ['VARCHAR', 'STRING', 'CHAR']: + value_bytes = str(value).encode('utf-8') else: - if isinstance(value, bytes): - value_bytes = value - else: - value_bytes = bytes(value) + value_bytes = bytes(value) length = len(value_bytes) if length <= cls.MAX_FIX_PART_DATA_SIZE: - fixed_part[field_offset:field_offset + length] = value_bytes - for j in range(length, 8): - fixed_part[field_offset + j] = 0 - packed_long = struct.unpack_from('<q', fixed_part, field_offset)[0] - - offset_and_len = packed_long | (length << 56) | cls.HIGHEST_FIRST_BIT - if offset_and_len > 0x7FFFFFFFFFFFFFFF: - offset_and_len = offset_and_len - 0x10000000000000000 - struct.pack_into('<q', fixed_part, field_offset, offset_and_len) - variable_data.append(b'') - variable_offsets.append(0) + fixed_part[field_fixed_offset: field_fixed_offset + length] = value_bytes + for j in range(length, 7): + fixed_part[field_fixed_offset + j] = 0 + header_byte = 0x80 | length + fixed_part[field_fixed_offset + 7] = header_byte else: - variable_data.append(value_bytes) - variable_offsets.append(current_offset) - current_offset += len(value_bytes) - offset_and_len = (variable_offsets[i] << 32) | len(variable_data[i]) - struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, offset_and_len) - else: - if field.type.type.upper() in ['BOOLEAN', 'BOOL']: - struct.pack_into('<b', fixed_part, field_offset, 1 if value else 0) - elif field.type.type.upper() in ['TINYINT', 'BYTE']: - struct.pack_into('<b', fixed_part, field_offset, value) - elif field.type.type.upper() in ['SMALLINT', 'SHORT']: - struct.pack_into('<h', fixed_part, field_offset, value) - elif field.type.type.upper() in ['INT', 'INTEGER']: - struct.pack_into('<i', fixed_part, field_offset, value) - elif field.type.type.upper() in ['BIGINT', 'LONG']: - struct.pack_into('<q', fixed_part, field_offset, value) - elif field.type.type.upper() in ['FLOAT', 'REAL']: - struct.pack_into('<f', fixed_part, field_offset, value) - elif field.type.type.upper() in ['DOUBLE']: - struct.pack_into('<d', fixed_part, field_offset, value) - else: - field_bytes = cls._serialize_field_value(value, field.type) - fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes + offset_in_variable_part = current_variable_offset + variable_part_data.append(value_bytes) + current_variable_offset += length - variable_data.append(b'') - variable_offsets.append(0) + absolute_offset = fixed_part_size + offset_in_variable_part + offset_and_len = (absolute_offset << 32) | length + struct.pack_into('<q', fixed_part, field_fixed_offset, offset_and_len) + else: + field_bytes = cls._serialize_field_value(value, field.type) + fixed_part[field_fixed_offset: field_fixed_offset + len(field_bytes)] = field_bytes - result = bytes(fixed_part) + b''.join(variable_data) - return 
result + row_data = bytes(fixed_part) + b''.join(variable_part_data) + arity_prefix = struct.pack('>i', arity) + return arity_prefix + row_data @classmethod def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int: @@ -342,33 +310,29 @@ class BinaryRowSerializer: type_name = data_type.type.upper() if type_name in ['BOOLEAN', 'BOOL']: - return cls._serialize_boolean(value) + return cls._serialize_boolean(value) + b'\x00' * 7 elif type_name in ['TINYINT', 'BYTE']: - return cls._serialize_byte(value) + return cls._serialize_byte(value) + b'\x00' * 7 elif type_name in ['SMALLINT', 'SHORT']: - return cls._serialize_short(value) + return cls._serialize_short(value) + b'\x00' * 6 elif type_name in ['INT', 'INTEGER']: - return cls._serialize_int(value) + return cls._serialize_int(value) + b'\x00' * 4 elif type_name in ['BIGINT', 'LONG']: return cls._serialize_long(value) elif type_name in ['FLOAT', 'REAL']: - return cls._serialize_float(value) + return cls._serialize_float(value) + b'\x00' * 4 elif type_name in ['DOUBLE']: return cls._serialize_double(value) - elif type_name in ['VARCHAR', 'STRING', 'CHAR']: - return cls._serialize_string(value) - elif type_name in ['BINARY', 'VARBINARY', 'BYTES']: - return cls._serialize_binary(value) elif type_name in ['DECIMAL', 'NUMERIC']: return cls._serialize_decimal(value, data_type) elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']: return cls._serialize_timestamp(value) elif type_name in ['DATE']: - return cls._serialize_date(value) + return cls._serialize_date(value) + b'\x00' * 4 elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']: - return cls._serialize_time(value) + return cls._serialize_time(value) + b'\x00' * 4 else: - return cls._serialize_string(str(value)) + raise TypeError(f"Unsupported type for serialization: {type_name}") @classmethod def _serialize_boolean(cls, value: bool) -> bytes: @@ -398,32 +362,6 @@ class BinaryRowSerializer: def _serialize_double(cls, value: float) -> bytes: return struct.pack('<d', value) - @classmethod - def _serialize_string(cls, value) -> bytes: - if isinstance(value, str): - value_bytes = value.encode('utf-8') - else: - value_bytes = bytes(value) - - length = len(value_bytes) - - offset_and_len = (0x80 << 56) | (length << 56) - if offset_and_len > 0x7FFFFFFFFFFFFFFF: - offset_and_len = offset_and_len - 0x10000000000000000 - return struct.pack('<q', offset_and_len) - - @classmethod - def _serialize_binary(cls, value: bytes) -> bytes: - if isinstance(value, bytes): - data_bytes = value - else: - data_bytes = bytes(value) - length = len(data_bytes) - offset_and_len = (0x80 << 56) | (length << 56) - if offset_and_len > 0x7FFFFFFFFFFFFFFF: - offset_and_len = offset_and_len - 0x10000000000000000 - return struct.pack('<q', offset_and_len) - @classmethod def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes: type_str = str(data_type) @@ -452,11 +390,10 @@ class BinaryRowSerializer: @classmethod def _serialize_date(cls, value: datetime) -> bytes: if isinstance(value, datetime): - epoch = datetime(1970, 1, 1) - days = (value - epoch).days + epoch = datetime(1970, 1, 1).date() + days = (value.date() - epoch).days else: - epoch = datetime(1970, 1, 1) - days = (value - epoch).days + raise RuntimeError("date should be datatime") return struct.pack('<i', days) @classmethod diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index 4671e32996..b9b115dfa4 100644 --- 
a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -38,10 +38,10 @@ class PkReaderTest(unittest.TestCase): cls.catalog.create_database('default', False) cls.pa_schema = pa.schema([ - ('user_id', pa.int32()), + pa.field('user_id', pa.int32(), nullable=False), ('item_id', pa.int64()), ('behavior', pa.string()), - ('dt', pa.string()) + pa.field('dt', pa.string(), nullable=False) ]) cls.expected = pa.Table.from_pydict({ 'user_id': [1, 2, 3, 4, 5, 7, 8], diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py b/paimon-python/pypaimon/tests/rest_catalog_base_test.py index 773d94abed..c035957ccc 100644 --- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py +++ b/paimon-python/pypaimon/tests/rest_catalog_base_test.py @@ -186,7 +186,7 @@ class RESTCatalogBaseTest(unittest.TestCase): self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/snapshot/snapshot-1")) self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/manifest")) self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/dt=p1")) - self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*.avro")), 2) + self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*")), 3) def _write_test_table(self, table): write_builder = table.new_batch_write_builder() diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py index d0486ec02c..c326f3525d 100644 --- a/paimon-python/pypaimon/tests/rest_server.py +++ b/paimon-python/pypaimon/tests/rest_server.py @@ -495,8 +495,8 @@ class RESTCatalogServer: def _write_snapshot_files(self, identifier: Identifier, snapshot, statistics): """Write snapshot and related files to the file system""" - import os import json + import os import uuid # Construct table path: {warehouse}/{database}/{table} diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest_table_test.py index 9c64c17003..b905fa593b 100644 --- a/paimon-python/pypaimon/tests/rest_table_test.py +++ b/paimon-python/pypaimon/tests/rest_table_test.py @@ -168,7 +168,7 @@ class RESTTableTest(RESTCatalogBaseTest): self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST")) self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1")) self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest")) - self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2) + self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*")), 3) self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1) def test_postpone_read_write(self): diff --git a/paimon-python/pypaimon/tests/schema_test.py b/paimon-python/pypaimon/tests/schema_test.py index daa9442e50..766676e95f 100644 --- a/paimon-python/pypaimon/tests/schema_test.py +++ b/paimon-python/pypaimon/tests/schema_test.py @@ -32,7 +32,8 @@ class SchemaTestCase(unittest.TestCase): DataField(0, "name", AtomicType('INT'), 'desc name'), DataField(1, "arr", ArrayType(True, AtomicType('INT')), 'desc arr1'), DataField(2, "map1", - MapType(False, AtomicType('INT'), MapType(False, AtomicType('INT'), AtomicType('INT'))), + MapType(False, AtomicType('INT', False), + MapType(False, AtomicType('INT', False), AtomicType('INT', False))), 'desc map1'), ] table_schema = 
TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields, diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py index c0c242d2c4..8bf38f72e0 100644 --- a/paimon-python/pypaimon/tests/writer_test.py +++ b/paimon-python/pypaimon/tests/writer_test.py @@ -71,7 +71,7 @@ class WriterTest(unittest.TestCase): self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1")) self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest")) self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0")) - self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2) + self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*")), 3) self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1) with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file: diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index 1a2177abb1..4e4c0f0b48 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -15,31 +15,17 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ - +from dataclasses import dataclass from typing import List, Tuple from pypaimon.manifest.schema.data_file_meta import DataFileMeta +@dataclass class CommitMessage: - """Python implementation of CommitMessage""" - - def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]): - self._partition = partition - self._bucket = bucket - self._new_files = new_files or [] - - def partition(self) -> Tuple: - """Get the partition of this commit message.""" - return self._partition - - def bucket(self) -> int: - """Get the bucket of this commit message.""" - return self._bucket - - def new_files(self) -> List[DataFileMeta]: - """Get the list of new files.""" - return self._new_files + partition: Tuple + bucket: int + new_files: List[DataFileMeta] def is_empty(self): - return not self._new_files + return not self.new_files diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index e98cb8610f..ca26981333 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -17,14 +17,18 @@ ################################################################################ import time +import uuid from pathlib import Path from typing import List from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta +from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_manager import SnapshotManager +from pypaimon.table.row.binary_row import BinaryRow from pypaimon.write.commit_message import CommitMessage @@ -55,39 +59,69 @@ class FileStoreCommit: if not commit_messages: return - new_manifest_files = self.manifest_file_manager.write(commit_messages) - if not new_manifest_files: - return + unique_id = uuid.uuid4() + 
base_manifest_list = f"manifest-list-{unique_id}-0" + delta_manifest_list = f"manifest-list-{unique_id}-1" + + # process new_manifest + new_manifest_file = f"manifest-{str(uuid.uuid4())}-0" + self.manifest_file_manager.write(new_manifest_file, commit_messages) + + partition_columns = list(zip(*(msg.partition for msg in commit_messages))) + partition_min_stats = [min(col) for col in partition_columns] + partition_max_stats = [max(col) for col in partition_columns] + partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns] + if not all(count == 0 for count in partition_null_counts): + raise RuntimeError("Partition value should not be null") + + new_manifest_list = ManifestFileMeta( + file_name=new_manifest_file, + file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file), + num_added_files=sum(len(msg.new_files) for msg in commit_messages), + num_deleted_files=0, + partition_stats=SimpleStats( + min_value=BinaryRow( + values=partition_min_stats, + fields=self.table.table_schema.get_partition_key_fields(), + ), + max_value=BinaryRow( + values=partition_max_stats, + fields=self.table.table_schema.get_partition_key_fields(), + ), + null_count=partition_null_counts, + ), + schema_id=self.table.table_schema.id, + ) + self.manifest_list_manager.write(delta_manifest_list, [new_manifest_list]) + # process existing_manifest latest_snapshot = self.snapshot_manager.get_latest_snapshot() - - existing_manifest_files = [] - record_count_add = self._generate_record_count_add(commit_messages) - total_record_count = record_count_add - + total_record_count = 0 if latest_snapshot: - existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot) + existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot) previous_record_count = latest_snapshot.total_record_count if previous_record_count: total_record_count += previous_record_count + else: + existing_manifest_files = [] + self.manifest_list_manager.write(base_manifest_list, existing_manifest_files) - new_manifest_files.extend(existing_manifest_files) - manifest_list = self.manifest_list_manager.write(new_manifest_files) - + # process snapshot new_snapshot_id = self._generate_snapshot_id() + record_count_add = self._generate_record_count_add(commit_messages) + total_record_count += record_count_add snapshot_data = Snapshot( - version=3, + version=1, id=new_snapshot_id, - schema_id=0, - base_manifest_list=manifest_list, - delta_manifest_list=manifest_list, + schema_id=self.table.table_schema.id, + base_manifest_list=base_manifest_list, + delta_manifest_list=delta_manifest_list, total_record_count=total_record_count, delta_record_count=record_count_add, commit_user=self.commit_user, commit_identifier=commit_identifier, commit_kind="APPEND", time_millis=int(time.time() * 1000), - log_offsets={}, ) # Generate partition statistics for the commit @@ -101,46 +135,11 @@ class FileStoreCommit: def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" - if not commit_messages: - return - - new_manifest_files = self.manifest_file_manager.write(commit_messages) - if not new_manifest_files: - return - - # In overwrite mode, we don't merge with existing manifests - manifest_list = self.manifest_list_manager.write(new_manifest_files) - - record_count_add = self._generate_record_count_add(commit_messages) - - new_snapshot_id = self._generate_snapshot_id() - 
snapshot_data = Snapshot( - version=3, - id=new_snapshot_id, - schema_id=0, - base_manifest_list=manifest_list, - delta_manifest_list=manifest_list, - total_record_count=record_count_add, - delta_record_count=record_count_add, - commit_user=self.commit_user, - commit_identifier=commit_identifier, - commit_kind="OVERWRITE", - time_millis=int(time.time() * 1000), - log_offsets={}, - ) - - # Generate partition statistics for the commit - statistics = self._generate_partition_statistics(commit_messages) - - # Use SnapshotCommit for atomic commit - with self.snapshot_commit: - success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics) - if not success: - raise RuntimeError(f"Failed to commit snapshot {new_snapshot_id}") + raise RuntimeError("overwrite unsupported yet") def abort(self, commit_messages: List[CommitMessage]): for message in commit_messages: - for file in message.new_files(): + for file in message.new_files: try: file_path_obj = Path(file.file_path) if file_path_obj.exists(): @@ -179,7 +178,7 @@ class FileStoreCommit: for message in commit_messages: # Convert partition tuple to dictionary for PartitionStatistics - partition_value = message.partition() # Call the method to get partition value + partition_value = message.partition # Call the method to get partition value if partition_value: # Assuming partition is a tuple and we need to convert it to a dict # This may need adjustment based on actual partition format @@ -213,8 +212,7 @@ class FileStoreCommit: # Process each file in the commit message # Following Java implementation: PartitionEntry.fromDataFile() - new_files = message.new_files() - for file_meta in new_files: + for file_meta in message.new_files: # Extract actual file metadata (following Java DataFileMeta pattern) record_count = file_meta.row_count file_size_in_bytes = file_meta.file_size @@ -266,7 +264,7 @@ class FileStoreCommit: record_count = 0 for message in commit_messages: - new_files = message.new_files() + new_files = message.new_files for file_meta in new_files: record_count += file_meta.row_count diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 3bf6150b03..bcef10a4c3 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ - from typing import Dict, List, Tuple import pyarrow as pa @@ -34,6 +33,7 @@ class FileStoreWrite: self.table: FileStoreTable = table self.data_writers: Dict[Tuple, DataWriter] = {} + self.max_seq_numbers = self._seq_number_stats() # TODO: build this on-demand instead of on all def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch): key = (partition, bucket) @@ -48,12 +48,14 @@ class FileStoreWrite: table=self.table, partition=partition, bucket=bucket, + max_seq_number=self.max_seq_numbers.get((partition, bucket), 1), ) else: return AppendOnlyDataWriter( table=self.table, partition=partition, bucket=bucket, + max_seq_number=self.max_seq_numbers.get((partition, bucket), 1), ) def prepare_commit(self) -> List[CommitMessage]: @@ -74,3 +76,33 @@ class FileStoreWrite: for writer in self.data_writers.values(): writer.close() self.data_writers.clear() + + def _seq_number_stats(self) -> dict: + from pypaimon.manifest.manifest_file_manager import ManifestFileManager + from pypaimon.manifest.manifest_list_manager import ManifestListManager + from pypaimon.snapshot.snapshot_manager import SnapshotManager + + snapshot_manager = SnapshotManager(self.table) + manifest_list_manager = ManifestListManager(self.table) + manifest_file_manager = ManifestFileManager(self.table) + + latest_snapshot = snapshot_manager.get_latest_snapshot() + if not latest_snapshot: + return {} + manifest_files = manifest_list_manager.read_all(latest_snapshot) + + file_entries = [] + for manifest_file in manifest_files: + manifest_entries = manifest_file_manager.read(manifest_file.file_name) + for entry in manifest_entries: + if entry.kind == 0: + file_entries.append(entry) + + max_seq_numbers = {} + for entry in file_entries: + partition_key = (tuple(entry.partition.values), entry.bucket) + current_seq_num = entry.file.max_sequence_number + existing_max = max_seq_numbers.get(partition_key, -1) + if current_seq_num > existing_max: + max_seq_numbers[partition_key] = current_seq_num + return max_seq_numbers diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index c11d991b84..5d9641718c 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -15,16 +15,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ - import uuid from abc import ABC, abstractmethod +from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple import pyarrow as pa +import pyarrow.compute as pc from pypaimon.common.core_options import CoreOptions from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.row.binary_row import BinaryRow @@ -32,7 +34,7 @@ from pypaimon.table.row.binary_row import BinaryRow class DataWriter(ABC): """Base class for data writers that handle PyArrow tables directly.""" - def __init__(self, table, partition: Tuple, bucket: int): + def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -50,6 +52,7 @@ class DataWriter(ABC): if self.bucket != BucketMode.POSTPONE_BUCKET.value else CoreOptions.FILE_FORMAT_AVRO) self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd") + self.sequence_generator = SequenceGenerator(max_seq_number) self.pending_data: Optional[pa.RecordBatch] = None self.committed_files: List[DataFileMeta] = [] @@ -89,7 +92,7 @@ class DataWriter(ABC): current_size = self.pending_data.get_total_buffer_size() if current_size > self.target_file_size: - split_row = _find_optimal_split_point(self.pending_data, self.target_file_size) + split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size) if split_row > 0: data_to_write = self.pending_data.slice(0, split_row) remaining_data = self.pending_data.slice(split_row) @@ -101,7 +104,7 @@ class DataWriter(ABC): def _write_data_to_file(self, data: pa.RecordBatch): if data.num_rows == 0: return - file_name = f"data-{uuid.uuid4()}.{self.file_format}" + file_name = f"data-{uuid.uuid4()}-0.{self.file_format}" file_path = self._generate_file_path(file_name) if self.file_format == CoreOptions.FILE_FORMAT_PARQUET: self.file_io.write_parquet(file_path, data, compression=self.compression) @@ -112,24 +115,55 @@ class DataWriter(ABC): else: raise ValueError(f"Unsupported file format: {self.file_format}") + # min key & max key key_columns_batch = data.select(self.trimmed_primary_key) min_key_row_batch = key_columns_batch.slice(0, 1) - min_key_data = [col.to_pylist()[0] for col in min_key_row_batch.columns] max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1) - max_key_data = [col.to_pylist()[0] for col in max_key_row_batch.columns] + min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns] + max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns] + + # key stats & value stats + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in self.table.table_schema.fields + } + all_fields = self.table.table_schema.fields + min_value_stats = [column_stats[field.name]['min_value'] for field in all_fields] + max_value_stats = [column_stats[field.name]['max_value'] for field in all_fields] + value_null_counts = [column_stats[field.name]['null_count'] for field in all_fields] + key_fields = self.trimmed_primary_key_fields + min_key_stats = [column_stats[field.name]['min_value'] for field in key_fields] + max_key_stats = [column_stats[field.name]['max_value'] for field in key_fields] + key_null_counts = [column_stats[field.name]['null_count'] for field in key_fields] + if not all(count == 
0 for count in key_null_counts): + raise RuntimeError("Primary key should not be null") + + min_seq = self.sequence_generator.start + max_seq = self.sequence_generator.current + self.sequence_generator.start = self.sequence_generator.current self.committed_files.append(DataFileMeta( file_name=file_name, file_size=self.file_io.get_file_size(file_path), row_count=data.num_rows, - min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields), - max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields), - key_stats=None, # TODO - value_stats=None, - min_sequence_number=0, - max_sequence_number=0, - schema_id=0, + min_key=BinaryRow(min_key, self.trimmed_primary_key_fields), + max_key=BinaryRow(max_key, self.trimmed_primary_key_fields), + key_stats=SimpleStats( + BinaryRow(min_key_stats, self.trimmed_primary_key_fields), + BinaryRow(max_key_stats, self.trimmed_primary_key_fields), + key_null_counts, + ), + value_stats=SimpleStats( + BinaryRow(min_value_stats, self.table.table_schema.fields), + BinaryRow(max_value_stats, self.table.table_schema.fields), + value_null_counts, + ), + min_sequence_number=min_seq, + max_sequence_number=max_seq, + schema_id=self.table.table_schema.id, level=0, - extra_files=None, + extra_files=[], + creation_time=datetime.now(), + delete_row_count=0, file_path=str(file_path), )) @@ -146,24 +180,52 @@ class DataWriter(ABC): return path_builder - -def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: - total_rows = data.num_rows - if total_rows <= 1: - return 0 - - left, right = 1, total_rows - best_split = 0 - - while left <= right: - mid = (left + right) // 2 - slice_data = data.slice(0, mid) - slice_size = slice_data.get_total_buffer_size() - - if slice_size <= target_size: - best_split = mid - left = mid + 1 - else: - right = mid - 1 - - return best_split + @staticmethod + def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int: + total_rows = data.num_rows + if total_rows <= 1: + return 0 + + left, right = 1, total_rows + best_split = 0 + + while left <= right: + mid = (left + right) // 2 + slice_data = data.slice(0, mid) + slice_size = slice_data.get_total_buffer_size() + + if slice_size <= target_size: + best_split = mid + left = mid + 1 + else: + right = mid - 1 + + return best_split + + @staticmethod + def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> dict: + column_array = record_batch.column(column_name) + if column_array.null_count == len(column_array): + return { + "min_value": None, + "max_value": None, + "null_count": column_array.null_count, + } + min_value = pc.min(column_array).as_py() + max_value = pc.max(column_array).as_py() + null_count = column_array.null_count + return { + "min_value": min_value, + "max_value": max_value, + "null_count": null_count, + } + + +class SequenceGenerator: + def __init__(self, start: int = 0): + self.start = start + self.current = start + + def next(self) -> int: + self.current += 1 + return self.current diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index dbf47496c9..99b11a9788 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -22,18 +22,6 @@ import pyarrow.compute as pc from pypaimon.write.writer.data_writer import DataWriter -class SequenceGenerator: - def __init__(self, start: int = 0): - self.current = start - - def next(self) -> int: - self.current += 1 - 
return self.current - - -sequence_generator = SequenceGenerator() - - class KeyValueDataWriter(DataWriter): """Data writer for primary key tables with system fields and sorting.""" @@ -55,11 +43,11 @@ class KeyValueDataWriter(DataWriter): key_column = data.column(pk_key) enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column) - sequence_column = pa.array([sequence_generator.next() for _ in range(num_rows)], type=pa.int64()) + sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64()) enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column) # TODO: support real row kind here - value_kind_column = pa.repeat(0, num_rows) + value_kind_column = pa.array([0] * num_rows, type=pa.int32()) enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND', value_kind_column) diff --git a/paimon-python/setup.py b/paimon-python/setup.py index 507844685c..a00a2ac487 100644 --- a/paimon-python/setup.py +++ b/paimon-python/setup.py @@ -28,7 +28,8 @@ install_requires = [ 'ossfs==2023.12.0', 'pyarrow==16.0.0', 'polars==1.32.0', - 'fastavro==1.11.1' + 'fastavro==1.11.1', + 'zstandard==0.24.0' ] long_description = "See Apache Paimon Python API \
