This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9ccee6a79 [python] Support read blob row by offsets in with_shard feature (#6863)
b9ccee6a79 is described below
commit b9ccee6a79a495178a774fb192893bfce09567f9
Author: umi <[email protected]>
AuthorDate: Tue Dec 23 11:09:29 2025 +0800
[python] Support read blob row by offsets in with_shard feature (#6863)
---
.../pypaimon/read/reader/concat_batch_reader.py | 99 +++++++++++-----
.../read/reader/data_evolution_merge_reader.py | 85 --------------
.../pypaimon/read/reader/data_file_batch_reader.py | 10 +-
.../pypaimon/read/reader/format_blob_reader.py | 16 ++-
.../pypaimon/read/reader/shard_batch_reader.py | 61 ++++++++++
.../pypaimon/read/scanner/full_starting_scanner.py | 75 ++++++++----
paimon-python/pypaimon/read/split.py | 7 +-
paimon-python/pypaimon/read/split_read.py | 77 ++++++++-----
paimon-python/pypaimon/tests/blob_table_test.py | 128 ++++++++++++++++++---
.../pypaimon/tests/data_evolution_test.py | 2 +-
10 files changed, 366 insertions(+), 194 deletions(-)
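
For context, the sharded read path this change extends is driven from the scan API and is exercised by the tests in this commit. A minimal usage sketch (catalog and table names are hypothetical):

    # Read shard 1 of 3 from an append-only table with a blob column; concatenating
    # the results of shards 0..2 is expected to equal a full, unsharded scan.
    table = catalog.get_table('test_db.blob_table')        # hypothetical table
    read_builder = table.new_read_builder()
    table_scan = read_builder.new_scan().with_shard(1, 3)  # shard index 1 of 3
    table_read = read_builder.new_read()
    result = table_read.to_arrow(table_scan.plan().splits())
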
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index aefff13ebd..3ce9db6f5e 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -53,36 +53,6 @@ class ConcatBatchReader(RecordBatchReader):
self.queue.clear()
-class ShardBatchReader(ConcatBatchReader):
-
- def __init__(self, readers, split_start_row, split_end_row):
- super().__init__(readers)
- self.split_start_row = split_start_row
- self.split_end_row = split_end_row
- self.cur_end = 0
-
- def read_arrow_batch(self) -> Optional[RecordBatch]:
- batch = super().read_arrow_batch()
- if batch is None:
- return None
- if self.split_start_row is not None or self.split_end_row is not None:
- cur_begin = self.cur_end # begin idx of current batch based on the split
- self.cur_end += batch.num_rows
- # shard the first batch and the last batch
- if self.split_start_row <= cur_begin < self.cur_end <= self.split_end_row:
- return batch
- elif cur_begin <= self.split_start_row < self.cur_end:
- return batch.slice(self.split_start_row - cur_begin,
- min(self.split_end_row, self.cur_end) - self.split_start_row)
- elif cur_begin < self.split_end_row <= self.cur_end:
- return batch.slice(0, self.split_end_row - cur_begin)
- else:
- # return empty RecordBatch if the batch size has not reached split_start_row
- return pa.RecordBatch.from_arrays([], [])
- else:
- return batch
-
-
class MergeAllBatchReader(RecordBatchReader):
"""
A reader that accepts multiple reader suppliers and concatenates all their arrow batches
@@ -98,13 +68,18 @@ class MergeAllBatchReader(RecordBatchReader):
def read_arrow_batch(self) -> Optional[RecordBatch]:
if self.reader:
- return self.reader.read_next_batch()
+ try:
+ return self.reader.read_next_batch()
+ except StopIteration:
+ return None
all_batches = []
# Read all batches from all reader suppliers
for supplier in self.reader_suppliers:
reader = supplier()
+ if reader is None:
+ continue
try:
while True:
batch = reader.read_arrow_batch()
@@ -149,3 +124,65 @@ class MergeAllBatchReader(RecordBatchReader):
def close(self) -> None:
self.merged_batch = None
self.reader = None
+
+
+class DataEvolutionMergeReader(RecordBatchReader):
+ """
+ This is a union reader that contains multiple inner readers; each reader is responsible for reading one file.
+
+ This reader assembles the inner readers into one combined reader and merges the batches from all of them.
+
+ For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means:
+ - The first field comes from batch0, and it is at offset 0 in batch0.
+ - The second field comes from batch2, and it is at offset 0 in batch2.
+ - The third field comes from batch0, and it is at offset 1 in batch0.
+ - The fourth field comes from batch1, and it is at offset 1 in batch1.
+ - The fifth field comes from batch2, and it is at offset 1 in batch2.
+ - The sixth field comes from batch1, and it is at offset 0 in batch1.
+ """
+
+ def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
+ if row_offsets is None:
+ raise ValueError("Row offsets must not be null")
+ if field_offsets is None:
+ raise ValueError("Field offsets must not be null")
+ if len(row_offsets) != len(field_offsets):
+ raise ValueError("Row offsets and field offsets must have the same
length")
+ if not row_offsets:
+ raise ValueError("Row offsets must not be empty")
+ if not readers or len(readers) < 1:
+ raise ValueError("Readers should be more than 0")
+ self.row_offsets = row_offsets
+ self.field_offsets = field_offsets
+ self.readers = readers
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
+ for i, reader in enumerate(self.readers):
+ if reader is not None:
+ batch = reader.read_arrow_batch()
+ if batch is None:
+ # all readers are aligned; if one returns None, the others have no more data either
+ return None
+ batches[i] = batch
+ # Assemble record batches from batches based on row_offsets and field_offsets
+ columns = []
+ names = []
+ for i in range(len(self.row_offsets)):
+ batch_index = self.row_offsets[i]
+ field_index = self.field_offsets[i]
+ if batches[batch_index] is not None:
+ column = batches[batch_index].column(field_index)
+ columns.append(column)
+ names.append(batches[batch_index].schema.names[field_index])
+ if columns:
+ return pa.RecordBatch.from_arrays(columns, names)
+ return None
+
+ def close(self) -> None:
+ try:
+ for reader in self.readers:
+ if reader is not None:
+ reader.close()
+ except Exception as e:
+ raise IOError("Failed to close inner readers") from e
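
To make the offset mapping documented above concrete, a small standalone sketch (hypothetical pyarrow batches, not part of this commit) of the assembly read_arrow_batch performs:

    import pyarrow as pa

    # Three aligned batches, as if produced by three per-file readers.
    batch0 = pa.record_batch([pa.array([1, 2]), pa.array([3, 4])], names=['a', 'b'])
    batch1 = pa.record_batch([pa.array([5, 6]), pa.array([7, 8])], names=['c', 'd'])
    batch2 = pa.record_batch([pa.array([9, 10]), pa.array([11, 12])], names=['e', 'f'])
    batches = [batch0, batch1, batch2]

    row_offsets = [0, 2, 0, 1, 2, 1]    # which batch each output field is taken from
    field_offsets = [0, 0, 1, 1, 1, 0]  # column index inside that batch

    columns = [batches[b].column(f) for b, f in zip(row_offsets, field_offsets)]
    names = [batches[b].schema.names[f] for b, f in zip(row_offsets, field_offsets)]
    merged = pa.RecordBatch.from_arrays(columns, names)
    # merged fields, in order: a (batch0), e (batch2), b (batch0), d (batch1), f (batch2), c (batch1)
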
diff --git a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
deleted file mode 100644
index 43bf926862..0000000000
--- a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
+++ /dev/null
@@ -1,85 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from typing import List, Optional
-
-import pyarrow as pa
-from pyarrow import RecordBatch
-
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-
-
-class DataEvolutionMergeReader(RecordBatchReader):
- """
- This is a union reader which contains multiple inner readers, Each reader is responsible for reading one file.
-
- This reader, assembling multiple reader into one big and great reader, will merge the batches from all readers.
-
- For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means:
- - The first field comes from batch0, and it is at offset 0 in batch0.
- - The second field comes from batch2, and it is at offset 0 in batch2.
- - The third field comes from batch0, and it is at offset 1 in batch0.
- - The fourth field comes from batch1, and it is at offset 1 in batch1.
- - The fifth field comes from batch2, and it is at offset 1 in batch2.
- - The sixth field comes from batch1, and it is at offset 0 in batch1.
- """
-
- def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
- if row_offsets is None:
- raise ValueError("Row offsets must not be null")
- if field_offsets is None:
- raise ValueError("Field offsets must not be null")
- if len(row_offsets) != len(field_offsets):
- raise ValueError("Row offsets and field offsets must have the same
length")
- if not row_offsets:
- raise ValueError("Row offsets must not be empty")
- if not readers or len(readers) < 1:
- raise ValueError("Readers should be more than 0")
- self.row_offsets = row_offsets
- self.field_offsets = field_offsets
- self.readers = readers
-
- def read_arrow_batch(self) -> Optional[RecordBatch]:
- batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
- for i, reader in enumerate(self.readers):
- if reader is not None:
- batch = reader.read_arrow_batch()
- if batch is None:
- # all readers are aligned, as long as one returns null, the others will also have no data
- return None
- batches[i] = batch
- # Assemble record batches from batches based on row_offsets and field_offsets
- columns = []
- names = []
- for i in range(len(self.row_offsets)):
- batch_index = self.row_offsets[i]
- field_index = self.field_offsets[i]
- if batches[batch_index] is not None:
- column = batches[batch_index].column(field_index)
- columns.append(column)
- names.append(batches[batch_index].schema.names[field_index])
- if columns:
- return pa.RecordBatch.from_arrays(columns, names)
- return None
-
- def close(self) -> None:
- try:
- for reader in self.readers:
- if reader is not None:
- reader.close()
- except Exception as e:
- raise IOError("Failed to close inner readers") from e
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 526e501b97..014d4f9da3 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -22,6 +22,7 @@ import pyarrow as pa
from pyarrow import RecordBatch
from pypaimon.read.partition_info import PartitionInfo
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.special_fields import SpecialFields
@@ -29,7 +30,7 @@ from pypaimon.table.special_fields import SpecialFields
class DataFileBatchReader(RecordBatchReader):
"""
- Reads record batch from data files.
+ Reads record batches from data files of different formats.
"""
def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
@@ -48,8 +49,11 @@ class DataFileBatchReader(RecordBatchReader):
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
- def read_arrow_batch(self) -> Optional[RecordBatch]:
- record_batch = self.format_reader.read_arrow_batch()
+ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
+ if isinstance(self.format_reader, FormatBlobReader):
+ record_batch = self.format_reader.read_arrow_batch(start_idx, end_idx)
+ else:
+ record_batch = self.format_reader.read_arrow_batch()
if record_batch is None:
return None
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
index 5e0affe7d8..ecd740de4d 100644
--- a/paimon-python/pypaimon/read/reader/format_blob_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -63,7 +63,11 @@ class FormatBlobReader(RecordBatchReader):
self._blob_iterator = None
self._current_batch = None
- def read_arrow_batch(self) -> Optional[RecordBatch]:
+ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
+ """
+ start_idx: index of the first record to read from the blob file
+ end_idx: index (exclusive) at which to stop reading from the blob file
+ """
if self._blob_iterator is None:
if self.returned:
return None
@@ -73,7 +77,13 @@ class FormatBlobReader(RecordBatchReader):
self.blob_offsets, self._fields[0]
)
self._blob_iterator = iter(batch_iterator)
-
+ read_size = self._batch_size
+ if start_idx is not None and end_idx is not None:
+ if self._blob_iterator.current_position >= end_idx:
+ return None
+ if self._blob_iterator.current_position < start_idx:
+ self._blob_iterator.current_position = start_idx
+ read_size = min(end_idx - self._blob_iterator.current_position, self._batch_size)
# Collect records for this batch
pydict_data = {name: [] for name in self._fields}
records_in_batch = 0
@@ -93,7 +103,7 @@ class FormatBlobReader(RecordBatchReader):
pydict_data[field_name].append(blob_data)
records_in_batch += 1
- if records_in_batch >= self._batch_size:
+ if records_in_batch >= read_size:
break
except StopIteration:
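
The start_idx/end_idx handling above amounts to clamping each batch read to the shard window; an illustrative arithmetic sketch (current_position stands for the blob iterator's record position):

    # Decide where the next batch read starts and how many records it may cover
    # for a shard window [start_idx, end_idx).
    def next_read_window(current_position, start_idx, end_idx, batch_size):
        if current_position >= end_idx:
            return None                                  # nothing left in this shard
        position = max(current_position, start_idx)      # skip records before the shard
        read_size = min(end_idx - position, batch_size)  # do not read past the shard end
        return position, read_size

    print(next_read_window(0, 10, 25, 8))   # (10, 8): jump ahead to the shard start
    print(next_read_window(18, 10, 25, 8))  # (18, 7): clamp the batch to the shard end
    print(next_read_window(25, 10, 25, 8))  # None: the shard is exhausted
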
diff --git a/paimon-python/pypaimon/read/reader/shard_batch_reader.py b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
new file mode 100644
index 0000000000..0baf63e534
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/shard_batch_reader.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from pyarrow import RecordBatch
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+
+
+class ShardBatchReader(RecordBatchReader):
+ """
+ A reader that reads a subset of rows from a data file.
+ """
+ def __init__(self, reader, start_row, end_row):
+ self.reader = reader
+ self.start_row = start_row
+ self.end_row = end_row
+ self.current_row = 0
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ # Check if reader is FormatBlobReader (blob type)
+ if isinstance(self.reader.format_reader, FormatBlobReader):
+ # For blob reader, pass start_idx and end_idx parameters
+ return self.reader.read_arrow_batch(start_idx=self.start_row, end_idx=self.end_row)
+ else:
+ # For non-blob reader (DataFileBatchReader), use standard read_arrow_batch
+ batch = self.reader.read_arrow_batch()
+
+ if batch is None:
+ return None
+
+ # Apply row range filtering for non-blob readers
+ batch_begin = self.current_row
+ self.current_row += batch.num_rows
+
+ # Check if batch is within the desired range
+ if self.start_row <= batch_begin < self.current_row <= self.end_row: # batch is within the desired range
+ return batch
+ elif batch_begin < self.start_row < self.current_row: # batch starts before the desired range
+ return batch.slice(self.start_row - batch_begin, self.end_row - self.start_row)
+ elif batch_begin < self.end_row < self.current_row: # batch ends after the desired range
+ return batch.slice(0, self.end_row - batch_begin)
+ else: # batch is outside the desired range
+ return self.read_arrow_batch()
+
+ def close(self):
+ self.reader.close()
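
A worked example of the slicing above with hypothetical numbers: for a shard covering rows [5, 12) read from 4-row batches, the reader keeps nothing from batch 0, a head-trimmed slice of batch 1, all of batch 2, and nothing from batch 3:

    import pyarrow as pa

    start_row, end_row, current = 5, 12, 0
    for batch_id in range(4):                    # batches cover rows 0..15
        batch = pa.record_batch([pa.array(range(current, current + 4))], names=['row'])
        begin, current = current, current + 4
        if start_row <= begin and current <= end_row:
            kept = batch                                                # fully inside the shard
        elif begin < start_row < current:
            kept = batch.slice(start_row - begin, end_row - start_row)  # trim the head
        elif begin < end_row < current:
            kept = batch.slice(0, end_row - begin)                      # trim the tail
        else:
            kept = None                                                 # outside the shard
        print(batch_id, kept.column(0).to_pylist() if kept is not None else [])
    # prints: 0 []   1 [5, 6, 7]   2 [8, 9, 10, 11]   3 []
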
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index a0a16c1409..96a848ce01 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -125,6 +125,10 @@ class FullStartingScanner(StartingScanner):
return self
def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
+ """
+ Filter file entries by shard. Only files within the shard range are kept, so only the
+ starting and ending files need to be further divided later.
+ """
total_row = 0
# Sort by file creation time to ensure consistent sharding
for key, file_entries in partitioned_files.items():
@@ -240,31 +244,54 @@ class FullStartingScanner(StartingScanner):
return filtered_partitioned_files, plan_start_row, plan_end_row
def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row):
- file_end_row = 0 # end row position of current file in all data
+ """
+ Find the files that need to be divided for each split.
+ :param splits: splits
+ :param plan_start_row: starting row of the plan across all splits' data
+ :param plan_end_row: end row of the plan across all splits' data
+ """
+ file_end_row = 0 # end row position of current file in all splits data
for split in splits:
- row_cnt = 0
- files = split.files
- split_start_row = file_end_row
- # Iterate through all file entries to find files that overlap with current shard range
- for file in files:
- if self._is_blob_file(file.file_name):
- continue
- row_cnt += file.row_count
- file_begin_row = file_end_row # Starting row position of current file in all data
- file_end_row += file.row_count # Update to row position after current file
-
- # If shard start position is within current file, record actual start position and relative offset
- if file_begin_row <= plan_start_row < file_end_row:
- split.split_start_row = plan_start_row - file_begin_row
-
- # If shard end position is within current file, record relative end position
- if file_begin_row < plan_end_row <= file_end_row:
- split.split_end_row = plan_end_row - split_start_row
- if split.split_start_row is None:
- split.split_start_row = 0
- if split.split_end_row is None:
- split.split_end_row = row_cnt
+ cur_split_end_row = file_end_row
+ # Compute shard_file_idx_map for data files
+ file_end_row = self._compute_split_file_idx_map(plan_start_row, plan_end_row, split, cur_split_end_row, False)
+ # Compute shard_file_idx_map for blob files
+ if self.data_evolution:
+ self._compute_split_file_idx_map(plan_start_row, plan_end_row, split, cur_split_end_row, True)
+
+ def _compute_split_file_idx_map(self, plan_start_row, plan_end_row, split: Split, file_end_row: int, is_blob: bool = False):
+ """
+ Traverse all the files in the current split, find the files where the shard starts and ends, and add them to shard_file_idx_map:
+ - for data files, only two data files across all splits need to be divided;
+ - for blob files, there may be some unnecessary files in addition to the two (start and end) files.
+ Add them to shard_file_idx_map as well, because they need to be removed later.
+ """
+ row_cnt = 0
+ for file in split.files:
+ if not is_blob and self._is_blob_file(file.file_name):
+ continue
+ if is_blob and not self._is_blob_file(file.file_name):
+ continue
+ row_cnt += file.row_count
+ file_begin_row = file_end_row # Starting row position of current file in all data
+ file_end_row += file.row_count # Update to row position after current file
+ if file_begin_row <= plan_start_row < plan_end_row <= file_end_row:
+ split.shard_file_idx_map[file.file_name] = (plan_start_row - file_begin_row, plan_end_row - file_begin_row)
+ # If shard start position is within current file, record actual start position and relative offset
+ elif file_begin_row < plan_start_row < file_end_row:
+ split.shard_file_idx_map[file.file_name] = (plan_start_row - file_begin_row, file.row_count)
+ # If shard end position is within current file, record relative end position
+ elif file_begin_row < plan_end_row < file_end_row:
+ split.shard_file_idx_map[file.file_name] = (0, plan_end_row - file_begin_row)
+ elif file_end_row <= plan_start_row or file_begin_row >= plan_end_row:
+ split.shard_file_idx_map[file.file_name] = (-1, -1)
+ return file_end_row
def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
filtered_entries = []
@@ -440,6 +467,7 @@ class FullStartingScanner(StartingScanner):
self.target_split_size)
splits += self._build_split_from_pack(packed_files, file_entries,
False, deletion_files_map)
if self.idx_of_this_subtask is not None:
+ # When files are combined into splits, find the files that need to be divided for each split
+ self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits
@@ -493,6 +521,7 @@ class FullStartingScanner(StartingScanner):
partitioned_files[(tuple(entry.partition.values),
entry.bucket)].append(entry)
if self.idx_of_this_subtask is not None:
+ # shard data range: [plan_start_row, plan_end_row)
partitioned_files, plan_start_row, plan_end_row = self._data_evolution_filter_by_shard(partitioned_files)
def weight_func(file_list: List[DataFileMeta]) -> int:
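
To illustrate the bookkeeping above with assumed file metadata: a global shard range [plan_start_row, plan_end_row) is translated into per-file (start, end) offsets, (-1, -1) marks files the shard skips entirely, and files that fall fully inside the range get no entry and are read whole:

    files = [('f0.parquet', 100), ('f1.parquet', 100), ('f2.parquet', 100)]  # (name, row_count)
    plan_start_row, plan_end_row = 150, 260

    shard_file_idx_map = {}
    file_end_row = 0
    for name, row_count in files:
        file_begin_row, file_end_row = file_end_row, file_end_row + row_count
        if file_begin_row <= plan_start_row < plan_end_row <= file_end_row:
            shard_file_idx_map[name] = (plan_start_row - file_begin_row, plan_end_row - file_begin_row)
        elif file_begin_row < plan_start_row < file_end_row:
            shard_file_idx_map[name] = (plan_start_row - file_begin_row, row_count)
        elif file_begin_row < plan_end_row < file_end_row:
            shard_file_idx_map[name] = (0, plan_end_row - file_begin_row)
        elif file_end_row <= plan_start_row or file_begin_row >= plan_end_row:
            shard_file_idx_map[name] = (-1, -1)

    print(shard_file_idx_map)
    # {'f0.parquet': (-1, -1), 'f1.parquet': (50, 100), 'f2.parquet': (0, 60)}
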
diff --git a/paimon-python/pypaimon/read/split.py b/paimon-python/pypaimon/read/split.py
index 55cb955ad6..272c145137 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -16,8 +16,8 @@
# limitations under the License.
################################################################################
-from dataclasses import dataclass
-from typing import List, Optional
+from dataclasses import dataclass, field
+from typing import List, Optional, Dict, Tuple
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.table.row.generic_row import GenericRow
@@ -33,8 +33,7 @@ class Split:
_file_paths: List[str]
_row_count: int
_file_size: int
- split_start_row: int = None
- split_end_row: int = None
+ shard_file_idx_map: Dict[str, Tuple[int, int]] = field(default_factory=dict) # file_name -> (start_idx, end_idx)
raw_convertible: bool = False
data_deletion_files: Optional[List[DeletionFile]] = None
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 05bcaf994e..8a374f447d 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -30,11 +30,9 @@ from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.push_down_utils import trim_predicate_by_fields
from pypaimon.read.reader.concat_batch_reader import (ConcatBatchReader,
- MergeAllBatchReader,
- ShardBatchReader)
+ MergeAllBatchReader, DataEvolutionMergeReader)
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
-from pypaimon.read.reader.data_evolution_merge_reader import \
- DataEvolutionMergeReader
+
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
@@ -50,6 +48,7 @@ from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
+from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.read.split import Split
from pypaimon.schema.data_types import DataField
@@ -331,9 +330,25 @@ class SplitRead(ABC):
class RawFileSplitRead(SplitRead):
- def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
- file_batch_reader = self.file_reader_supplier(
- file, False, self._get_final_read_data_fields(), self.row_tracking_enabled)
+ def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]:
+ read_fields = self._get_final_read_data_fields()
+ # If the current file needs to be further divided for reading, use ShardBatchReader
+ if file.file_name in self.split.shard_file_idx_map:
+ (start_row, end_row) = self.split.shard_file_idx_map[file.file_name]
+ if (start_row, end_row) == (-1, -1):
+ return None
+ else:
+ file_batch_reader = ShardBatchReader(self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True), start_row, end_row)
+ else:
+ file_batch_reader = self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True)
dv = dv_factory() if dv_factory else None
if dv:
return ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
@@ -353,10 +368,8 @@ class RawFileSplitRead(SplitRead):
if not data_readers:
return EmptyFileRecordReader()
- if self.split.split_start_row is not None:
- concat_reader = ShardBatchReader(data_readers, self.split.split_start_row, self.split.split_end_row)
- else:
- concat_reader = ConcatBatchReader(data_readers)
+
+ concat_reader = ConcatBatchReader(data_readers)
# if the table is appendonly table, we don't need extra filter, all predicates has pushed down
if self.table.is_primary_key_table and self.predicate:
return FilterRecordReader(concat_reader, self.predicate)
@@ -415,7 +428,7 @@ class DataEvolutionSplitRead(SplitRead):
files = self.split.files
suppliers = []
- # Split files by row ID using the same logic as Java DataEvolutionSplitGenerator.split
+ # Split files by row ID
split_by_row_id = self._split_by_row_id(files)
for need_merge_files in split_by_row_id:
@@ -428,10 +441,8 @@ class DataEvolutionSplitRead(SplitRead):
suppliers.append(
lambda files=need_merge_files: self._create_union_reader(files)
)
- if self.split.split_start_row is not None:
- return ShardBatchReader(suppliers, self.split.split_start_row, self.split.split_end_row)
- else:
- return ConcatBatchReader(suppliers)
+
+ return ConcatBatchReader(suppliers)
def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
"""Split files by firstRowId for data evolution."""
@@ -532,15 +543,15 @@ class DataEvolutionSplitRead(SplitRead):
self.read_fields = read_fields # create reader based on read_fields
# Create reader for this bunch
if len(bunch.files()) == 1:
- file_record_readers[i] = self._create_file_reader(
+ suppliers = [lambda r=self._create_file_reader(
bunch.files()[0], read_field_names
- )
+ ): r]
+ file_record_readers[i] = MergeAllBatchReader(suppliers)
else:
# Create concatenated reader for multiple files
suppliers = [
- lambda f=file: self._create_file_reader(
- f, read_field_names
- ) for file in bunch.files()
+ partial(self._create_file_reader, file=file,
+ read_fields=read_field_names) for file in bunch.files()
]
file_record_readers[i] = MergeAllBatchReader(suppliers)
self.read_fields = table_fields
@@ -553,13 +564,25 @@ class DataEvolutionSplitRead(SplitRead):
return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers)
- def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader:
+ def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> Optional[RecordReader]:
"""Create a file reader for a single file."""
- return self.file_reader_supplier(
- file=file,
- for_merge_read=False,
- read_fields=read_fields,
- row_tracking_enabled=True)
+ # If the current file needs to be further divided for reading, use ShardBatchReader
+ if file.file_name in self.split.shard_file_idx_map:
+ (begin_row, end_row) = self.split.shard_file_idx_map[file.file_name]
+ if (begin_row, end_row) == (-1, -1):
+ return None
+ else:
+ return ShardBatchReader(self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True), begin_row, end_row)
+ else:
+ return self.file_reader_supplier(
+ file=file,
+ for_merge_read=False,
+ read_fields=read_fields,
+ row_tracking_enabled=True)
def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
"""Split files into field bunches."""
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index ce1527761f..56439cee9b 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -17,12 +17,13 @@ limitations under the License.
"""
import os
import shutil
+import struct
import tempfile
import unittest
import pyarrow as pa
-from pypaimon import CatalogFactory
+from pypaimon import CatalogFactory, Schema
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.write.commit_message import CommitMessage
@@ -2134,12 +2135,22 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
def test_blob_write_read_large_data_with_rolling_with_shard(self):
- from pypaimon import Schema
+ """
+ Test writing and reading large blob data with file rolling and sharding.
+
+ Test workflow:
+ - Creates a table with blob column and 10MB target file size
+ - Writes 4 batches of 40 records each (160 total records)
+ - Each record contains a 3MB blob
+ - Reads data using 3-way sharding (shards 0, 1, and 2)
+ - Verifies blob data integrity and size
+ - Compares concatenated sharded results with full table scan
+ """
# Create schema with blob column
pa_schema = pa.schema([
('id', pa.int32()),
- ('batch_id', pa.int32()),
+ ('record_id_of_batch', pa.int32()),
('metadata', pa.string()),
('large_blob', pa.large_binary()),
])
@@ -2164,16 +2175,16 @@ class DataBlobWriterTest(unittest.TestCase):
actual_size = len(large_blob_data)
print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
+ # Write 4 batches of 40 records
for i in range(4):
- # Write 40 batches of data
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
- # Write all 40 batches first
- for batch_id in range(40):
+ # Write 40 records
+ for record_id in range(40):
test_data = pa.Table.from_pydict({
- 'id': [i * 40 + batch_id + 1], # Unique ID for each row
- 'batch_id': [batch_id],
- 'metadata': [f'Large blob batch {batch_id + 1}'],
+ 'id': [i * 40 + record_id + 1], # Unique ID for each row
+ 'record_id_of_batch': [record_id],
+ 'metadata': [f'Large blob batch {record_id + 1}'],
'large_blob': [large_blob_data]
}, schema=pa_schema)
writer.write_arrow(test_data)
@@ -2190,12 +2201,12 @@ class DataBlobWriterTest(unittest.TestCase):
result = table_read.to_arrow(table_scan.plan().splits())
# Verify the data
- self.assertEqual(result.num_rows, 54, "Should have 94 rows")
+ self.assertEqual(result.num_rows, 54, "Should have 54 rows")
self.assertEqual(result.num_columns, 4, "Should have 4 columns")
# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
- self.assertEqual(len(blob_data), 54, "Should have 94 blob records")
+ self.assertEqual(len(blob_data), 54, "Should have 54 blob records")
# Verify each blob
for i, blob in enumerate(blob_data):
self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} should be {large_blob_size:,} bytes")
@@ -2211,13 +2222,93 @@ class DataBlobWriterTest(unittest.TestCase):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
# Verify the data
- self.assertEqual(actual.num_rows, 160, "Should have 280 rows")
+ self.assertEqual(actual.num_rows, 160, "Should have 160 rows")
self.assertEqual(actual.num_columns, 4, "Should have 4 columns")
self.assertEqual(actual.column('id').to_pylist(), list(range(1, 161)), "ID column should match")
self.assertEqual(actual, expected)
+ def test_blob_rolling_with_shard(self):
+ """
+ - Writes 30 records
+ - Each record contains a 3MB blob
+ """
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '10MB'
+ }
+ )
+ self.catalog.create_table('test_db.blob_rolling_with_shard1', schema, False)
+ table = self.catalog.get_table('test_db.blob_rolling_with_shard1')
+
+ # Create large blob data
+ large_blob_size = 3 * 1024 * 1024 # 3MB
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ actual_size = len(large_blob_data)
+ print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ # Write 30 records
+ for record_id in range(30):
+ test_data = pa.Table.from_pydict({
+ 'id': [record_id], # Unique ID for each row
+ 'metadata': [f'Large blob batch {record_id + 1}'],
+ 'large_blob': [struct.pack('<I', record_id) + large_blob_data]
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan().with_shard(1, 3)
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data
+ self.assertEqual(result.num_rows, 10, "Should have 10 rows")
+ self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+ # Verify blob data integrity
+ blob_data = result.column('large_blob').to_pylist()
+ self.assertEqual(len(blob_data), 10, "Should have 10 blob records")
+ # Verify each blob
+ for i, blob in enumerate(blob_data):
+ self.assertEqual(len(blob), len(large_blob_data) + 4, f"Blob {i + 1} should be {large_blob_size + 4:,} bytes")
+ splits = read_builder.new_scan().plan().splits()
+ expected = table_read.to_arrow(splits)
+ splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+ actual1 = table_read.to_arrow(splits1)
+ splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+ actual2 = table_read.to_arrow(splits2)
+ splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+ actual3 = table_read.to_arrow(splits3)
+ actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
+
+ # Verify the data
+ self.assertEqual(actual.num_rows, 30, "Should have 30 rows")
+ self.assertEqual(actual.num_columns, 3, "Should have 3 columns")
+ self.assertEqual(actual, expected)
+
def test_blob_large_data_volume_with_shard(self):
- from pypaimon import Schema
# Create schema with blob column
pa_schema = pa.schema([
@@ -2284,7 +2375,6 @@ class DataBlobWriterTest(unittest.TestCase):
def test_data_blob_writer_with_shard(self):
"""Test DataBlobWriter with mixed data types in blob column."""
- from pypaimon import Schema
# Create schema with blob column
pa_schema = pa.schema([
@@ -2345,8 +2435,12 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
def test_blob_write_read_large_data_volume_rolling_with_shard(self):
- from pypaimon import Schema
-
+ """
+ - Writes 10000 records
+ - Each record contains a 5KB blob
+ - 'blob.target-file-size': '10MB'
+ - Each blob file contains 2000 records
+ """
# Create schema with blob column
pa_schema = pa.schema([
('id', pa.int32()),
@@ -2375,7 +2469,7 @@ class DataBlobWriterTest(unittest.TestCase):
actual_size = len(large_blob_data)
print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
- # Write 40 batches of data
+ # Write 10000 records of data
num_row = 10000
expected = pa.Table.from_pydict({
'id': [1] * num_row,
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 896d534f09..38034078e7 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -112,7 +112,7 @@ class DataEvolutionTest(unittest.TestCase):
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
- # append:set first_row_id = 100 to modify the row with columns write
+ # append: write (2, "x") and ("y"), set first_row_id = 100
write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
write1 = write_builder.new_write().with_write_type(['f2'])
commit = write_builder.new_commit()