This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 701002588d [python] Implement optimized path for split generation with raw_convertible (#6918)
701002588d is described below
commit 701002588d4f1291222ed90dd9cf6806f4ab2038
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Dec 29 21:35:08 2025 +0800
[python] Implement optimized path for split generation with raw_convertible (#6918)
---
.../pypaimon/common/options/core_options.py | 21 ++
.../pypaimon/common/options/options_utils.py | 35 +++
.../pypaimon/read/scanner/full_starting_scanner.py | 72 ++++--
.../pypaimon/tests/reader_split_generator_test.py | 249 +++++++++++++++++++++
4 files changed, 360 insertions(+), 17 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 04c993b0e2..10ff120d75 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -33,6 +33,16 @@ class ExternalPathStrategy(str, Enum):
SPECIFIC_FS = "specific-fs"
+class MergeEngine(str, Enum):
+ """
+ Specifies the merge engine for table with primary key.
+ """
+ DEDUPLICATE = "deduplicate"
+ PARTIAL_UPDATE = "partial-update"
+ AGGREGATE = "aggregation"
+ FIRST_ROW = "first-row"
+
+
class CoreOptions:
"""Core options for Paimon tables."""
# File format constants
@@ -206,6 +216,14 @@ class CoreOptions:
.default_value(False)
.with_description("Whether to enable deletion vectors.")
)
+
+ MERGE_ENGINE: ConfigOption[MergeEngine] = (
+ ConfigOptions.key("merge-engine")
+ .enum_type(MergeEngine)
+ .default_value(MergeEngine.DEDUPLICATE)
+ .with_description("Specify the merge engine for table with primary key. "
+ "Options: deduplicate, partial-update, aggregation, first-row.")
+ )
# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
@@ -348,6 +366,9 @@ class CoreOptions:
def deletion_vectors_enabled(self, default=None):
return self.options.get(CoreOptions.DELETION_VECTORS_ENABLED, default)
+ def merge_engine(self, default=None):
+ return self.options.get(CoreOptions.MERGE_ENGINE, default)
+
def data_file_external_paths(self, default=None):
external_paths_str = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
if not external_paths_str:
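For context, a minimal sketch of how the new "merge-engine" option can be set when creating a primary-key table, mirroring the test added later in this commit; the warehouse path and table name are hypothetical:

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema
    from pypaimon.common.options.core_options import MergeEngine

    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-warehouse'})  # hypothetical path
    catalog.create_database('default', False)

    pa_schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        primary_keys=['id'],
        options={'bucket': '1', 'merge-engine': 'first-row'})
    catalog.create_table('default.demo_first_row', schema, False)  # hypothetical table name

    table = catalog.get_table('default.demo_first_row')
    # As in the new test and scanner code, merge_engine() is compared against the enum member.
    assert table.options.merge_engine() == MergeEngine.FIRST_ROW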
diff --git a/paimon-python/pypaimon/common/options/options_utils.py b/paimon-python/pypaimon/common/options/options_utils.py
index 12c9eaad0f..f48f549df4 100644
--- a/paimon-python/pypaimon/common/options/options_utils.py
+++ b/paimon-python/pypaimon/common/options/options_utils.py
@@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+from enum import Enum
from typing import Any, Type
from pypaimon.common.memory_size import MemorySize
@@ -45,6 +46,12 @@ class OptionsUtils:
if isinstance(value, target_type):
return value
+ try:
+ if issubclass(target_type, Enum):
+ return OptionsUtils.convert_to_enum(value, target_type)
+ except TypeError:
+ pass
+
# Handle string conversions
if target_type == str:
return OptionsUtils.convert_to_string(value)
@@ -117,3 +124,31 @@ class OptionsUtils:
if isinstance(value, str):
return MemorySize.parse(value)
raise ValueError(f"Cannot convert {type(value)} to MemorySize")
+
+ @staticmethod
+ def convert_to_enum(value: Any, enum_class: Type[Enum]) -> Enum:
+
+ if isinstance(value, enum_class):
+ return value
+
+ if isinstance(value, str):
+ value_lower = value.lower().strip()
+ for enum_member in enum_class:
+ if enum_member.value.lower() == value_lower:
+ return enum_member
+ try:
+ return enum_class[value.upper()]
+ except KeyError:
+ raise ValueError(
+ f"Cannot convert '{value}' to {enum_class.__name__}. "
+ f"Valid values: {[e.value for e in enum_class]}"
+ )
+ elif isinstance(value, Enum):
+ for enum_member in enum_class:
+ if enum_member.value == value.value:
+ return enum_member
+ raise ValueError(
+ f"Cannot convert {value} (from {type(value).__name__}) to {enum_class.__name__}"
+ )
+ else:
+ raise ValueError(f"Cannot convert {type(value)} to {enum_class.__name__}")
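A brief usage note on the converter added above: convert_to_enum first tries a case-insensitive match against the enum member values and then falls back to the member name. A sketch with the MergeEngine enum from core_options.py, assuming only the imports shown:

    from pypaimon.common.options.core_options import MergeEngine
    from pypaimon.common.options.options_utils import OptionsUtils

    # value match
    assert OptionsUtils.convert_to_enum('partial-update', MergeEngine) is MergeEngine.PARTIAL_UPDATE
    # case-insensitive value match
    assert OptionsUtils.convert_to_enum('Partial-Update', MergeEngine) is MergeEngine.PARTIAL_UPDATE
    # member-name fallback ('partial_update' matches no value, so MergeEngine['PARTIAL_UPDATE'] is used)
    assert OptionsUtils.convert_to_enum('PARTIAL_UPDATE', MergeEngine) is MergeEngine.PARTIAL_UPDATE
    # an enum member of the target type is returned unchanged
    assert OptionsUtils.convert_to_enum(MergeEngine.PARTIAL_UPDATE, MergeEngine) is MergeEngine.PARTIAL_UPDATE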
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 96a848ce01..eebd3b6008 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -36,6 +36,7 @@ from pypaimon.read.split import Split
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
+from pypaimon.common.options.core_options import MergeEngine
class FullStartingScanner(StartingScanner):
@@ -471,6 +472,12 @@ class FullStartingScanner(StartingScanner):
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits
+ def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
+ # null to true to be compatible with old version
+ if data_file_meta.delete_row_count is None:
+ return True
+ return data_file_meta.delete_row_count == 0
+
def _create_primary_key_splits(
self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
if self.idx_of_this_subtask is not None:
@@ -479,28 +486,56 @@ class FullStartingScanner(StartingScanner):
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+ def single_weight_func(f: DataFileMeta) -> int:
+ return max(f.file_size, self.open_file_cost)
+
def weight_func(fl: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in fl), self.open_file_cost)
+ merge_engine = self.table.options.merge_engine()
+ merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
+
splits = []
for key, file_entries in partitioned_files.items():
if not file_entries:
- return []
+ continue
data_files: List[DataFileMeta] = [e.file for e in file_entries]
- partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
- sections: List[List[DataFileMeta]] = [
- [file for s in sl for file in s.files]
- for sl in partition_sort_runs
- ]
- packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
- self.target_split_size)
- flatten_packed_files: List[List[DataFileMeta]] = [
- [file for sub_pack in pack for file in sub_pack]
- for pack in packed_files
- ]
- splits += self._build_split_from_pack(flatten_packed_files, file_entries, True, deletion_files_map)
+ raw_convertible = all(
+ f.level != 0 and self._without_delete_row(f)
+ for f in data_files
+ )
+
+ levels = {f.level for f in data_files}
+ one_level = len(levels) == 1
+
+ use_optimized_path = raw_convertible and (
+ self.deletion_vectors_enabled or merge_engine_first_row or one_level)
+ if use_optimized_path:
+ packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+ data_files, single_weight_func, self.target_split_size
+ )
+ splits += self._build_split_from_pack(
+ packed_files, file_entries, True, deletion_files_map,
+ use_optimized_path)
+ else:
+ partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
+ sections: List[List[DataFileMeta]] = [
+ [file for s in sl for file in s.files]
+ for sl in partition_sort_runs
+ ]
+
+ packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
+ self.target_split_size)
+
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+ splits += self._build_split_from_pack(
+ flatten_packed_files, file_entries, True,
+ deletion_files_map, False)
return splits
def _create_data_evolution_splits(
@@ -595,12 +630,15 @@ class FullStartingScanner(StartingScanner):
return split_by_row_id
def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
- deletion_files_map: dict = None) -> List['Split']:
+ deletion_files_map: dict = None, use_optimized_path: bool = False) -> List['Split']:
splits = []
for file_group in packed_files:
- raw_convertible = True
- if for_primary_key_split:
- raw_convertible = len(file_group) == 1
+ if use_optimized_path:
+ raw_convertible = True
+ elif for_primary_key_split:
+ raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
+ else:
+ raw_convertible = True
file_paths = []
total_file_size = 0
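To make the branching above easier to follow, a self-contained sketch of the per-bucket decision; FakeFile is a hypothetical stand-in for DataFileMeta that models only the fields the scanner reads:

    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class FakeFile:  # hypothetical stand-in for DataFileMeta
        level: int
        delete_row_count: Optional[int]

    def without_delete_row(f: FakeFile) -> bool:
        # None is treated as "no deleted rows" for compatibility with older metadata
        return f.delete_row_count is None or f.delete_row_count == 0

    def use_optimized_path(files: List[FakeFile],
                           deletion_vectors_enabled: bool,
                           merge_engine_first_row: bool) -> bool:
        # raw_convertible: every file sits above level 0 and carries no deleted rows
        raw_convertible = all(f.level != 0 and without_delete_row(f) for f in files)
        one_level = len({f.level for f in files}) == 1
        return raw_convertible and (deletion_vectors_enabled or merge_engine_first_row or one_level)

    # All files on the same non-zero level with no deletes: packed directly, skipping IntervalPartition.
    assert use_optimized_path([FakeFile(5, 0), FakeFile(5, None)], False, False)
    # A level-0 file (or any deleted rows) keeps the merge-based path.
    assert not use_optimized_path([FakeFile(0, 0), FakeFile(5, 0)], False, False)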
diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py
new file mode 100644
index 0000000000..ad2f9e084b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py
@@ -0,0 +1,249 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Test cases for split generation logic, matching Java's SplitGeneratorTest.
+
+This test covers MergeTree split generation and rawConvertible logic.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.options.core_options import CoreOptions, MergeEngine
+
+
+class SplitGeneratorTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ def _create_table(self, table_name, merge_engine='deduplicate',
+ deletion_vectors_enabled=False,
+ split_target_size=None, split_open_file_cost=None):
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('value', pa.string())
+ ])
+ options = {
+ 'bucket': '1', # Single bucket for testing
+ 'merge-engine': merge_engine,
+ 'deletion-vectors.enabled': str(deletion_vectors_enabled).lower()
+ }
+ if split_target_size is not None:
+ options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key()] = split_target_size
+ if split_open_file_cost is not None:
+ options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST.key()] = split_open_file_cost
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options=options
+ )
+ self.catalog.create_table(f'default.{table_name}', schema, False)
+ return self.catalog.get_table(f'default.{table_name}')
+
+ def _create_test_data(self, id_ranges):
+ return [
+ {'id': list(range(start, end)) if isinstance(start, int) else start,
+ 'value': [f'v{i}' for i in (range(start, end) if isinstance(start, int) else start)]}
+ for start, end in id_ranges
+ ]
+
+ def _write_data(self, table, data_list):
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('value', pa.string())
+ ])
+ for data in data_list:
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ commit = write_builder.new_commit()
+ try:
+ batch = pa.Table.from_pydict(data, schema=pa_schema)
+ writer.write_arrow(batch)
+ commit.commit(writer.prepare_commit())
+ finally:
+ writer.close()
+ commit.close()
+
+ def _get_splits_info(self, table):
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ splits = table_scan.plan().splits()
+
+ result = []
+ for split in splits:
+ file_names = sorted([f.file_name for f in split.files])
+ result.append((file_names, split.raw_convertible))
+ return result
+
+ def test_merge_tree(self):
+ test_data = self._create_test_data([
+ (0, 11), (0, 13), (15, 61), (18, 41), (82, 86), (100, 201)
+ ])
+
+ table1 = self._create_table('test_merge_tree_1', split_target_size='1kb', split_open_file_cost='100b')
+ self._write_data(table1, test_data)
+ splits_info1 = self._get_splits_info(table1)
+ self.assertGreater(len(splits_info1), 0)
+ total_files1 = sum(len(files) for files, _ in splits_info1)
+ self.assertEqual(total_files1, 6)
+ self.assertLessEqual(len(splits_info1), 6)
+
+ table2 = self._create_table('test_merge_tree_2', split_target_size='1kb', split_open_file_cost='1kb')
+ self._write_data(table2, test_data)
+ splits_info2 = self._get_splits_info(table2)
+ self.assertGreater(len(splits_info2), 0)
+ total_files2 = sum(len(files) for files, _ in splits_info2)
+ self.assertEqual(total_files2, 6)
+ self.assertGreaterEqual(len(splits_info2), len(splits_info1))
+
+ for file_names, raw_convertible in splits_info1 + splits_info2:
+ self.assertGreater(len(file_names), 0)
+
+ def test_split_raw_convertible(self):
+ table = self._create_table('test_raw_convertible')
+ self._write_data(table, [{'id': [1, 2], 'value': ['a', 'b']}])
+ splits = table.new_read_builder().new_scan().plan().splits()
+ for split in splits:
+ if len(split.files) == 1:
+ has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+ if not has_delete_rows:
+ self.assertTrue(split.raw_convertible)
+
+ table_dv = self._create_table('test_dv', deletion_vectors_enabled=True)
+ self._write_data(table_dv, [
+ {'id': [1, 2], 'value': ['a', 'b']},
+ {'id': [3, 4], 'value': ['c', 'd']},
+ ])
+ splits_dv = table_dv.new_read_builder().new_scan().plan().splits()
+
+ if len(splits_dv) == 0:
+ pass
+ else:
+ for split in splits_dv:
+ if len(split.files) == 1:
+ has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+ if not has_delete_rows:
+ self.assertTrue(split.raw_convertible)
+
+ table_first_row = self._create_table('test_first_row', merge_engine='first-row')
+ self._write_data(table_first_row, [
+ {'id': [1, 2], 'value': ['a', 'b']},
+ {'id': [3, 4], 'value': ['c', 'd']},
+ ])
+ splits_first_row = table_first_row.new_read_builder().new_scan().plan().splits()
+ for split in splits_first_row:
+ if len(split.files) == 1:
+ has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+ if not has_delete_rows:
+ self.assertTrue(split.raw_convertible)
+
+ def test_merge_tree_split_raw_convertible(self):
+ table = self._create_table('test_mixed_levels')
+ self._write_data(table, self._create_test_data([
+ (0, 11), (0, 13), (13, 21), (21, 221), (201, 211), (211, 221)
+ ]))
+ splits = table.new_read_builder().new_scan().plan().splits()
+ self.assertGreater(len(splits), 0)
+
+ deletion_vectors_enabled = table.options.deletion_vectors_enabled()
+ merge_engine = table.options.merge_engine()
+ merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
+
+ for split in splits:
+ has_level_0 = any(f.level == 0 for f in split.files)
+ has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+
+ if len(split.files) == 1:
+ if not has_level_0 and not has_delete_rows:
+ self.assertTrue(
+ split.raw_convertible,
+ "Single file split should be raw_convertible")
+ else:
+ all_non_level0_no_delete = all(
+ f.level != 0 and (not f.delete_row_count or f.delete_row_count == 0)
+ for f in split.files
+ )
+ levels = {f.level for f in split.files}
+ one_level = len(levels) == 1
+
+ use_optimized_path = all_non_level0_no_delete and (
+ deletion_vectors_enabled or merge_engine_first_row or one_level
+ )
+
+ if use_optimized_path:
+ self.assertTrue(
+ split.raw_convertible,
+ "Multi-file split should be raw_convertible when optimized path is used")
+ else:
+ self.assertFalse(
+ split.raw_convertible,
+ "Multi-file split should not be raw_convertible when optimized path is not used")
+
+ def test_shard_with_empty_partition(self):
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('value', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options={'bucket': '3'} # Use 3 buckets for shard testing
+ )
+ self.catalog.create_table('default.test_shard_empty_partition', schema, False)
+ table = self.catalog.get_table('default.test_shard_empty_partition')
+
+ self._write_data(table, [
+ {'id': [0, 3, 6], 'value': ['v0', 'v3', 'v6']},
+ {'id': [1, 4, 7], 'value': ['v1', 'v4', 'v7']},
+ {'id': [2, 5, 8], 'value': ['v2', 'v5', 'v8']},
+ ])
+
+ read_builder = table.new_read_builder()
+
+ splits_all = read_builder.new_scan().plan().splits()
+ self.assertGreater(len(splits_all), 0, "Should have splits without shard filtering")
+
+ splits_shard_0 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+
+ self.assertGreaterEqual(
+ len(splits_shard_0), 0,
+ "Should return splits even if some partitions are empty after shard filtering")
+
+ for split in splits_shard_0:
+ self.assertGreater(len(split.files), 0, "Each split should have at least one file")
+
+
+if __name__ == '__main__':
+ unittest.main()