This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 85f4b8f8a1 [python] support custom source split target size and split open file cost (#6527)
85f4b8f8a1 is described below
commit 85f4b8f8a1121008c5e6d7b14183ab59e4f4c6cb
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 5 17:54:46 2025 +0800
    [python] support custom source split target size and split open file cost (#6527)
---
paimon-python/pypaimon/common/core_options.py | 20 ++
paimon-python/pypaimon/common/memory_size.py | 201 +++++++++++++++++++++
.../pypaimon/read/scanner/full_starting_scanner.py | 5 +-
paimon-python/pypaimon/tests/reader_base_test.py | 132 ++++++++++++++
4 files changed, 356 insertions(+), 2 deletions(-)
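Before the diff itself, a minimal sketch of how a caller would set the two new table options (option values here are illustrative; the top-level Schema import path is an assumption, while Schema.from_pyarrow_schema follows the tests in this patch):

    import pyarrow as pa
    from pypaimon import Schema  # assumed import path
    from pypaimon.common.core_options import CoreOptions

    pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '256mb',   # default 128mb
            CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '8mb',  # default 4mb
        })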
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index aab339e269..d6643399bb 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -18,6 +18,8 @@
 from enum import Enum
+from pypaimon.common.memory_size import MemorySize
+
 
 class CoreOptions(str, Enum):
     """Core options for paimon."""
@@ -48,6 +50,8 @@ class CoreOptions(str, Enum):
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
+    SOURCE_SPLIT_TARGET_SIZE = "source.split.target-size"
+    SOURCE_SPLIT_OPEN_FILE_COST = "source.split.open-file-cost"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
     ROW_TRACKING_ENABLED = "row-tracking.enabled"
@@ -56,3 +60,19 @@ class CoreOptions(str, Enum):
     @staticmethod
     def get_blob_as_descriptor(options: dict) -> bool:
         return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
+
+    @staticmethod
+    def get_split_target_size(options: dict) -> int:
+        """Get the split target size from options; defaults to 128 MB."""
+        if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
+            size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return MemorySize.of_mebi_bytes(128).get_bytes()
+
+    @staticmethod
+    def get_split_open_file_cost(options: dict) -> int:
+        """Get the split open file cost from options; defaults to 4 MB."""
+        if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
+            cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
+            return MemorySize.parse(cost_str).get_bytes()
+        return MemorySize.of_mebi_bytes(4).get_bytes()
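For reference, a quick sketch of what the two helpers return, using the MemorySize parser added in the next file (byte values follow from the binary multipliers defined there):

    from pypaimon.common.core_options import CoreOptions

    opts = {CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '256mb'}
    CoreOptions.get_split_target_size(opts)   # 268435456 (256 MiB)
    CoreOptions.get_split_target_size({})     # 134217728 (128 MiB default)
    CoreOptions.get_split_open_file_cost({})  # 4194304 (4 MiB default)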
diff --git a/paimon-python/pypaimon/common/memory_size.py b/paimon-python/pypaimon/common/memory_size.py
new file mode 100644
index 0000000000..b68ccfc210
--- /dev/null
+++ b/paimon-python/pypaimon/common/memory_size.py
@@ -0,0 +1,201 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Optional
+
+
+class MemorySize:
+    """MemorySize is a representation of a number of bytes, viewable in
+    different units."""
+
+    ZERO = None
+    MAX_VALUE = None
+
+    def __init__(self, bytes: int):
+        """Constructs a new MemorySize."""
+        if bytes < 0:
+            raise ValueError("bytes must be >= 0")
+        self.bytes = bytes
+
+    @staticmethod
+    def of_mebi_bytes(mebi_bytes: int) -> 'MemorySize':
+        return MemorySize(mebi_bytes << 20)
+
+    @staticmethod
+    def of_kibi_bytes(kibi_bytes: int) -> 'MemorySize':
+        return MemorySize(kibi_bytes << 10)
+
+    @staticmethod
+    def of_bytes(bytes: int) -> 'MemorySize':
+        return MemorySize(bytes)
+
+    def get_bytes(self) -> int:
+        return self.bytes
+
+    def get_kibi_bytes(self) -> int:
+        return self.bytes >> 10
+
+    def get_mebi_bytes(self) -> int:
+        return self.bytes >> 20
+
+    def get_gibi_bytes(self) -> int:
+        return self.bytes >> 30
+
+    def get_tebi_bytes(self) -> int:
+        return self.bytes >> 40
+
+    def __eq__(self, other) -> bool:
+        return isinstance(other, MemorySize) and self.bytes == other.bytes
+
+    def __hash__(self) -> int:
+        return hash(self.bytes)
+
+    def __str__(self) -> str:
+        return self.format_to_string()
+
+    def format_to_string(self) -> str:
+        ORDERED_UNITS = [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                         MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]
+
+        # Find the largest unit in which the byte count is still a whole number.
+        highest_integer_unit = MemoryUnit.BYTES
+        for idx, unit in enumerate(ORDERED_UNITS):
+            if self.bytes % unit.multiplier != 0:
+                highest_integer_unit = ORDERED_UNITS[max(idx - 1, 0)]
+                break
+        else:
+            # Evenly divisible by every multiplier, so the largest unit applies.
+            highest_integer_unit = ORDERED_UNITS[-1]
+
+        return (f"{self.bytes // highest_integer_unit.multiplier} "
+                f"{highest_integer_unit.units[1]}")
+
+    def __repr__(self) -> str:
+        return f"MemorySize({self.bytes})"
+
+    def __lt__(self, other: 'MemorySize') -> bool:
+        return self.bytes < other.bytes
+
+    def __le__(self, other: 'MemorySize') -> bool:
+        return self.bytes <= other.bytes
+
+    def __gt__(self, other: 'MemorySize') -> bool:
+        return self.bytes > other.bytes
+
+    def __ge__(self, other: 'MemorySize') -> bool:
+        return self.bytes >= other.bytes
+
+    @staticmethod
+    def parse(text: str) -> 'MemorySize':
+        return MemorySize(MemorySize.parse_bytes(text))
+
+    @staticmethod
+    def parse_bytes(text: str) -> int:
+        if text is None:
+            raise ValueError("text cannot be None")
+
+        trimmed = text.strip()
+        if not trimmed:
+            raise ValueError("argument is an empty- or whitespace-only string")
+
+        pos = 0
+        while pos < len(trimmed) and trimmed[pos].isdigit():
+            pos += 1
+
+        number_str = trimmed[:pos]
+        unit_str = trimmed[pos:].strip().lower()
+
+        if not number_str:
+            raise ValueError("text does not start with a number")
+
+        try:
+            value = int(number_str)
+        except ValueError:
+            raise ValueError(
+                f"The value '{number_str}' cannot be represented as 64bit number (numeric overflow).")
+
+        unit = MemorySize._parse_unit(unit_str)
+        multiplier = unit.multiplier if unit else 1
+        result = value * multiplier
+
+        if result // multiplier != value:
+            raise ValueError(
+                f"The value '{text}' cannot be represented as 64bit number of bytes (numeric overflow).")
+
+        return result
+
+    @staticmethod
+    def _parse_unit(unit_str: str) -> Optional['MemoryUnit']:
+        if not unit_str:
+            return None
+
+        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
+            if unit_str in unit.units:
+                return unit
+
+        raise ValueError(
+            f"Memory size unit '{unit_str}' does not match any of the recognized units: "
+            f"{MemoryUnit.get_all_units()}")
+
+
+class MemoryUnit:
+    """Defines memory units, mostly used to parse values from configuration files."""
+
+    def __init__(self, units: list, multiplier: int):
+        self.units = units
+        self.multiplier = multiplier
+
+    BYTES = None
+    KILO_BYTES = None
+    MEGA_BYTES = None
+    GIGA_BYTES = None
+    TERA_BYTES = None
+
+    @staticmethod
+    def get_all_units() -> str:
+        all_units = []
+        for unit in [MemoryUnit.BYTES, MemoryUnit.KILO_BYTES, MemoryUnit.MEGA_BYTES,
+                     MemoryUnit.GIGA_BYTES, MemoryUnit.TERA_BYTES]:
+            all_units.append("(" + " | ".join(unit.units) + ")")
+        return " / ".join(all_units)
+
+    @staticmethod
+    def has_unit(text: str) -> bool:
+        if text is None:
+            raise ValueError("text cannot be None")
+
+        trimmed = text.strip()
+        if not trimmed:
+            raise ValueError("argument is an empty- or whitespace-only string")
+
+        pos = 0
+        while pos < len(trimmed) and trimmed[pos].isdigit():
+            pos += 1
+
+        unit = trimmed[pos:].strip().lower()
+        return len(unit) > 0
+
+
+MemoryUnit.BYTES = MemoryUnit(["b", "bytes"], 1)
+MemoryUnit.KILO_BYTES = MemoryUnit(["k", "kb", "kibibytes"], 1024)
+MemoryUnit.MEGA_BYTES = MemoryUnit(["m", "mb", "mebibytes"], 1024 * 1024)
+MemoryUnit.GIGA_BYTES = MemoryUnit(["g", "gb", "gibibytes"], 1024 * 1024 * 1024)
+MemoryUnit.TERA_BYTES = MemoryUnit(["t", "tb", "tebibytes"], 1024 * 1024 * 1024 * 1024)
+
+MemorySize.ZERO = MemorySize(0)
+MemorySize.MAX_VALUE = MemorySize(2**63 - 1)
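The parser accepts a non-negative integer followed by an optional unit (b, kb, mb, gb, tb, or the long forms above); all units are binary (powers of 1024). A small usage sketch of the class as added here:

    from pypaimon.common.memory_size import MemorySize

    MemorySize.parse('512b').get_bytes()      # 512
    MemorySize.parse('128 mb').get_bytes()    # 134217728
    MemorySize.parse('2gb').get_mebi_bytes()  # 2048
    str(MemorySize.of_mebi_bytes(128))        # '128 mb'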
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index b3c18a7bb6..dfbe329f43 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -54,8 +54,9 @@ class FullStartingScanner(StartingScanner):
         self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
-        self.target_split_size = 128 * 1024 * 1024
-        self.open_file_cost = 4 * 1024 * 1024
+        # Get split target size and open file cost from table options
+        self.target_split_size = CoreOptions.get_split_target_size(self.table.options)
+        self.open_file_cost = CoreOptions.get_split_open_file_cost(self.table.options)
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
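These two values drive how the scanner packs data files into splits. A rough, illustrative sketch of the packing arithmetic (assuming, as in Paimon's Java split generation, that each file weighs max(file_size, open_file_cost) and files are packed greedily up to target_split_size; the actual pypaimon packing code is not part of this diff):

    def pack_files(file_sizes, target_split_size, open_file_cost):
        # Greedy packing sketch: a file's weight is its size, but never
        # less than open_file_cost, so many tiny files still fill a split.
        splits, current, acc = [], [], 0
        for size in file_sizes:
            weight = max(size, open_file_cost)
            if current and acc + weight > target_split_size:
                splits.append(current)
                current, acc = [], 0
            current.append(size)
            acc += weight
        if current:
            splits.append(current)
        return splits

    mib = 1024 * 1024
    files = [1 * mib] * 10
    len(pack_files(files, 128 * mib, 4 * mib))   # 1 split
    len(pack_files(files, 128 * mib, 64 * mib))  # 5 splits

This matches what the tests below assert: a larger open-file cost yields more splits for the same data.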
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index ebe98e4fc5..fdcb9f1ea2 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -675,3 +675,135 @@ class ReaderBasicTest(unittest.TestCase):
                          max_values)
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
+
+    def test_split_target_size(self):
+        """Test source.split.target-size configuration effect on split generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with small target_split_size (512B) - should generate more splits
+        schema_small = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '512b'}
+        )
+        self.catalog.create_table('default.test_split_target_size_small', schema_small, False)
+        table_small = self.catalog.get_table('default.test_split_target_size_small')
+
+        for i in range(10):
+            write_builder = table_small.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        read_builder = table_small.new_read_builder()
+        splits_small = read_builder.new_scan().plan().splits()
+
+        schema_default = Schema.from_pyarrow_schema(pa_schema)
+        self.catalog.create_table('default.test_split_target_size_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_target_size_default')
+
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default target_split_size
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        self.assertGreater(
+            len(splits_small), len(splits_default),
+            f"Small target_split_size should generate more splits. "
+            f"Got {len(splits_small)} splits with 512B vs "
+            f"{len(splits_default)} splits with default")
+
+    def test_split_open_file_cost(self):
+        """Test source.split.open-file-cost configuration effect on split generation."""
+        from pypaimon.common.core_options import CoreOptions
+
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+
+        # Test with large open_file_cost (64MB) - should generate more splits
+        schema_large_cost = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb',
+                CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST: '64mb'
+            }
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_large', schema_large_cost, False)
+        table_large_cost = self.catalog.get_table('default.test_split_open_file_cost_large')
+
+        # Write multiple batches to create multiple files
+        # Write 10 batches, each with 100 rows
+        for i in range(10):
+            write_builder = table_large_cost.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with large open_file_cost
+        read_builder = table_large_cost.new_read_builder()
+        splits_large_cost = read_builder.new_scan().plan().splits()
+
+        # Test with default open_file_cost (4MB) - should generate fewer splits
+        schema_default = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={CoreOptions.SOURCE_SPLIT_TARGET_SIZE: '128mb'}
+        )
+        self.catalog.create_table('default.test_split_open_file_cost_default', schema_default, False)
+        table_default = self.catalog.get_table('default.test_split_open_file_cost_default')
+
+        # Write same amount of data
+        for i in range(10):
+            write_builder = table_default.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            data = pa.Table.from_pydict({
+                'f0': list(range(i * 100, (i + 1) * 100)),
+                'f1': [f'value_{j}' for j in range(i * 100, (i + 1) * 100)]
+            }, schema=pa_schema)
+            table_write.write_arrow(data)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        # Generate splits with default open_file_cost
+        read_builder = table_default.new_read_builder()
+        splits_default = read_builder.new_scan().plan().splits()
+
+        # With default open_file_cost (4MB), more files can be packed into each split
+        self.assertGreater(
+            len(splits_large_cost), len(splits_default),
+            f"Large open_file_cost should generate more splits. "
+            f"Got {len(splits_large_cost)} splits with 64MB cost vs "
+            f"{len(splits_default)} splits with default")