This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 701002588d [python] Implement optimized path for split generation with raw_convertible (#6918)
701002588d is described below

commit 701002588d4f1291222ed90dd9cf6806f4ab2038
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Dec 29 21:35:08 2025 +0800

    [python] Implement optimized path for split generation with raw_convertible (#6918)
---
 .../pypaimon/common/options/core_options.py        |  21 ++
 .../pypaimon/common/options/options_utils.py       |  35 +++
 .../pypaimon/read/scanner/full_starting_scanner.py |  72 ++++--
 .../pypaimon/tests/reader_split_generator_test.py  | 249 +++++++++++++++++++++
 4 files changed, 360 insertions(+), 17 deletions(-)
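
For readers skimming the change: below is a minimal sketch, mirroring the new test, of how the optimized path surfaces through the pypaimon API. The warehouse location, table name, and data are illustrative and not part of the commit.

import tempfile

import pyarrow as pa

from pypaimon import CatalogFactory, Schema

# Illustrative warehouse location; any writable path works.
warehouse = tempfile.mkdtemp()
catalog = CatalogFactory.create({'warehouse': warehouse})
catalog.create_database('default', False)

pa_schema = pa.schema([('id', pa.int64()), ('value', pa.string())])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    primary_keys=['id'],
    # 'first-row' (like deletion vectors or a single level) is one of the
    # conditions under which the new optimized packing applies.
    options={'bucket': '1', 'merge-engine': 'first-row'}
)
catalog.create_table('default.demo', schema, False)
table = catalog.get_table('default.demo')

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
commit = write_builder.new_commit()
writer.write_arrow(pa.Table.from_pydict({'id': [1, 2], 'value': ['a', 'b']}, schema=pa_schema))
commit.commit(writer.prepare_commit())
writer.close()
commit.close()

# With this change, a split is flagged raw_convertible when all of its files
# sit above level 0 with no delete rows and deletion vectors, the first-row
# merge engine, or a single level is in play; such files are packed per file.
for split in table.new_read_builder().new_scan().plan().splits():
    print(split.raw_convertible, [f.file_name for f in split.files])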

diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 04c993b0e2..10ff120d75 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -33,6 +33,16 @@ class ExternalPathStrategy(str, Enum):
     SPECIFIC_FS = "specific-fs"
 
 
+class MergeEngine(str, Enum):
+    """
+    Specifies the merge engine for table with primary key.
+    """
+    DEDUPLICATE = "deduplicate"
+    PARTIAL_UPDATE = "partial-update"
+    AGGREGATE = "aggregation"
+    FIRST_ROW = "first-row"
+
+
 class CoreOptions:
     """Core options for Paimon tables."""
     # File format constants
@@ -206,6 +216,14 @@ class CoreOptions:
         .default_value(False)
         .with_description("Whether to enable deletion vectors.")
     )
+
+    MERGE_ENGINE: ConfigOption[MergeEngine] = (
+        ConfigOptions.key("merge-engine")
+        .enum_type(MergeEngine)
+        .default_value(MergeEngine.DEDUPLICATE)
+        .with_description("Specify the merge engine for table with primary key. "
+                          "Options: deduplicate, partial-update, aggregation, first-row.")
+    )
     # Commit options
     COMMIT_USER_PREFIX: ConfigOption[str] = (
         ConfigOptions.key("commit.user-prefix")
@@ -348,6 +366,9 @@ class CoreOptions:
     def deletion_vectors_enabled(self, default=None):
         return self.options.get(CoreOptions.DELETION_VECTORS_ENABLED, default)
 
+    def merge_engine(self, default=None):
+        return self.options.get(CoreOptions.MERGE_ENGINE, default)
+
     def data_file_external_paths(self, default=None):
         external_paths_str = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
         if not external_paths_str:
diff --git a/paimon-python/pypaimon/common/options/options_utils.py b/paimon-python/pypaimon/common/options/options_utils.py
index 12c9eaad0f..f48f549df4 100644
--- a/paimon-python/pypaimon/common/options/options_utils.py
+++ b/paimon-python/pypaimon/common/options/options_utils.py
@@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 
+from enum import Enum
 from typing import Any, Type
 
 from pypaimon.common.memory_size import MemorySize
@@ -45,6 +46,12 @@ class OptionsUtils:
         if isinstance(value, target_type):
             return value
 
+        try:
+            if issubclass(target_type, Enum):
+                return OptionsUtils.convert_to_enum(value, target_type)
+        except TypeError:
+            pass
+
         # Handle string conversions
         if target_type == str:
             return OptionsUtils.convert_to_string(value)
@@ -117,3 +124,31 @@ class OptionsUtils:
         if isinstance(value, str):
             return MemorySize.parse(value)
         raise ValueError(f"Cannot convert {type(value)} to MemorySize")
+
+    @staticmethod
+    def convert_to_enum(value: Any, enum_class: Type[Enum]) -> Enum:
+
+        if isinstance(value, enum_class):
+            return value
+
+        if isinstance(value, str):
+            value_lower = value.lower().strip()
+            for enum_member in enum_class:
+                if enum_member.value.lower() == value_lower:
+                    return enum_member
+            try:
+                return enum_class[value.upper()]
+            except KeyError:
+                raise ValueError(
+                    f"Cannot convert '{value}' to {enum_class.__name__}. "
+                    f"Valid values: {[e.value for e in enum_class]}"
+                )
+        elif isinstance(value, Enum):
+            for enum_member in enum_class:
+                if enum_member.value == value.value:
+                    return enum_member
+            raise ValueError(
+                f"Cannot convert {value} (from {type(value).__name__}) to {enum_class.__name__}"
+            )
+        else:
+            raise ValueError(f"Cannot convert {type(value)} to {enum_class.__name__}")
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 96a848ce01..eebd3b6008 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -36,6 +36,7 @@ from pypaimon.read.split import Split
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
+from pypaimon.common.options.core_options import MergeEngine
 
 
 class FullStartingScanner(StartingScanner):
@@ -471,6 +472,12 @@ class FullStartingScanner(StartingScanner):
             self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
         return splits
 
+    def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
+        # Treat a missing delete_row_count as zero for compatibility with files written by old versions
+        if data_file_meta.delete_row_count is None:
+            return True
+        return data_file_meta.delete_row_count == 0
+
     def _create_primary_key_splits(
             self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
         if self.idx_of_this_subtask is not None:
@@ -479,28 +486,56 @@ class FullStartingScanner(StartingScanner):
         for entry in file_entries:
             partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
 
+        def single_weight_func(f: DataFileMeta) -> int:
+            return max(f.file_size, self.open_file_cost)
+
         def weight_func(fl: List[DataFileMeta]) -> int:
             return max(sum(f.file_size for f in fl), self.open_file_cost)
 
+        merge_engine = self.table.options.merge_engine()
+        merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
+
         splits = []
         for key, file_entries in partitioned_files.items():
             if not file_entries:
-                return []
+                continue
 
             data_files: List[DataFileMeta] = [e.file for e in file_entries]
-            partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
-            sections: List[List[DataFileMeta]] = [
-                [file for s in sl for file in s.files]
-                for sl in partition_sort_runs
-            ]
 
-            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
-                                                                                  self.target_split_size)
-            flatten_packed_files: List[List[DataFileMeta]] = [
-                [file for sub_pack in pack for file in sub_pack]
-                for pack in packed_files
-            ]
-            splits += self._build_split_from_pack(flatten_packed_files, file_entries, True, deletion_files_map)
+            raw_convertible = all(
+                f.level != 0 and self._without_delete_row(f)
+                for f in data_files
+            )
+
+            levels = {f.level for f in data_files}
+            one_level = len(levels) == 1
+
+            use_optimized_path = raw_convertible and (
+                self.deletion_vectors_enabled or merge_engine_first_row or one_level)
+            if use_optimized_path:
+                packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
+                    data_files, single_weight_func, self.target_split_size
+                )
+                splits += self._build_split_from_pack(
+                    packed_files, file_entries, True, deletion_files_map,
+                    use_optimized_path)
+            else:
+                partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
+                sections: List[List[DataFileMeta]] = [
+                    [file for s in sl for file in s.files]
+                    for sl in partition_sort_runs
+                ]
+
+                packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
+                                                                                      self.target_split_size)
+
+                flatten_packed_files: List[List[DataFileMeta]] = [
+                    [file for sub_pack in pack for file in sub_pack]
+                    for pack in packed_files
+                ]
+                splits += self._build_split_from_pack(
+                    flatten_packed_files, file_entries, True,
+                    deletion_files_map, False)
         return splits
 
     def _create_data_evolution_splits(
@@ -595,12 +630,15 @@ class FullStartingScanner(StartingScanner):
         return split_by_row_id
 
     def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
-                               deletion_files_map: dict = None) -> List['Split']:
+                               deletion_files_map: dict = None, use_optimized_path: bool = False) -> List['Split']:
         splits = []
         for file_group in packed_files:
-            raw_convertible = True
-            if for_primary_key_split:
-                raw_convertible = len(file_group) == 1
+            if use_optimized_path:
+                raw_convertible = True
+            elif for_primary_key_split:
+                raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
+            else:
+                raw_convertible = True
 
             file_paths = []
             total_file_size = 0
diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py
new file mode 100644
index 0000000000..ad2f9e084b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py
@@ -0,0 +1,249 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""
+Test cases for split generation logic, matching Java's SplitGeneratorTest.
+
+This test covers MergeTree split generation and rawConvertible logic.
+"""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.options.core_options import CoreOptions, MergeEngine
+
+
+class SplitGeneratorTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({
+            'warehouse': cls.warehouse
+        })
+        cls.catalog.create_database('default', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def _create_table(self, table_name, merge_engine='deduplicate',
+                      deletion_vectors_enabled=False,
+                      split_target_size=None, split_open_file_cost=None):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+        options = {
+            'bucket': '1',  # Single bucket for testing
+            'merge-engine': merge_engine,
+            'deletion-vectors.enabled': str(deletion_vectors_enabled).lower()
+        }
+        if split_target_size is not None:
+            options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key()] = split_target_size
+        if split_open_file_cost is not None:
+            options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST.key()] = split_open_file_cost
+        
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options=options
+        )
+        self.catalog.create_table(f'default.{table_name}', schema, False)
+        return self.catalog.get_table(f'default.{table_name}')
+
+    def _create_test_data(self, id_ranges):
+        return [
+            {'id': list(range(start, end)) if isinstance(start, int) else start,
+             'value': [f'v{i}' for i in (range(start, end) if isinstance(start, int) else start)]}
+            for start, end in id_ranges
+        ]
+    
+    def _write_data(self, table, data_list):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+        for data in data_list:
+            write_builder = table.new_batch_write_builder()
+            writer = write_builder.new_write()
+            commit = write_builder.new_commit()
+            try:
+                batch = pa.Table.from_pydict(data, schema=pa_schema)
+                writer.write_arrow(batch)
+                commit.commit(writer.prepare_commit())
+            finally:
+                writer.close()
+                commit.close()
+
+    def _get_splits_info(self, table):
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+        
+        result = []
+        for split in splits:
+            file_names = sorted([f.file_name for f in split.files])
+            result.append((file_names, split.raw_convertible))
+        return result
+
+    def test_merge_tree(self):
+        test_data = self._create_test_data([
+            (0, 11), (0, 13), (15, 61), (18, 41), (82, 86), (100, 201)
+        ])
+        
+        table1 = self._create_table('test_merge_tree_1', split_target_size='1kb', split_open_file_cost='100b')
+        self._write_data(table1, test_data)
+        splits_info1 = self._get_splits_info(table1)
+        self.assertGreater(len(splits_info1), 0)
+        total_files1 = sum(len(files) for files, _ in splits_info1)
+        self.assertEqual(total_files1, 6)
+        self.assertLessEqual(len(splits_info1), 6)
+        
+        table2 = self._create_table('test_merge_tree_2', split_target_size='1kb', split_open_file_cost='1kb')
+        self._write_data(table2, test_data)
+        splits_info2 = self._get_splits_info(table2)
+        self.assertGreater(len(splits_info2), 0)
+        total_files2 = sum(len(files) for files, _ in splits_info2)
+        self.assertEqual(total_files2, 6)
+        self.assertGreaterEqual(len(splits_info2), len(splits_info1))
+        
+        for file_names, raw_convertible in splits_info1 + splits_info2:
+            self.assertGreater(len(file_names), 0)
+
+    def test_split_raw_convertible(self):
+        table = self._create_table('test_raw_convertible')
+        self._write_data(table, [{'id': [1, 2], 'value': ['a', 'b']}])
+        splits = table.new_read_builder().new_scan().plan().splits()
+        for split in splits:
+            if len(split.files) == 1:
+                has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+                if not has_delete_rows:
+                    self.assertTrue(split.raw_convertible)
+        
+        table_dv = self._create_table('test_dv', deletion_vectors_enabled=True)
+        self._write_data(table_dv, [
+            {'id': [1, 2], 'value': ['a', 'b']},
+            {'id': [3, 4], 'value': ['c', 'd']},
+        ])
+        splits_dv = table_dv.new_read_builder().new_scan().plan().splits()
+        
+        if len(splits_dv) == 0:
+            pass
+        else:
+            for split in splits_dv:
+                if len(split.files) == 1:
+                    has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+                    if not has_delete_rows:
+                        self.assertTrue(split.raw_convertible)
+        
+        table_first_row = self._create_table('test_first_row', merge_engine='first-row')
+        self._write_data(table_first_row, [
+            {'id': [1, 2], 'value': ['a', 'b']},
+            {'id': [3, 4], 'value': ['c', 'd']},
+        ])
+        splits_first_row = table_first_row.new_read_builder().new_scan().plan().splits()
+        for split in splits_first_row:
+            if len(split.files) == 1:
+                has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+                if not has_delete_rows:
+                    self.assertTrue(split.raw_convertible)
+
+    def test_merge_tree_split_raw_convertible(self):
+        table = self._create_table('test_mixed_levels')
+        self._write_data(table, self._create_test_data([
+            (0, 11), (0, 13), (13, 21), (21, 221), (201, 211), (211, 221)
+        ]))
+        splits = table.new_read_builder().new_scan().plan().splits()
+        self.assertGreater(len(splits), 0)
+        
+        deletion_vectors_enabled = table.options.deletion_vectors_enabled()
+        merge_engine = table.options.merge_engine()
+        merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW
+        
+        for split in splits:
+            has_level_0 = any(f.level == 0 for f in split.files)
+            has_delete_rows = any(f.delete_row_count and f.delete_row_count > 0 for f in split.files)
+            
+            if len(split.files) == 1:
+                if not has_level_0 and not has_delete_rows:
+                    self.assertTrue(
+                        split.raw_convertible,
+                        "Single file split should be raw_convertible")
+            else:
+                all_non_level0_no_delete = all(
+                    f.level != 0 and (not f.delete_row_count or f.delete_row_count == 0)
+                    for f in split.files
+                )
+                levels = {f.level for f in split.files}
+                one_level = len(levels) == 1
+                
+                use_optimized_path = all_non_level0_no_delete and (
+                    deletion_vectors_enabled or merge_engine_first_row or one_level
+                )
+                
+                if use_optimized_path:
+                    self.assertTrue(
+                        split.raw_convertible,
+                        "Multi-file split should be raw_convertible when optimized path is used")
+                else:
+                    self.assertFalse(
+                        split.raw_convertible,
+                        "Multi-file split should not be raw_convertible when optimized path is not used")
+
+    def test_shard_with_empty_partition(self):
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'bucket': '3'}  # Use 3 buckets for shard testing
+        )
+        self.catalog.create_table('default.test_shard_empty_partition', schema, False)
+        table = self.catalog.get_table('default.test_shard_empty_partition')
+        
+        self._write_data(table, [
+            {'id': [0, 3, 6], 'value': ['v0', 'v3', 'v6']},
+            {'id': [1, 4, 7], 'value': ['v1', 'v4', 'v7']},
+            {'id': [2, 5, 8], 'value': ['v2', 'v5', 'v8']},
+        ])
+        
+        read_builder = table.new_read_builder()
+        
+        splits_all = read_builder.new_scan().plan().splits()
+        self.assertGreater(len(splits_all), 0, "Should have splits without shard filtering")
+        
+        splits_shard_0 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+        
+        self.assertGreaterEqual(
+            len(splits_shard_0), 0,
+            "Should return splits even if some partitions are empty after shard filtering")
+        
+        for split in splits_shard_0:
+            self.assertGreater(len(split.files), 0, "Each split should have at least one file")
+
+
+if __name__ == '__main__':
+    unittest.main()
