JingsongLi commented on code in PR #7157:
URL: https://github.com/apache/paimon/pull/7157#discussion_r2866885270
##########
paimon-python/pypaimon/read/reader/concat_batch_reader.py:
##########
@@ -206,9 +208,17 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
         columns = []
         for i in range(len(self.row_offsets)):
             batch_index = self.row_offsets[i]
-            field_index = self.field_offsets[i]
+            field_name = self.schema.field(i).name
+
             if batch_index >= 0 and batches[batch_index] is not None:
-                columns.append(batches[batch_index].column(field_index).slice(0, min_rows))
+                src_batch = batches[batch_index]
+                if field_name in src_batch.schema.names:
+                    column = src_batch.column(
+                        src_batch.schema.get_field_index(field_name)
+                    ).slice(0, min_rows)
+                    columns.append(column)
+                else:
+                    columns.append(pa.nulls(min_rows, type=self.schema.field(i).type))
Review Comment:
Shouldn't this work be done in `DataFileBatchReader`?
##########
docs/content/pypaimon/data-evolution.md:
##########
@@ -204,3 +204,8 @@ commit.close()
 - **Row order matters**: the batches you write must have the **same number of rows** as the batches you read, in the same order for that shard.
 - **Parallelism**: run multiple shards by calling `new_shard_updator(shard_idx, num_shards)` for each shard.
+
+## Read After Partial Shard Update
Review Comment:
I feel like this document doesn't make much sense
##########
paimon-python/pypaimon/read/reader/data_file_batch_reader.py:
##########
@@ -93,32 +130,127 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch
                 inter_names.append(partition_field.name)
             else:
                 real_index = self.partition_info.get_real_index(i)
-                if real_index < record_batch.num_columns:
+                name = (
+                    self.requested_field_names[i]
+                    if self.requested_field_names and i < len(self.requested_field_names)
+                    else f"_col_{i}"
+                )
+                batch_names = record_batch.schema.names
+                col_idx = None
+                if name in batch_names:
+                    col_idx = record_batch.schema.get_field_index(name)
+                elif name.startswith(_KEY_PREFIX) and name[len(_KEY_PREFIX):] in batch_names:
+                    col_idx = record_batch.schema.get_field_index(name[len(_KEY_PREFIX):])
+                if col_idx is not None:
+                    inter_arrays.append(record_batch.column(col_idx))
+                    inter_names.append(name)
+                elif real_index < record_batch.num_columns:
                     inter_arrays.append(record_batch.column(real_index))
-                    inter_names.append(record_batch.schema.field(real_index).name)
+                    inter_names.append(name)
+                else:
+                    field = self.schema_map.get(name)
+                    inter_arrays.append(
+                        pa.nulls(num_rows, type=field.type) if field is not None else pa.nulls(num_rows)
Review Comment:
I don't get it, `FormatPyArrowReader` has already handled `read_fields`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]