JingsongLi commented on code in PR #7174:
URL: https://github.com/apache/paimon/pull/7174#discussion_r2753259597
##########
paimon-python/pypaimon/read/reader/concat_batch_reader.py:
##########
@@ -141,7 +141,13 @@ class DataEvolutionMergeReader(RecordBatchReader):
- The sixth field comes from batch1, and it is at offset 0 in batch1.
"""
- def __init__(self, row_offsets: List[int], field_offsets: List[int],
readers: List[Optional[RecordBatchReader]]):
+ def __init__(
+ self,
+ row_offsets: List[int],
+ field_offsets: List[int],
+ readers: List[Optional[RecordBatchReader]],
+ schema: Optional[pa.Schema] = None,
Review Comment:
Should `schema` be a required (non-`Optional`) parameter instead?
##########
paimon-python/pypaimon/read/reader/concat_batch_reader.py:
##########
@@ -141,7 +141,13 @@ class DataEvolutionMergeReader(RecordBatchReader):
- The sixth field comes from batch1, and it is at offset 0 in batch1.
"""
- def __init__(self, row_offsets: List[int], field_offsets: List[int],
readers: List[Optional[RecordBatchReader]]):
+ def __init__(
+ self,
+ row_offsets: List[int],
+ field_offsets: List[int],
+ readers: List[Optional[RecordBatchReader]],
+ schema: Optional[pa.Schema] = None,
Review Comment:
Also modify `KeyValueDataWriter`?
##########
paimon-python/pypaimon/read/reader/concat_batch_reader.py:
##########
@@ -176,6 +183,8 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
columns.append(column)
names.append(batches[batch_index].schema.names[field_index])
if columns:
+ if self.schema is not None and len(columns) == len(self.schema):
Review Comment:
`schema` should not be `None` here; consider asserting that instead of checking.
##########
paimon-python/pypaimon/read/reader/data_file_batch_reader.py:
##########
@@ -135,7 +135,12 @@ def _assign_row_tracking(self, record_batch: RecordBatch)
-> RecordBatch:
# Create a new array that fills with max_sequence_number
arrays[idx] = pa.repeat(self.max_sequence_number,
record_batch.num_rows)
- return pa.RecordBatch.from_arrays(arrays,
names=record_batch.schema.names)
+ names = record_batch.schema.names
+ fields = [
+ pa.field(name, arrays[i].type,
nullable=record_batch.schema.field(name).nullable)
+ for i, name in enumerate(names)
+ ]
+ return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields))
Review Comment:
Can you use `append_column`?
##########
paimon-python/pypaimon/read/reader/concat_batch_reader.py:
##########
@@ -141,7 +141,13 @@ class DataEvolutionMergeReader(RecordBatchReader):
- The sixth field comes from batch1, and it is at offset 0 in batch1.
"""
- def __init__(self, row_offsets: List[int], field_offsets: List[int],
readers: List[Optional[RecordBatchReader]]):
+ def __init__(
+ self,
+ row_offsets: List[int],
+ field_offsets: List[int],
+ readers: List[Optional[RecordBatchReader]],
+ schema: Optional[pa.Schema] = None,
Review Comment:
Also modify `MergeAllBatchReader`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]