This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit e6ea490bf9bec519c65e2ff4578dc9a180ddc7fa
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 5 17:54:46 2025 +0800

    [python] support custom source split target size and split open file cost (#6527)
---
 paimon-python/pypaimon/common/core_options.py      |  20 ++
 paimon-python/pypaimon/common/memory_size.py       | 201 +++++++++++++++++++++
 .../pypaimon/read/scanner/full_starting_scanner.py |   5 +-
 paimon-python/pypaimon/tests/reader_base_test.py   | 132 ++++++++++++++
 4 files changed, 356 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index aab339e269..d6643399bb 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -18,6 +18,8 @@
 
 from enum import Enum
 
+from pypaimon.common.memory_size import MemorySize
+
 
 class CoreOptions(str, Enum):
     """Core options for paimon."""
@@ -48,6 +50,8 @@ class CoreOptions(str, Enum):
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
+    SOURCE_SPLIT_TARGET_SIZE = "source.split.target-size"
+    SOURCE_SPLIT_OPEN_FILE_COST = "source.split.open-file-cost"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
@@ -56,3 +60,19 @@ class CoreOptions(str, Enum):
     @staticmethod
     def get_blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
+
+    @staticmethod
+    def get_split_target_size(options: dict) -> int:
+        """Get split target size from options, default to 128MB."""
+        if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
+            size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return MemorySize.of_mebi_bytes(128).get_bytes()
+
+    @staticmethod
+    def get_split_open_file_cost(options: dict) -> int:
+        """Get split open file cost from options, default to 4MB."""
+        if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
+            cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
+            return MemorySize.parse(cost_str).get_bytes()
+        return MemorySize.of_mebi_bytes(4).get_bytes()
diff --git a/paimon-python/pypaimon/common/memory_size.py b/paimon-python/pypaimon/common/memory_size.py
new file mode 100644
index 0000000000..b68ccfc210
--- /dev/null
+++ b/paimon-python/pypaimon/common/memory_size.py
@@ -0,0 +1,201 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+
+class MemorySize:
+    """MemorySize is a representation of a number of bytes, viewable in different units."""
+
+    ZERO = None
+    MAX_VALUE = None
+
+    def __init__(self, bytes: int):
+        """Constructs a new MemorySize."""
+        if bytes < 0:
+            raise ValueError("bytes must be >= 0")
+        self.bytes = bytes
+
+    @staticmethod
+    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
+        return MemorySize(mebi_bytes << 20)
+
+    @staticmethod
+    def of_kibi_bytes(kibi_bytes: int) -> 'MemorySize':
+        return MemorySize(kibi_bytes << 10)
+
+    @staticmethod
+    def of_bytes(bytes: int) -> 'MemorySize':
+        return MemorySize(bytes)
+
+    def get_bytes(self) -> int:
+        return self.bytes
+
+    def get_kibi_bytes(self) -> int:
+        return self.bytes >> 10
+
+    def get_mebi_bytes(self) -> int:
+        return self.bytes >> 20
+
+    def get_gibi_bytes(self) -> int:
+        return self.bytes >> 30
+
+    def get_tebi_bytes(self) -> int:
+        return self.bytes >> 40
+
+    def __eq__(self, other) -> bool:
+        return isinstance(other, MemorySize) and self.bytes == other.bytes
+
+    def __hash__(self) -> int:
+        return hash(self.bytes)
+
+    def __str__(self) -> str:
+        return self.format_to_string()
+
+    def format_to_string(self) -> str:
+        ORDERED_UNITS = [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                         MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]
+
+        highest_integer_unit = MemoryUnit.BYTES
+        for idx, unit in enumerate(ORDERED_UNITS):
+            if self.bytes % unit.multiplier != 0:
+                if idx == 0:
+                    highest_integer_unit = ORDERED_UNITS[0]
+                else:
+                    highest_integer_unit = ORDERED_UNITS[idx - 1]
+                break
+        else:  # bytes divides every multiplier evenly, so the largest unit applies
+            highest_integer_unit = ORDERED_UNITS[-1]
+
+        return f"{self.bytes // highest_integer_unit.multiplier} {highest_integer_unit.units[1]}"
+
+    def __repr__(self) -> str:
+        return f"MemorySize({self.bytes})"
+
+    def __lt__(self, other: 'MemorySize') -> bool:
+        return self.bytes < other.bytes
+
+    def __le__(self, other: 'MemorySize') -> bool:
+        return self.bytes <= other.bytes
+
+    def __gt__(self, other: 'MemorySize') -> bool:
+        return self.bytes > other.bytes
+
+    def __ge__(self, other: 'MemorySize') -> bool:
+        return self.bytes >= other.bytes
+
+    @staticmethod
+    def parse(text: str) -> 'MemorySize':
+        return MemorySize(MemorySize.parse_bytes(text))
+
+    @staticmethod
+    def parse_bytes(text: str) -> int:
+        if text is None:
+            raise ValueError("text cannot be None")
+
+        trimmed = text.strip()
+        if not trimmed:
+            raise ValueError("argument is an empty- or whitespace-only string")
+
+        pos = 0
+        while pos < len(trimmed) and trimmed[pos].isdigit():
+            pos += 1
+
+        number_str = trimmed[:pos]
+        unit_str = trimmed[pos:].strip().lower()
+
+        if not number_str:
+            raise ValueError("text does not start with a number")
+
+        try:
+            value = int(number_str)
+        except ValueError:
+            raise ValueError(
+                f"The value '{number_str}' cannot be represented as 64bit number (numeric overflow).")
+
+        unit = MemorySize._parse_unit(unit_str)
+        multiplier = unit.multiplier if unit else 1
+        result = value * multiplier
+
+        if result // multiplier != value:
+            raise ValueError(
+                f"The value '{text}' cannot be represented as 64bit number of bytes (numeric overflow).")
+
+        return result
+
+    @staticmethod
+    def _parse_unit(unit_str: str) -> Optional['MemoryUnit']:
+        if not unit_str:
+            return None
+
+        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
+            if unit_str in unit.units:
+                return unit
+
+        raise ValueError(
+ f"Memory size unit '{unit_str}' does not match any of the recognized units: " + f"{MemoryUnit.get_all_units()}") + + +class MemoryUnit: + """Enum which defines memory unit, mostly used to parse value from configuration file.""" + + def __init__(self, units: list, multiplier: int): + self.units = units + self.multiplier = multiplier + + BYTES = None + KILO_BYTES = None + MEGA_BYTES = None + GIGA_BYTES = None + TERA_BYTES = None + + @staticmethod + def get_all_units() -> str: + all_units = [] + for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES, + MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]: + all_units.append("(" + " | ".join(unit.units) + ")") + return " / ".join(all_units) + + @staticmethod + def has_unit(text: str) -> bool: + if text is None: + raise ValueError("text cannot be None") + + trimmed = text.strip() + if not trimmed: + raise ValueError("argument is an empty- or whitespace-only string") + + pos = 0 + while pos < len(trimmed) and trimmed[pos].isdigit(): + pos += 1 + + unit = trimmed[pos:].strip().lower() + return len(unit) > 0 + + +MemoryUnit.BYTES = MemoryUnit(["b", "bytes"], 1) +MemoryUnit.KILO_BYTES = MemoryUnit(["k", "kb", "kibibytes"], 1024) +MemoryUnit.MEGA_BYTES = MemoryUnit(["m", "mb", "mebibytes"], 1024 * 1024) +MemoryUnit.GIGA_BYTES = MemoryUnit(["g", "gb", "gibibytes"], 1024 * 1024 * 1024) +MemoryUnit.TERA_BYTES = MemoryUnit(["t", "tb", "tebibytes"], 1024 * 1024 * 1024 * 1024) + +MemorySize.ZERO = MemorySize(0) +MemorySize.MAX_VALUE = MemorySize(2**63 - 1) diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index b3c18a7bb6..dfbe329f43 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -54,8 +54,9 @@ class FullStartingScanner(StartingScanner): self.partition_key_predicate = trim_and_transform_predicate( self.predicate, self.table.field_names, self.table.partition_keys) - self.target_split_size = 128 * 1024 * 1024 - self.open_file_cost = 4 * 1024 * 1024 + # Get split target size and open file cost from table options + self.target_split_size = CoreOptions.get_split_target_size(self.table.options) + self.open_file_cost = CoreOptions.get_split_open_file_cost(self.table.options) self.idx_of_this_subtask = None self.number_of_para_subtasks = None diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index ebe98e4fc5..fdcb9f1ea2 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -675,3 +675,135 @@ class ReaderBasicTest(unittest.TestCase): max_values) self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) + + def test_split_target_size(self): + """Test source.split.target-size configuration effect on split generation.""" + from pypaimon.common.core_options import CoreOptions + + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()) + ]) + + # Test with small target_split_size (512B) - should generate more splits + schema_small = Schema.from_pyarrow_schema( + pa_schema, + options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '512b'} + ) + self.catalog.create_table('default.test_split_target_size_small', schema_small, False) + table_small = self.catalog.get_table('default.test_split_target_size_small') + + for i in range(10): + write_builder = table_small.new_batch_write_builder() + table_write = write_builder.new_write() + 
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        read_builder = table_small.new_read_builder()
+        splits_small = read_builder.new_scan().plan().splits()
+
+        schema_default = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.test_split_target_size_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_target_size_default')
+
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default target_split_size
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        self.assertGreater(
+            len(splits_small), len(splits_default),
+            f"Small target_split_size should generate more splits. "
+            f"Got {len(splits_small)} splits with 512B vs "
+            f"{len(splits_default)} splits with default")
+
+    def test_split_open_file_cost(self):
+        """Test source.split.open-file-cost configuration effect on split generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with large open_file_cost (64MB) - should generate more splits
+        schema_large_cost = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb',
+                CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '64mb'
+            }
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_large', schema_large_cost, False)
+        table_large_cost = self.catalog.get_table('default.test_split_open_file_cost_large')
+
+        # Write multiple batches to create multiple files
+        # Write 10 batches, each with 100 rows
+        for i in range(10):
+            write_builder = table_large_cost.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with large open_file_cost
+        read_builder = table_large_cost.new_read_builder()
+        splits_large_cost = read_builder.new_scan().plan().splits()
+
+        # Test with default open_file_cost (4MB) - should generate fewer splits
+        schema_default = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb'}
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_open_file_cost_default')
+
+        # Write same amount of data
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default open_file_cost
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        # With default open_file_cost (4MB), more files can be packed into each split
+        self.assertGreater(
+            len(splits_large_cost), len(splits_default),
+            f"Large open_file_cost should generate more splits. "
+            f"Got {len(splits_large_cost)} splits with 64MB cost vs "
+            f"{len(splits_default)} splits with default")
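
Editor's note on usage: after this change the two options travel through table options, exactly as the tests above do. A minimal sketch; `catalog` is assumed to be an already-constructed pypaimon catalog and `Schema` the same class the test module uses (both outside this patch), while the option names, defaults, and builder calls come from the change itself:

import pyarrow as pa

from pypaimon.common.core_options import CoreOptions

# Assumed to exist as in reader_base_test.py: `catalog`, `Schema`.
pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string())])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '256mb',    # default: 128 mb
        CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '8mb',   # default: 4 mb
    })
catalog.create_table('default.my_table', schema, False)
table = catalog.get_table('default.my_table')

# FullStartingScanner now resolves both values from table.options,
# so this plan honors the sizes configured above.
splits = table.new_read_builder().new_scan().plan().splits()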
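The option values are parsed by the new MemorySize class above: digits first, then an optional unit matched case-insensitively against b/kb/mb/gb/tb (or spellings like "mebibytes"); a bare number means bytes; formatting picks the largest unit that divides the value evenly. For example:

from pypaimon.common.memory_size import MemorySize

assert MemorySize.parse('128mb').get_bytes() == 128 * 1024 * 1024
assert MemorySize.parse('512b').get_bytes() == 512
assert MemorySize.parse('512').get_bytes() == 512       # no unit means bytes
assert MemorySize.parse('2 GB').get_bytes() == 2 << 30  # case-insensitive
assert str(MemorySize.of_mebi_bytes(4)) == '4 mb'

try:
    MemorySize.parse('10 pb')  # petabytes are not a recognized unit
except ValueError as e:
    print(e)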
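How the two values trade off is not visible in this diff (only the two assignments in FullStartingScanner changed), but Paimon's Java scanners weight each file at max(file_size, open_file_cost) when packing files into splits of roughly the target size. An illustrative sketch under that assumption; `estimated_splits` is hypothetical, not a pypaimon API:

# Hypothetical helper, NOT part of pypaimon: packs files in order into
# splits of roughly target_split_size bytes, weighting each file at
# max(file_size, open_file_cost).
def estimated_splits(file_sizes, target_split_size, open_file_cost):
    splits, current = 1, 0
    for size in file_sizes:
        weight = max(size, open_file_cost)
        if current + weight > target_split_size and current > 0:
            # current split is full; start a new one
            splits += 1
            current = 0
        current += weight
    return splits

# Ten 1 MiB files: at the default 4 MiB cost all ten weights (40 MiB)
# fit in one 128 MiB split; at a 64 MiB cost only two weights fit per split.
files = [1 << 20] * 10
print(estimated_splits(files, 128 << 20, 4 << 20))    # -> 1
print(estimated_splits(files, 128 << 20, 64 << 20))   # -> 5

This is why the second test expects the 64mb open-file cost to yield strictly more splits than the 4mb default for the same data.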
