This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f8a8110fa [python] fix data-evolution double-counting issue by using mergedRowCount in ray datasource (#7087)
4f8a8110fa is described below

commit 4f8a8110facb4c3089e0b96f343ba3d6cd7ccf1e
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Jan 22 18:16:09 2026 +0800

    [python] fix data-evolution double-counting issue by using mergedRowCount in ray datasource (#7087)
---
 .../pypaimon/globalindex/indexed_split.py          |  3 +
 .../pypaimon/read/datasource/ray_datasource.py     | 11 ++-
 .../read/scanner/data_evolution_split_generator.py | 60 ++++++++++++-
 paimon-python/pypaimon/read/sliced_split.py        | 78 +++++++++++++++++
 paimon-python/pypaimon/read/split.py               | 94 ++++++++++++++++++++
 paimon-python/pypaimon/tests/blob_table_test.py    | 99 +++++++++++++++++++++-
 .../pypaimon/tests/reader_split_generator_test.py  | 85 +++++++++++++++++++
 7 files changed, 424 insertions(+), 6 deletions(-)

diff --git a/paimon-python/pypaimon/globalindex/indexed_split.py b/paimon-python/pypaimon/globalindex/indexed_split.py
index 6c58a6fc5a..ab085421e3 100644
--- a/paimon-python/pypaimon/globalindex/indexed_split.py
+++ b/paimon-python/pypaimon/globalindex/indexed_split.py
@@ -76,6 +76,9 @@ class IndexedSplit(Split):
         """
         return sum(r.count() for r in self._row_ranges)
 
+    def merged_row_count(self):
+        return self.row_count
+
     # Delegate other properties to data_split
 
     @property
diff --git a/paimon-python/pypaimon/read/datasource/ray_datasource.py b/paimon-python/pypaimon/read/datasource/ray_datasource.py
index 7f78fbad6f..bf278d7b73 100644
--- a/paimon-python/pypaimon/read/datasource/ray_datasource.py
+++ b/paimon-python/pypaimon/read/datasource/ray_datasource.py
@@ -171,8 +171,15 @@ class RayDatasource(Datasource):
             for split in chunk_splits:
                 if predicate is None:
                     # Only estimate rows if no predicate (predicate filtering changes row count)
-                    if hasattr(split, 'row_count') and split.row_count > 0:
-                        total_rows += split.row_count
+                    row_count = None
+                    if hasattr(split, 'merged_row_count'):
+                        merged_count = split.merged_row_count()
+                        if merged_count is not None:
+                            row_count = merged_count
+                    if row_count is None and hasattr(split, 'row_count') and split.row_count > 0:
+                        row_count = split.row_count
+                    if row_count is not None and row_count > 0:
+                        total_rows += row_count
                 if hasattr(split, 'file_size') and split.file_size > 0:
                     total_size += split.file_size
 
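The estimation change above prefers merged_row_count() whenever the split exposes it and it returns a value, and only falls back to the raw row_count attribute otherwise, so overlapping data-evolution files are no longer counted twice. A minimal sketch of that fallback order, using a hypothetical FakeSplit stand-in rather than a real pypaimon split:

from typing import Optional

class FakeSplit:
    """Illustrative stand-in for a split; not a pypaimon class."""
    def __init__(self, row_count: int, merged: Optional[int]):
        self.row_count = row_count
        self._merged = merged

    def merged_row_count(self) -> Optional[int]:
        return self._merged

def estimate_rows(split) -> int:
    # Prefer the merged (deduplicated, deletion-aware) count when available.
    if hasattr(split, 'merged_row_count'):
        merged = split.merged_row_count()
        if merged is not None:
            return merged
    # Otherwise fall back to the raw per-file row count.
    if hasattr(split, 'row_count') and split.row_count > 0:
        return split.row_count
    return 0

# Two files covering the same 10 rows: raw sum is 20, merged count is 10.
assert estimate_rows(FakeSplit(row_count=20, merged=10)) == 10
assert estimate_rows(FakeSplit(row_count=20, merged=None)) == 20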
diff --git a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
index 10847b12b2..2a4628b3d4 100644
--- a/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
+++ b/paimon-python/pypaimon/read/scanner/data_evolution_split_generator.py
@@ -23,7 +23,7 @@ from pypaimon.globalindex.range import Range
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.scanner.split_generator import AbstractSplitGenerator
-from pypaimon.read.split import Split
+from pypaimon.read.split import DataSplit, Split
 from pypaimon.read.sliced_split import SlicedSplit
 
 
@@ -104,8 +104,8 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
                 for pack in packed_files
             ]
 
-            splits += self._build_split_from_pack(
-                flatten_packed_files, sorted_entries_list, False
-            )
+            splits += self._build_split_from_pack_for_data_evolution(
+                flatten_packed_files, packed_files, sorted_entries_list
             )
 
         if self.start_pos_of_this_subtask is not None or self.idx_of_this_subtask is not None:
@@ -117,6 +117,60 @@ class DataEvolutionSplitGenerator(AbstractSplitGenerator):
 
         return splits
 
+    def _build_split_from_pack_for_data_evolution(
+        self,
+        flatten_packed_files: List[List[DataFileMeta]],
+        packed_files: List[List[List[DataFileMeta]]],
+        file_entries: List[ManifestEntry]
+    ) -> List[Split]:
+        """
+        Build splits from packed files for data evolution tables.
+        raw_convertible is True only when each range (pack) contains exactly one file.
+        """
+        splits = []
+        for i, file_group in enumerate(flatten_packed_files):
+            # In Java: rawConvertible = f.stream().allMatch(file -> file.size() == 1)
+            # This means raw_convertible is True only when each range contains exactly one file
+            pack = packed_files[i] if i < len(packed_files) else []
+            raw_convertible = all(len(sub_pack) == 1 for sub_pack in pack)
+
+            file_paths = []
+            total_file_size = 0
+            total_record_count = 0
+
+            for data_file in file_group:
+                data_file.set_file_path(
+                    self.table.table_path,
+                    file_entries[0].partition,
+                    file_entries[0].bucket
+                )
+                file_paths.append(data_file.file_path)
+                total_file_size += data_file.file_size
+                total_record_count += data_file.row_count
+
+            if file_paths:
+                # Get deletion files for this split
+                data_deletion_files = None
+                if self.deletion_files_map:
+                    data_deletion_files = self._get_deletion_files_for_split(
+                        file_group,
+                        file_entries[0].partition,
+                        file_entries[0].bucket
+                    )
+
+                split = DataSplit(
+                    files=file_group,
+                    partition=file_entries[0].partition,
+                    bucket=file_entries[0].bucket,
+                    file_paths=file_paths,
+                    row_count=total_record_count,
+                    file_size=total_file_size,
+                    raw_convertible=raw_convertible,
+                    data_deletion_files=data_deletion_files
+                )
+                splits.append(split)
+        return splits
+
     def _wrap_to_sliced_splits(self, splits: List[Split], plan_start_pos: int, plan_end_pos: int) -> List[Split]:
         """
         Wrap splits with SlicedSplit to add file-level slicing information.
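As the in-code comment notes, raw_convertible follows the Java rule rawConvertible = f.stream().allMatch(file -> file.size() == 1): a pack is raw convertible only when every range inside it holds exactly one file. A small sketch of that check on made-up packs:

from typing import List

def is_raw_convertible(pack: List[List[str]]) -> bool:
    # True only when every range (sub-pack) contains exactly one file.
    return all(len(sub_pack) == 1 for sub_pack in pack)

# One file per range -> the split can be read as raw files.
assert is_raw_convertible([['f0'], ['f1'], ['f2']]) is True
# A range holding two overlapping files -> rows must be merged first.
assert is_raw_convertible([['f0'], ['f1', 'f2']]) is False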
diff --git a/paimon-python/pypaimon/read/sliced_split.py b/paimon-python/pypaimon/read/sliced_split.py
index 7d8e3a04d1..7390be5399 100644
--- a/paimon-python/pypaimon/read/sliced_split.py
+++ b/paimon-python/pypaimon/read/sliced_split.py
@@ -96,6 +96,84 @@ class SlicedSplit(Split):
     def data_deletion_files(self):
         return self._data_split.data_deletion_files
 
+    def _get_sliced_file_row_count(self, file: 'DataFileMeta') -> int:
+        if file.file_name in self._shard_file_idx_map:
+            start, end = self._shard_file_idx_map[file.file_name]
+            return (end - start) if start != -1 and end != -1 else 0
+        return file.row_count
+
+    def merged_row_count(self):
+        if not self._shard_file_idx_map:
+            return self._data_split.merged_row_count()
+        
+        underlying_merged = self._data_split.merged_row_count()
+        if underlying_merged is not None:
+            original_row_count = self._data_split.row_count
+            return int(underlying_merged * self.row_count / original_row_count) if original_row_count > 0 else 0
+        
+        from pypaimon.read.split import DataSplit
+        from pypaimon.globalindex.range import Range
+        
+        if not isinstance(self._data_split, DataSplit):
+            return None
+        
+        if not all(f.first_row_id is not None for f in self._data_split.files):
+            return None
+        
+        file_ranges = []
+        for file in self._data_split.files:
+            if file.first_row_id is not None:
+                sliced_count = self._get_sliced_file_row_count(file)
+                if sliced_count > 0:
+                    file_ranges.append((file, Range(file.first_row_id, file.first_row_id + sliced_count - 1)))
+        
+        if not file_ranges:
+            return 0
+        
+        file_ranges.sort(key=lambda x: x[1].from_)
+        
+        groups = []
+        current_group = [file_ranges[0]]
+        current_range = file_ranges[0][1]
+        
+        for file, file_range in file_ranges[1:]:
+            if file_range.from_ <= current_range.to + 1:
+                current_group.append((file, file_range))
+                current_range = Range(current_range.from_, max(current_range.to, file_range.to))
+            else:
+                groups.append(current_group)
+                current_group = [(file, file_range)]
+                current_range = file_range
+        
+        if current_group:
+            groups.append(current_group)
+        
+        sum_rows = 0
+        for group in groups:
+            max_count = 0
+            for file, _ in group:
+                max_count = max(max_count, self._get_sliced_file_row_count(file))
+            sum_rows += max_count
+        
+        if self._data_split.data_deletion_files is not None:
+            if not all(f is None or f.cardinality is not None for f in self._data_split.data_deletion_files):
+                return None
+            
+            for i, deletion_file in enumerate(self._data_split.data_deletion_files):
+                if (deletion_file is not None and deletion_file.cardinality is not None
+                        and i < len(self._data_split.files)):
+                    file = self._data_split.files[i]
+                    if file.first_row_id is not None:
+                        file_original_count = file.row_count
+                        file_sliced_count = self._get_sliced_file_row_count(file)
+                        if file_original_count > 0:
+                            deletion_ratio = deletion_file.cardinality / file_original_count
+                            sum_rows -= int(file_sliced_count * deletion_ratio)
+                        else:
+                            sum_rows -= deletion_file.cardinality
+        
+        return sum_rows
+
     def __eq__(self, other):
         if not isinstance(other, SlicedSplit):
             return False
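When the wrapped DataSplit already reports a merged count, SlicedSplit scales it by the fraction of rows covered by the slice (merged * sliced_rows / original_rows); only when that is unavailable does it rebuild the count from first_row_id ranges. A worked example of the scaling branch with made-up numbers:

def scaled_merged_count(underlying_merged: int, sliced_rows: int, original_rows: int) -> int:
    # Proportional estimate used when the underlying split knows its merged count.
    return int(underlying_merged * sliced_rows / original_rows) if original_rows > 0 else 0

# Underlying split: 100 raw rows merging down to 80. This slice covers 50 of the
# raw rows, so roughly 80 * 50 / 100 = 40 merged rows are attributed to it.
assert scaled_merged_count(underlying_merged=80, sliced_rows=50, original_rows=100) == 40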
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index 3f2d2f8329..12d20c0947 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -55,6 +55,15 @@ class Split(ABC):
         """Return the bucket of this split."""
         pass
 
+    def merged_row_count(self) -> Optional[int]:
+        """
+        Return the merged row count of data files. For example, when the delete vector is enabled in
+        the primary key table, the number of rows that have been deleted will be subtracted from the
+        returned result. In the Data Evolution mode of the Append table, the actual number of rows
+        will be returned.
+        """
+        return None
+
 
 class DataSplit(Split):
     """
@@ -106,3 +115,88 @@ class DataSplit(Split):
     @property
     def file_paths(self) -> List[str]:
         return self._file_paths
+
+    def set_row_count(self, row_count: int) -> None:
+        self._row_count = row_count
+
+    def merged_row_count(self) -> Optional[int]:
+        """
+        Return the merged row count of data files. For example, when the delete vector is enabled in
+        the primary key table, the number of rows that have been deleted will be subtracted from the
+        returned result. In the Data Evolution mode of the Append table, the actual number of rows
+        will be returned.
+        """
+        if self._raw_merged_row_count_available():
+            return self._raw_merged_row_count()
+        if self._data_evolution_row_count_available():
+            return self._data_evolution_merged_row_count()
+        return None
+
+    def _raw_merged_row_count_available(self) -> bool:
+        return self.raw_convertible and (
+            self.data_deletion_files is None
+            or all(f is None or f.cardinality is not None for f in self.data_deletion_files)
+        )
+
+    def _raw_merged_row_count(self) -> int:
+        sum_rows = 0
+        for i, file in enumerate(self._files):
+            deletion_file = None
+            if self.data_deletion_files is not None and i < len(self.data_deletion_files):
+                deletion_file = self.data_deletion_files[i]
+            
+            if deletion_file is None:
+                sum_rows += file.row_count
+            elif deletion_file.cardinality is not None:
+                sum_rows += file.row_count - deletion_file.cardinality
+        
+        return sum_rows
+
+    def _data_evolution_row_count_available(self) -> bool:
+        for file in self._files:
+            if file.first_row_id is None:
+                return False
+        return True
+
+    def _data_evolution_merged_row_count(self) -> int:
+        if not self._files:
+            return 0
+        
+        file_ranges = []
+        for file in self._files:
+            if file.first_row_id is not None and file.row_count > 0:
+                start = file.first_row_id
+                end = file.first_row_id + file.row_count - 1
+                file_ranges.append((file, start, end))
+        
+        if not file_ranges:
+            return 0
+        
+        file_ranges.sort(key=lambda x: (x[1], x[2]))
+        
+        groups = []
+        current_group = [file_ranges[0]]
+        current_end = file_ranges[0][2]
+        
+        for file_range in file_ranges[1:]:
+            file, start, end = file_range
+            if start <= current_end:
+                current_group.append(file_range)
+                if end > current_end:
+                    current_end = end
+            else:
+                groups.append(current_group)
+                current_group = [file_range]
+                current_end = end
+        
+        if current_group:
+            groups.append(current_group)
+        
+        sum_rows = 0
+        for group in groups:
+            max_count = 0
+            for file, _, _ in group:
+                max_count = max(max_count, file.row_count)
+            sum_rows += max_count
+        
+        return sum_rows
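The data-evolution path above groups files whose [first_row_id, first_row_id + row_count - 1] ranges overlap and counts each group once, using the largest row_count in the group, which is what removes the double counting for column-family style files. A self-contained sketch of that grouping over hypothetical (first_row_id, row_count) pairs:

from typing import List, Tuple

def merged_row_count(files: List[Tuple[int, int]]) -> int:
    """files holds (first_row_id, row_count) pairs; returns the de-duplicated count."""
    ranges = sorted((first, first + count - 1, count) for first, count in files if count > 0)
    if not ranges:
        return 0
    total, group_max, current_end = 0, ranges[0][2], ranges[0][1]
    for start, end, count in ranges[1:]:
        if start <= current_end:      # overlapping range: same logical rows
            group_max = max(group_max, count)
            current_end = max(current_end, end)
        else:                         # disjoint range: close the previous group
            total += group_max
            group_max, current_end = count, end
    return total + group_max

# Two files covering rows 0-9 plus one file covering rows 10-14:
# the naive sum is 25, the merged count is 15.
assert merged_row_count([(0, 10), (0, 10), (10, 5)]) == 15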
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 9925e21be5..88d2626bb6 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1452,7 +1452,10 @@ class DataBlobWriterTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
-        result = table_read.to_arrow(table_scan.plan().splits())
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+
+        self.assertEqual(sum([s._row_count for s in splits]), 40 * 2)
 
         # Verify the data
         self.assertEqual(result.num_rows, 40, "Should have 40 rows")
@@ -2702,6 +2705,100 @@ class DataBlobWriterTest(unittest.TestCase):
 
             print(f"✓ Blob Table Iteration {test_iteration + 1}/{iter_num} 
completed successfully")
 
+    def test_blob_data_with_ray(self):
+        try:
+            import ray
+            if not ray.is_initialized():
+                ray.init(ignore_reinit_error=True, num_cpus=2)
+        except ImportError:
+            self.skipTest("Ray is not available")
+
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        pa_schema = pa.schema([
+            ('text', pa.string()),
+            ('video_path', pa.string()),
+            ('video_bytes', pa.large_binary())  # Blob column
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob-field': 'video_bytes',
+                'blob-as-descriptor': 'true'
+            }
+        )
+
+        self.catalog.create_table('test_db.test_blob_data_with_ray', schema, False)
+        table = self.catalog.get_table('test_db.test_blob_data_with_ray')
+
+        num_rows = 10
+        blob_descriptors = []
+        for i in range(num_rows):
+            blob_size = 1024 * (i + 1)  # Varying sizes: 1KB, 2KB, ..., 10KB
+            blob_data = b'X' * blob_size
+            blob_file_path = os.path.join(self.temp_dir, f'blob_{i}.mp4')
+            with open(blob_file_path, 'wb') as f:
+                f.write(blob_data)
+            blob_descriptor = BlobDescriptor(blob_file_path, 0, blob_size)
+            blob_descriptors.append(blob_descriptor.serialize())
+
+        test_data = pa.Table.from_pydict({
+            'text': [f'text_{i}' for i in range(num_rows)],
+            'video_path': [f'video_{i}.mp4' for i in range(num_rows)],
+            'video_bytes': blob_descriptors
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+
+        total_split_row_count = sum([s._row_count for s in splits])
+        self.assertEqual(total_split_row_count, num_rows * 2,
+                         f"Total split row count should be {num_rows * 2}, got {total_split_row_count}")
+        
+        total_merged_count = 0
+        for split in splits:
+            merged_count = split.merged_row_count()
+            if merged_count is not None:
+                total_merged_count += merged_count
+                self.assertLessEqual(
+                    merged_count, split.row_count,
+                    f"merged_row_count ({merged_count}) should be <= row_count ({split.row_count})")
+        
+        if total_merged_count > 0:
+            self.assertEqual(
+                total_merged_count, num_rows,
+                f"Total merged_row_count should be {num_rows}, got {total_merged_count}")
+
+        ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
+
+        ray_count = ray_dataset.count()
+        self.assertEqual(
+            ray_count,
+            num_rows,
+            f"Ray dataset count() should be {num_rows}, got {ray_count}. "
+        )
+
+        df = ray_dataset.to_pandas()
+        self.assertEqual(len(df), num_rows,
+                         f"Actual data rows should be {num_rows}, got {len(df)}")
+
+        self.assertEqual(list(df['text']), [f'text_{i}' for i in range(num_rows)])
+        self.assertEqual(list(df['video_path']), [f'video_{i}.mp4' for i in range(num_rows)])
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/reader_split_generator_test.py b/paimon-python/pypaimon/tests/reader_split_generator_test.py
index ad2f9e084b..a77118b6d3 100644
--- a/paimon-python/pypaimon/tests/reader_split_generator_test.py
+++ b/paimon-python/pypaimon/tests/reader_split_generator_test.py
@@ -210,6 +210,13 @@ class SplitGeneratorTest(unittest.TestCase):
                     self.assertFalse(
                         split.raw_convertible,
                         "Multi-file split should not be raw_convertible when 
optimized path is not used")
+            
+            merged_count = split.merged_row_count()
+            if merged_count is not None:
+                self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+                self.assertLessEqual(
+                    merged_count, split.row_count,
+                    "merged_row_count should be <= row_count")
 
     def test_shard_with_empty_partition(self):
         pa_schema = pa.schema([
@@ -243,6 +250,84 @@ class SplitGeneratorTest(unittest.TestCase):
         
         for split in splits_shard_0:
             self.assertGreater(len(split.files), 0, "Each split should have at least one file")
+            
+            merged_count = split.merged_row_count()
+            if merged_count is not None:
+                self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+                self.assertLessEqual(
+                    merged_count, split.row_count,
+                    "merged_row_count should be <= row_count")
+            
+            from pypaimon.read.sliced_split import SlicedSplit
+            if isinstance(split, SlicedSplit):
+                sliced_merged = split.merged_row_count()
+                if split.shard_file_idx_map():
+                    self.assertEqual(
+                        sliced_merged, split.row_count,
+                        "SlicedSplit with shard_file_idx_map should return row_count as merged_row_count")
+                else:
+                    underlying_merged = split.data_split().merged_row_count()
+                    self.assertEqual(
+                        sliced_merged, underlying_merged,
+                        "SlicedSplit without shard_file_idx_map should delegate to underlying split")
+
+    def test_sliced_split_merged_row_count(self):
+        """Test merged_row_count() for SlicedSplit with explicit slicing."""
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('value', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={'bucket': '1'}  # Single bucket to ensure splits can be sliced
+        )
+        self.catalog.create_table('default.test_sliced_split', schema, False)
+        table = self.catalog.get_table('default.test_sliced_split')
+        
+        # Write enough data to create multiple files/splits
+        # Write in multiple batches to potentially create multiple files
+        for i in range(3):
+            self._write_data(table, [{'id': list(range(i * 10, (i + 1) * 10)),
+                                     'value': [f'v{j}' for j in range(i * 10, (i + 1) * 10)]}])
+        
+        read_builder = table.new_read_builder()
+        splits_all = read_builder.new_scan().plan().splits()
+        self.assertGreater(len(splits_all), 0, "Should have splits")
+        
+        # Use with_shard to potentially create SlicedSplit
+        # Using multiple shards increases chance of creating SlicedSplit
+        from pypaimon.read.sliced_split import SlicedSplit
+        
+        for shard_idx in range(3):
+            splits_shard = read_builder.new_scan().with_shard(shard_idx, 3).plan().splits()
+            for split in splits_shard:
+                # Test merged_row_count for all splits
+                merged_count = split.merged_row_count()
+                if merged_count is not None:
+                    self.assertGreaterEqual(merged_count, 0, "merged_row_count should be non-negative")
+                    self.assertLessEqual(
+                        merged_count, split.row_count,
+                        "merged_row_count should be <= row_count")
+                
+                # Explicitly test SlicedSplit if present
+                if isinstance(split, SlicedSplit):
+                    sliced_merged = split.merged_row_count()
+                    shard_map = split.shard_file_idx_map()
+                    if shard_map:
+                        # When shard_file_idx_map is present, merged_row_count should equal row_count
+                        self.assertEqual(
+                            sliced_merged, split.row_count,
+                            "SlicedSplit with shard_file_idx_map should return row_count as merged_row_count")
+                    else:
+                        # When shard_file_idx_map is empty, should delegate to underlying split
+                        underlying_merged = split.data_split().merged_row_count()
+                        self.assertEqual(
+                            sliced_merged, underlying_merged,
+                            "SlicedSplit without shard_file_idx_map should delegate to underlying split")
+        
+        # Note: SlicedSplit may or may not be created depending on data distribution
+        # This test ensures that if SlicedSplit is created, merged_row_count() works correctly
 
 
 if __name__ == '__main__':
