This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4f8a8110fa [python] fix data-evolution double-counting issue by using mergedRowCount in ray datasource (#7087)
4f8a8110fa is described below
commit 4f8a8110facb4c3089e0b96f343ba3d6cd7ccf1e
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 22 18:16:09 2026 +0800
[python] fix data-evolution double-counting issue by using mergedRowCount in ray datasource (#7087)
---
.../pypaimon/globalindex/indexed_split.py | 3 +
.../pypaimon/read/datasource/ray_datasource.py | 11 ++-
.../read/scanner/data_evolution_split_generator.py | 60 ++++++++++++-
paimon-python/pypaimon/read/sliced_split.py | 78 +++++++++++++++++
paimon-python/pypaimon/read/split.py | 94 ++++++++++++++++++++
paimon-python/pypaimon/tests/blob_table_test.py | 99 +++++++++++++++++++++-
.../pypaimon/tests/reader_split_generator_test.py | 85 +++++++++++++++++++
7 files changed, 424 insertions(+), 6 deletions(-)
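In data evolution mode a single split can carry several files that describe the same rows (for example a data file plus a blob file covering one row-id range), so summing the per-file row_count, as the Ray datasource did before this change, double-counts those rows. The new merged_row_count() groups files by overlapping [first_row_id, first_row_id + row_count) ranges and counts each group once. Below is a minimal, standalone sketch of that grouping idea; FileMeta and the free-standing merged_row_count helper are illustrative stand-ins, not the real pypaimon DataFileMeta or Split API:

    from typing import List, NamedTuple

    class FileMeta(NamedTuple):
        # Only the two fields the estimate needs.
        first_row_id: int
        row_count: int

    def merged_row_count(files: List[FileMeta]) -> int:
        # Sort by range start, fold overlapping ranges into groups, and take
        # the largest file of each group, mirroring the logic in the patch.
        ranges = sorted(
            (f.first_row_id, f.first_row_id + f.row_count - 1, f.row_count)
            for f in files if f.row_count > 0
        )
        total, group_end, group_max = 0, None, 0
        for start, end, count in ranges:
            if group_end is None or start > group_end:
                total += group_max          # close the previous group
                group_end, group_max = end, count
            else:
                group_end = max(group_end, end)
                group_max = max(group_max, count)
        return total + group_max

    # A data file and a blob file covering the same 10 rows:
    # the naive per-file sum is 20, the merged count is 10.
    files = [FileMeta(first_row_id=0, row_count=10), FileMeta(first_row_id=0, row_count=10)]
    assert sum(f.row_count for f in files) == 20
    assert merged_row_count(files) == 10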
diff --git a/paimon-python/pypaimon/globalindex/indexed_split.py b/paimon-python/pypaimon/globalindex/indexed_split.py
index 6c58a6fc5a..ab085421e3 100644
--- a/paimon-python/pypaimon/globalindex/indexed_split.py
+++ b/paimon-python/pypaimon/globalindex/indexed_split.py
@@ -76,6 +76,9 @@ class IndexedSplit(Split):
"""
return sum(r.count() for r in self._row_ranges)
+ def merged_row_count(self):
+ return self.row_count
+
# Delegate other properties to data_split
@property
diff --git a/paimon-python/pypaimon/read/datasource/ray_datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py
index 7f78fbad6f..bf278d7b73 100644
--- a/paimon-python/pypaimon/read/datasource/ray_datasource.py
+++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py
@@ -171,8 +171,15 @@ class RayDatasource(Datasource):
for split in chunk_splits:
if predicate is None:
# Only estimate rows if no predicate (predicate filtering changes row count)
- if hasattr(split, 'row_count') and split.row_count > 0:
- total_rows += split.row_count
+ row_count = None
+ if hasattr(split, 'merged_row_count'):
+ merged_count = split.merged_row_count()
+ if merged_count is not None:
+ row_count = merged_count
+ if row_count is None and hasattr(split, 'row_count') and split.row_count > 0:
+ row_count = split.row_count
+ if row_count is not None and row_count > 0:
+ total_rows += row_count
if hasattr(split, 'file_size') and split.file_size > 0:
total_size += split.file_size
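For the Ray datasource the effect is that the row estimate now prefers merged_row_count() and only falls back to the raw row_count when no merged figure is available. Pulled out as a standalone helper for readability (the estimate_rows name is illustrative, not part of the pypaimon API), the preference order added above is essentially:

    def estimate_rows(split) -> int:
        # Prefer the merged (de-duplicated) count when the split can report one.
        row_count = None
        if hasattr(split, 'merged_row_count'):
            merged = split.merged_row_count()   # may legitimately be None
            if merged is not None:
                row_count = merged
        # Fall back to the raw per-file sum, which may double-count in data evolution mode.
        if row_count is None and getattr(split, 'row_count', 0) > 0:
            row_count = split.row_count
        return row_count if row_count is not None and row_count > 0 else 0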
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 10847b12b2..2a4628b3d4 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -23,7 +23,7 @@ from pypaimon.globalindex.range import Range
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
-from pypaimon.read.split import Split
+from pypaimon.read.split import DataSplit, Split
from pypaimon.read.sliced_split import SlicedSplit
@@ -104,8 +104,8 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
for pack in packed_files
]
- splits += self._build_split_from_pack(
- flatten_packed_files, sorted_entries_list, False
+ splits += self._build_split_from_pack_for_data_evolution(
+ flatten_packed_files, packed_files, sorted_entries_list
)
if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
@@ -117,6 +117,60 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
return splits
+ def _build_split_from_pack_for_data_evolution(
+ self,
+ flatten_packed_files: List[List[DataFileMeta]],
+ packed_files: List[List[List[DataFileMeta]]],
+ file_entries: List[ManifestEntry]
+ ) -> List[Split]:
+ """
+ Build splits from packed files for data evolution tables.
+ raw_convertible is True only when each range (pack) contains exactly one file.
+ """
+ splits = []
+ for i, file_group in enumerate(flatten_packed_files):
+ # In Java: rawConvertible = f.stream().allMatch(file -> file.size() == 1)
+ # This means raw_convertible is True only when each range contains exactly one file
+ pack = packed_files[i] if i < len(packed_files) else []
+ raw_convertible = all(len(sub_pack) == 1 for sub_pack in pack)
+
+ file_paths = []
+ total_file_size = 0
+ total_record_count = 0
+
+ for data_file in file_group:
+ data_file.set_file_path(
+ self.table.table_path,
+ file_entries[0].partition,
+ file_entries[0].bucket
+ )
+ file_paths.append(data_file.file_path)
+ total_file_size += data_file.file_size
+ total_record_count += data_file.row_count
+
+ if file_paths:
+ # Get deletion files for this split
+ data_deletion_files = None
+ if self.deletion_files_map:
+ data_deletion_files = self._get_deletion_files_for_split(
+ file_group,
+ file_entries[0].partition,
+ file_entries[0].bucket
+ )
+
+ split = DataSplit(
+ files=file_group,
+ partition=file_entries[0].partition,
+ bucket=file_entries[0].bucket,
+ file_paths=file_paths,
+ row_count=total_record_count,
+ file_size=total_file_size,
+ raw_convertible=raw_convertible,
+ data_deletion_files=data_deletion_files
+ )
+ splits.append(split)
+ return splits
+
def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
"""
Wrap splits with SlicedSplit to add file-level slicing information.
diff --git a/paimon-python/pypaimon/read/sliced_split.py b/paimon-python/pypaimon/read/sliced_split.py
index 7d8e3a04d1..7390be5399 100644
--- a/paimon-python/pypaimon/read/sliced_split.py
+++ b/paimon-python/pypaimon/read/sliced_split.py
@@ -96,6 +96,84 @@ class SlicedSplit(Split):
def data_deletion_files(self):
return self._data_split.data_deletion_files
+ def _get_sliced_file_row_count(self, file: 'DataFileMeta') -> int:
+ if file.file_name in self._shard_file_idx_map:
+ start, end = self._shard_file_idx_map[file.file_name]
+ return (end - start) if start != -1 and end != -1 else 0
+ return file.row_count
+
+ def merged_row_count(self):
+ if not self._shard_file_idx_map:
+ return self._data_split.merged_row_count()
+
+ underlying_merged = self._data_split.merged_row_count()
+ if underlying_merged is not None:
+ original_row_count = self._data_split.row_count
+ return int(underlying_merged * self.row_count / original_row_count) if original_row_count > 0 else 0
+
+ from pypaimon.read.split import DataSplit
+ from pypaimon.globalindex.range import Range
+
+ if not isinstance(self._data_split, DataSplit):
+ return None
+
+ if not all(f.first_row_id is not None for f in self._data_split.files):
+ return None
+
+ file_ranges = []
+ for file in self._data_split.files:
+ if file.first_row_id is not None:
+ sliced_count = self._get_sliced_file_row_count(file)
+ if sliced_count > 0:
+ file_ranges.append((file, Range(file.first_row_id, file.first_row_id + sliced_count - 1)))
+
+ if not file_ranges:
+ return 0
+
+ file_ranges.sort(key=lambda x: x[1].from_)
+
+ groups = []
+ current_group = [file_ranges[0]]
+ current_range = file_ranges[0][1]
+
+ for file, file_range in file_ranges[1:]:
+ if file_range.from_ <= current_range.to + 1:
+ current_group.append((file, file_range))
+ current_range = Range(current_range.from_, max(current_range.to, file_range.to))
+ else:
+ groups.append(current_group)
+ current_group = [(file, file_range)]
+ current_range = file_range
+
+ if current_group:
+ groups.append(current_group)
+
+ sum_rows = 0
+ for group in groups:
+ max_count = 0
+ for file, _ in group:
+ max_count = max(max_count, self._get_sliced_file_row_count(file))
+ sum_rows += max_count
+
+ if self._data_split.data_deletion_files is not None:
+ if not all(f is None or f.cardinality is not None for f in self._data_split.data_deletion_files):
+ return None
+
+ for i, deletion_file in enumerate(self._data_split.data_deletion_files):
+ if (deletion_file is not None and deletion_file.cardinality is not None and i < len(self._data_split.files)):
+ file = self._data_split.files[i]
+ if file.first_row_id is not None:
+ file_original_count = file.row_count
+ file_sliced_count = self._get_sliced_file_row_count(file)
+ if file_original_count > 0:
+ deletion_ratio = deletion_file.cardinality / file_original_count
+ sum_rows -= int(file_sliced_count * deletion_ratio)
+ else:
+ sum_rows -= deletion_file.cardinality
+
+ return sum_rows
+
def __eq__(self, other):
if not isinstance(other, SlicedSplit):
return False
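SlicedSplit adds one wrinkle: a shard may read only a slice of each file, while deletion-file cardinalities are recorded per whole file, so the change above pro-rates them against the sliced portion under the assumption that deletes are spread evenly across the file. A small sketch of just that pro-rating step (the helper name and plain integer arguments are illustrative, not the real pypaimon types):

    def prorated_deletions(sliced_rows: int, file_rows: int, deleted_in_file: int) -> int:
        # Estimate how many of a file's deleted rows fall inside the slice this shard reads,
        # assuming the deletes are distributed evenly over the file.
        if file_rows <= 0:
            return deleted_in_file
        return int(sliced_rows * (deleted_in_file / file_rows))

    # A shard reading 50 of 200 rows from a file with 20 deleted rows
    # expects about 5 of those deletes to land inside its slice.
    assert prorated_deletions(sliced_rows=50, file_rows=200, deleted_in_file=20) == 5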
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index 3f2d2f8329..12d20c0947 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -55,6 +55,15 @@ class Split(ABC):
"""Return the bucket of this split."""
pass
+ def merged_row_count(self) -> Optional[int]:
+ """
+ Return the merged row count of the data files. For example, when deletion vectors are
+ enabled on a primary key table, rows that have already been deleted are subtracted from
+ the returned result. In data evolution mode on an append table, the actual number of
+ rows is returned rather than the per-file sum.
+ """
+ return None
+
class DataSplit(Split):
"""
@@ -106,3 +115,88 @@ class DataSplit(Split):
@property
def file_paths(self) -> List[str]:
return self._file_paths
+
+ def set_row_count(self, row_count: int) -> None:
+ self._row_count = row_count
+
+ def merged_row_count(self) -> Optional[int]:
+ """
+ Return the merged row count of the data files. For example, when deletion vectors are
+ enabled on a primary key table, rows that have already been deleted are subtracted from
+ the returned result. In data evolution mode on an append table, the actual number of
+ rows is returned rather than the per-file sum.
+ """
+ if self._raw_merged_row_count_available():
+ return self._raw_merged_row_count()
+ if self._data_evolution_row_count_available():
+ return self._data_evolution_merged_row_count()
+ return None
+
+ def _raw_merged_row_count_available(self) -> bool:
+ return self.raw_convertible and (
+ self.data_deletion_files is None
+ or all(f is None or f.cardinality is not None for f in self.data_deletion_files)
+ )
+
+ def _raw_merged_row_count(self) -> int:
+ sum_rows = 0
+ for i, file in enumerate(self._files):
+ deletion_file = None
+ if self.data_deletion_files is not None and i < len(self.data_deletion_files):
+ deletion_file = self.data_deletion_files[i]
+
+ if deletion_file is None:
+ sum_rows += file.row_count
+ elif deletion_file.cardinality is not None:
+ sum_rows += file.row_count - deletion_file.cardinality
+
+ return sum_rows
+
+ def _data_evolution_row_count_available(self) -> bool:
+ for file in self._files:
+ if file.first_row_id is None:
+ return False
+ return True
+
+ def _data_evolution_merged_row_count(self) -> int:
+ if not self._files:
+ return 0
+
+ file_ranges = []
+ for file in self._files:
+ if file.first_row_id is not None and file.row_count > 0:
+ start = file.first_row_id
+ end = file.first_row_id + file.row_count - 1
+ file_ranges.append((file, start, end))
+
+ if not file_ranges:
+ return 0
+
+ file_ranges.sort(key=lambda x: (x[1], x[2]))
+
+ groups = []
+ current_group = [file_ranges[0]]
+ current_end = file_ranges[0][2]
+
+ for file_range in file_ranges[1:]:
+ file, start, end = file_range
+ if start <= current_end:
+ current_group.append(file_range)
+ if end > current_end:
+ current_end = end
+ else:
+ groups.append(current_group)
+ current_group = [file_range]
+ current_end = end
+
+ if current_group:
+ groups.append(current_group)
+
+ sum_rows = 0
+ for group in groups:
+ max_count = 0
+ for file, _, _ in group:
+ max_count = max(max_count, file.row_count)
+ sum_rows += max_count
+
+ return sum_rows
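The raw-convertible path above is simpler: when every deletion file carries a cardinality, the merged count is just the per-file row counts minus the corresponding deleted rows. A standalone sketch with a small worked example (plain lists stand in for DataFileMeta and the deletion-file metadata; in the real code an unknown cardinality disqualifies the split earlier, in _raw_merged_row_count_available):

    from typing import List, Optional

    def raw_merged_row_count(file_rows: List[int], deleted: List[Optional[int]]) -> int:
        # A None entry means "no deletion file for this data file".
        total = 0
        for i, rows in enumerate(file_rows):
            cardinality = deleted[i] if i < len(deleted) else None
            total += rows if cardinality is None else rows - cardinality
        return total

    # Two files of 100 rows each, 10 rows deleted from the second one -> 190 merged rows.
    assert raw_merged_row_count([100, 100], [None, 10]) == 190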
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 9925e21be5..88d2626bb6 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1452,7 +1452,10 @@ class DataBlobWriterTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
- result = table_read.to_arrow(table_scan.plan().splits())
+ splits = table_scan.plan().splits()
+ result = table_read.to_arrow(splits)
+
+ self.assertEqual(sum([s._row_count for s in splits]), 40 * 2)
# Verify the data
self.assertEqual(result.num_rows, 40, "Should have 40 rows")
@@ -2702,6 +2705,100 @@ class DataBlobWriterTest(unittest.TestCase):
print(f"✓ Blob Table Iteration {test_iteration + 1}/{iter_num} completed successfully")
+ def test_blob_data_with_ray(self):
+ try:
+ import ray
+ if not ray.is_initialized():
+ ray.init(ignore_reinit_error=True, num_cpus=2)
+ except ImportError:
+ self.skipTest("Ray is not available")
+
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ pa_schema = pa.schema([
+ ('text', pa.string()),
+ ('video_path', pa.string()),
+ ('video_bytes', pa.large_binary()) # Blob column
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-field': 'video_bytes',
+ 'blob-as-descriptor': 'true'
+ }
+ )
+
+ self.catalog.create_table('test_db.test_blob_data_with_ray', schema, False)
+ table = self.catalog.get_table('test_db.test_blob_data_with_ray')
+
+ num_rows = 10
+ blob_descriptors = []
+ for i in range(num_rows):
+ blob_size = 1024 * (i + 1) # Varying sizes: 1KB, 2KB, ..., 10KB
+ blob_data = b'X' * blob_size
+ blob_file_path = os.path.join(self.temp_dir, f'blob_{i}.mp4')
+ with open(blob_file_path, 'wb') as f:
+ f.write(blob_data)
+ blob_descriptor = BlobDescriptor(blob_file_path, 0, blob_size)
+ blob_descriptors.append(blob_descriptor.serialize())
+
+ test_data = pa.Table.from_pydict({
+ 'text': [f'text_{i}' for i in range(num_rows)],
+ 'video_path': [f'video_{i}.mp4' for i in range(num_rows)],
+ 'video_bytes': blob_descriptors
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+
+ total_split_row_count = sum([s._row_count for s in splits])
+ self.assertEqual(total_split_row_count, num_rows * 2,
+ f"Total split row count should be {num_rows * 2}, got {total_split_row_count}")
+
+ total_merged_count = 0
+ for split in splits:
+ merged_count = split.merged_row_count()
+ if merged_count is not None:
+ total_merged_count += merged_count
+ self.assertLessEqual(
+ merged_count, split.row_count,
+ f"merged_row_count ({merged_count}) should be <= row_count ({split.row_count})")
+
+ if total_merged_count > 0:
+ self.assertEqual(
+ total_merged_count, num_rows,
+ f"Total merged_row_count should be {num_rows}, got {total_merged_count}")
+
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
+
+ ray_count = ray_dataset.count()
+ self.assertEqual(
+ ray_count,
+ num_rows,
+ f"Ray dataset count() should be {num_rows}, got {ray_count}. "
+ )
+
+ df = ray_dataset.to_pandas()
+ self.assertEqual(len(df), num_rows,
+ f"Actual data rows should be {num_rows}, got {len(df)}")
+
+ self.assertEqual(list(df['text']), [f'text_{i}' for i in range(num_rows)])
+ self.assertEqual(list(df['video_path']), [f'video_{i}.mp4' for i in range(num_rows)])
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py
index ad2f9e084b..a77118b6d3 100644
--- a/paimon-python/pypaimon/tests/reader_split_generator_test.py
+++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py
@@ -210,6 +210,13 @@ class SplitGeneratorTest(unittest.TestCase):
self.assertFalse(
split.raw_convertible,
"Multi-file split should not be raw_convertible when optimized path is not used")
+
+ merged_count = split.merged_row_count()
+ if merged_count is not None:
+ self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+ self.assertLessEqual(
+ merged_count, split.row_count,
+ "merged_row_count should be <= row_count")
def test_shard_with_empty_partition(self):
pa_schema = pa.schema([
@@ -243,6 +250,84 @@ class SplitGeneratorTest(unittest.TestCase):
for split in splits_shard_0:
self.assertGreater(len(split.files), 0, "Each split should have at least one file")
+
+ merged_count = split.merged_row_count()
+ if merged_count is not None:
+ self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+ self.assertLessEqual(
+ merged_count, split.row_count,
+ "merged_row_count should be <= row_count")
+
+ from pypaimon.read.sliced_split import SlicedSplit
+ if isinstance(split, SlicedSplit):
+ sliced_merged = split.merged_row_count()
+ if split.shard_file_idx_map():
+ self.assertEqual(
+ sliced_merged, split.row_count,
+ "SlicedSplit with shard_file_idx_map should return row_count as merged_row_count")
+ else:
+ underlying_merged = split.data_split().merged_row_count()
+ self.assertEqual(
+ sliced_merged, underlying_merged,
+ "SlicedSplit without shard_file_idx_map should delegate to underlying split")
+
+ def test_sliced_split_merged_row_count(self):
+ """Test merged_row_count() for SlicedSplit with explicit slicing."""
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('value', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options={'bucket': '1'}  # Single bucket to ensure splits can be sliced
+ )
+ self.catalog.create_table('default.test_sliced_split', schema, False)
+ table = self.catalog.get_table('default.test_sliced_split')
+
+ # Write enough data to create multiple files/splits
+ # Write in multiple batches to potentially create multiple files
+ for i in range(3):
+ self._write_data(table, [{'id': list(range(i * 10, (i + 1) * 10)),
+ 'value': [f'v{j}' for j in range(i * 10,
(i + 1) * 10)]}])
+
+ read_builder = table.new_read_builder()
+ splits_all = read_builder.new_scan().plan().splits()
+ self.assertGreater(len(splits_all), 0, "Should have splits")
+
+ # Use with_shard to potentially create SlicedSplit
+ # Using multiple shards increases chance of creating SlicedSplit
+ from pypaimon.read.sliced_split import SlicedSplit
+
+ for shard_idx in range(3):
+ splits_shard = read_builder.new_scan().with_shard(shard_idx, 3).plan().splits()
+ for split in splits_shard:
+ # Test merged_row_count for all splits
+ merged_count = split.merged_row_count()
+ if merged_count is not None:
+ self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+ self.assertLessEqual(
+ merged_count, split.row_count,
+ "merged_row_count should be <= row_count")
+
+ # Explicitly test SlicedSplit if present
+ if isinstance(split, SlicedSplit):
+ sliced_merged = split.merged_row_count()
+ shard_map = split.shard_file_idx_map()
+ if shard_map:
+ # When shard_file_idx_map is present, merged_row_count should equal row_count
+ self.assertEqual(
+ sliced_merged, split.row_count,
+ "SlicedSplit with shard_file_idx_map should return row_count as merged_row_count")
+ else:
+ # When shard_file_idx_map is empty, should delegate to underlying split
+ underlying_merged = split.data_split().merged_row_count()
+ self.assertEqual(
+ sliced_merged, underlying_merged,
+ "SlicedSplit without shard_file_idx_map should delegate to underlying split")
+
+ # Note: SlicedSplit may or may not be created depending on data distribution
+ # This test ensures that if SlicedSplit is created, merged_row_count() works correctly
if __name__ == '__main__':