This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f4b8f8a1 [python] support custom source split target size and split open file cost (#6527)
85f4b8f8a1 is described below

commit 85f4b8f8a1121008c5e6d7b14183ab59e4f4c6cb
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 5 17:54:46 2025 +0800

    [python] support custom source split target size and split open file cost (#6527)
---
 paimon-python/pypaimon/common/core_options.py      |  20 ++
 paimon-python/pypaimon/common/memory_size.py       | 201 +++++++++++++++++++++
 .../pypaimon/read/scanner/full_starting_scanner.py |   5 +-
 paimon-python/pypaimon/tests/reader_base_test.py   | 132 ++++++++++++++
 4 files changed, 356 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index aab339e269..d6643399bb 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -18,6 +18,8 @@
 
 from enum import Enum
 
+from pypaimon.common.memory_size import MemorySize
+
 
 class CoreOptions(str, Enum):
     """Core options for paimon."""
@@ -48,6 +50,8 @@ class CoreOptions(str, Enum):
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
+    SOURCE_SPLIT_TARGET_SIZE = "source.split.target-size"
+    SOURCE_SPLIT_OPEN_FILE_COST = "source.split.open-file-cost"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
@@ -56,3 +60,19 @@ class CoreOptions(str, Enum):
     @staticmethod
     def get_blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
+
+    @staticmethod
+    def get_split_target_size(options: dict) -> int:
+        """Get split target size from options, default to 128MB."""
+        if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
+            size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return MemorySize.of_mebi_bytes(128).get_bytes()
+
+    @staticmethod
+    def get_split_open_file_cost(options: dict) -> int:
+        """Get split open file cost from options, default to 4MB."""
+        if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
+            cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
+            return MemorySize.parse(cost_str).get_bytes()
+        return MemorySize.of_mebi_bytes(4).get_bytes()
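
For illustration, a minimal sketch of how the two new helpers resolve table options (not part of the patch; assumes a pypaimon build that includes this change):

    from pypaimon.common.core_options import CoreOptions

    # No option set: fall back to the built-in defaults.
    CoreOptions.get_split_target_size({})       # 134217728 (128 MiB)
    CoreOptions.get_split_open_file_cost({})    # 4194304   (4 MiB)

    # Option set as a string: parsed through MemorySize.
    opts = {CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '256mb'}
    CoreOptions.get_split_target_size(opts)     # 268435456 (256 MiB)
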
diff --git a/paimon-python/pypaimon/common/memory_size.py b/paimon-python/pypaimon/common/memory_size.py
new file mode 100644
index 0000000000..b68ccfc210
--- /dev/null
+++ b/paimon-python/pypaimon/common/memory_size.py
@@ -0,0 +1,201 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+
+class MemorySize:
+    """MemorySize is a representation of a number of bytes, viewable in 
different units."""
+
+    ZERO = None
+    MAX_VALUE = None
+
+    def __init__(self, bytes: int):
+        """Constructs a new MemorySize."""
+        if bytes < 0:
+            raise ValueError("bytes must be >= 0")
+        self.bytes = bytes
+
+    @staticmethod
+    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
+        return MemorySize(mebi_bytes << 20)
+
+    @staticmethod
+    def of_kibi_bytes(kibi_bytes: int) -> 'MemorySize':
+        return MemorySize(kibi_bytes << 10)
+
+    @staticmethod
+    def of_bytes(bytes: int) -> 'MemorySize':
+        return MemorySize(bytes)
+
+    def get_bytes(self) -> int:
+        return self.bytes
+
+    def get_kibi_bytes(self) -> int:
+        return self.bytes >> 10
+
+    def get_mebi_bytes(self) -> int:
+        return self.bytes >> 20
+
+    def get_gibi_bytes(self) -> int:
+        return self.bytes >> 30
+
+    def get_tebi_bytes(self) -> int:
+        return self.bytes >> 40
+
+    def __eq__(self, other) -> bool:
+        return isinstance(other, MemorySize) and self.bytes == other.bytes
+
+    def __hash__(self) -> int:
+        return hash(self.bytes)
+
+    def __str__(self) -> str:
+        return self.format_to_string()
+
+    def format_to_string(self) -> str:
+        ORDERED_UNITS = [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                         MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]
+
+        highest_integer_unit = MemoryUnit.BYTES
+        for idx, unit in enumerate(ORDERED_UNITS):
+            if self.bytes % unit.multiplier != 0:
+                if idx == 0:
+                    highest_integer_unit = ORDERED_UNITS[0]
+                else:
+                    highest_integer_unit = ORDERED_UNITS[idx - 1]
+                break
+        else:
+            highest_integer_unit = MemoryUnit.BYTES
+
+        return f"{self.bytes // highest_integer_unit.multiplier} 
{highest_integer_unit.units[1]}"
+
+    def __repr__(self) -> str:
+        return f"MemorySize({self.bytes})"
+
+    def __lt__(self, other: 'MemorySize') -> bool:
+        return self.bytes < other.bytes
+
+    def __le__(self, other: 'MemorySize') -> bool:
+        return self.bytes <= other.bytes
+
+    def __gt__(self, other: 'MemorySize') -> bool:
+        return self.bytes > other.bytes
+
+    def __ge__(self, other: 'MemorySize') -> bool:
+        return self.bytes >= other.bytes
+
+    @staticmethod
+    def parse(text: str) -> 'MemorySize':
+        return MemorySize(MemorySize.parse_bytes(text))
+
+    @staticmethod
+    def parse_bytes(text: str) -> int:
+        if text is None:
+            raise ValueError("text cannot be None")
+
+        trimmed = text.strip()
+        if not trimmed:
+            raise ValueError("argument is an empty- or whitespace-only string")
+
+        pos = 0
+        while pos < len(trimmed) and trimmed[pos].isdigit():
+            pos += 1
+
+        number_str = trimmed[:pos]
+        unit_str = trimmed[pos:].strip().lower()
+
+        if not number_str:
+            raise ValueError("text does not start with a number")
+
+        try:
+            value = int(number_str)
+        except ValueError:
+            raise ValueError(
+                f"The value '{number_str}' cannot be represented as 64bit 
number (numeric overflow).")
+
+        unit = MemorySize._parse_unit(unit_str)
+        multiplier = unit.multiplier if unit else 1
+        result = value * multiplier
+
+        if result // multiplier != value:
+            raise ValueError(
+                f"The value '{text}' cannot be represented as 64bit number of 
bytes (numeric overflow).")
+
+        return result
+
+    @staticmethod
+    def _parse_unit(unit_str: str) -> Optional['MemoryUnit']:
+        if not unit_str:
+            return None
+
+        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
+            if unit_str in unit.units:
+                return unit
+
+        raise ValueError(
+            f"Memory size unit '{unit_str}' does not match any of the 
recognized units: "
+            f"{MemoryUnit.get_all_units()}")
+
+
+class MemoryUnit:
+    """Enum which defines memory unit, mostly used to parse value from 
configuration file."""
+
+    def __init__(self, units: list, multiplier: int):
+        self.units = units
+        self.multiplier = multiplier
+
+    BYTES = None
+    KILO_BYTES = None
+    MEGA_BYTES = None
+    GIGA_BYTES = None
+    TERA_BYTES = None
+
+    @staticmethod
+    def get_all_units() -> str:
+        all_units = []
+        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
+            all_units.append("(" + " | ".join(unit.units) + ")")
+        return " / ".join(all_units)
+
+    @staticmethod
+    def has_unit(text: str) -> bool:
+        if text is None:
+            raise ValueError("text cannot be None")
+
+        trimmed = text.strip()
+        if not trimmed:
+            raise ValueError("argument is an empty- or whitespace-only string")
+
+        pos = 0
+        while pos < len(trimmed) and trimmed[pos].isdigit():
+            pos += 1
+
+        unit = trimmed[pos:].strip().lower()
+        return len(unit) > 0
+
+
+MemoryUnit.BYTES = MemoryUnit(["b", "bytes"], 1)
+MemoryUnit.KILO_BYTES = MemoryUnit(["k", "kb", "kibibytes"], 1024)
+MemoryUnit.MEGA_BYTES = MemoryUnit(["m", "mb", "mebibytes"], 1024 * 1024)
+MemoryUnit.GIGA_BYTES = MemoryUnit(["g", "gb", "gibibytes"], 1024 * 1024 * 1024)
+MemoryUnit.TERA_BYTES = MemoryUnit(["t", "tb", "tebibytes"], 1024 * 1024 * 1024 * 1024)
+
+MemorySize.ZERO = MemorySize(0)
+MemorySize.MAX_VALUE = MemorySize(2**63 - 1)
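
For illustration (not part of the patch), the parsing above follows the familiar MemorySize conventions, so option strings behave roughly like this, assuming a pypaimon build with this change:

    from pypaimon.common.memory_size import MemorySize

    MemorySize.parse('512b').get_bytes()          # 512
    MemorySize.parse('4 kb').get_bytes()          # 4096
    MemorySize.parse('128mb').get_mebi_bytes()    # 128
    str(MemorySize.of_mebi_bytes(128))            # '128 mb'
    MemorySize.parse('10x')                       # raises ValueError: unrecognized unit
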
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b3c18a7bb6..dfbe329f43 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -54,8 +54,9 @@ class FullStartingScanner(StartingScanner):
         self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
 
-        self.target_split_size = 128 * 1024 * 1024
-        self.open_file_cost = 4 * 1024 * 1024
+        # Get split target size and open file cost from table options
+        self.target_split_size = CoreOptions.get_split_target_size(self.table.options)
+        self.open_file_cost = CoreOptions.get_split_open_file_cost(self.table.options)
 
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
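
For context (an assumption, not shown in this patch): the scanner presumably packs data files into splits by weighting each file at max(file_size, open_file_cost) and starting a new split once the accumulated weight would exceed target_split_size. A minimal sketch of that heuristic, with hypothetical names:

    def pack_files(file_sizes, target_split_size, open_file_cost):
        # Hypothetical illustration of weighted split packing, not the actual scanner code.
        splits, current, weight = [], [], 0
        for size in file_sizes:
            w = max(size, open_file_cost)
            if current and weight + w > target_split_size:
                splits.append(current)
                current, weight = [], 0
            current.append(size)
            weight += w
        if current:
            splits.append(current)
        return splits
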
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index ebe98e4fc5..fdcb9f1ea2 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -675,3 +675,135 @@ class ReaderBasicTest(unittest.TestCase):
                 max_values)
 
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
+
+    def test_split_target_size(self):
+        """Test source.split.target-size configuration effect on split 
generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with small target_split_size (512B) - should generate more splits
+        schema_small = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '512b'}
+        )
+        self.catalog.create_table('default.test_split_target_size_small', schema_small, False)
+        table_small = self.catalog.get_table('default.test_split_target_size_small')
+
+        for i in range(10):
+            write_builder = table_small.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        read_builder = table_small.new_read_builder()
+        splits_small = read_builder.new_scan().plan().splits()
+
+        schema_default = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.test_split_target_size_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_target_size_default')
+
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default target_split_size
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        self.assertGreater(
+            len(splits_small), len(splits_default),
+            f"Small target_split_size should generate more splits. "
+            f"Got {len(splits_small)} splits with 512B vs "
+            f"{len(splits_default)} splits with default")
+
+    def test_split_open_file_cost(self):
+        """Test source.split.open-file-cost configuration effect on split 
generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with large open_file_cost (64MB) - should generate more splits
+        schema_large_cost = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb',
+                CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '64mb'
+            }
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_large', schema_large_cost, False)
+        table_large_cost = self.catalog.get_table('default.test_split_open_file_cost_large')
+
+        # Write multiple batches to create multiple files
+        # Write 10 batches, each with 100 rows
+        for i in range(10):
+            write_builder = table_large_cost.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with large open_file_cost
+        read_builder = table_large_cost.new_read_builder()
+        splits_large_cost = read_builder.new_scan().plan().splits()
+
+        # Test with default open_file_cost (4MB) - should generate fewer splits
+        schema_default = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb'}
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_open_file_cost_default')
+
+        # Write same amount of data
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default open_file_cost
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        # With default open_file_cost (4MB), more files can be packed into each split
+        self.assertGreater(
+            len(splits_large_cost), len(splits_default),
+            f"Large open_file_cost should generate more splits. "
+            f"Got {len(splits_large_cost)} splits with 64MB cost vs "
+            f"{len(splits_default)} splits with default")
