pitrou commented on code in PR #40070:
URL: https://github.com/apache/arrow/pull/40070#discussion_r1490614522
##########
python/pyarrow/src/arrow/python/ipc.cc:
##########
@@ -63,5 +64,59 @@ Result<std::shared_ptr<RecordBatchReader>> PyRecordBatchReader::Make(
   return reader;
 }
 
+CastingRecordBatchReader::CastingRecordBatchReader() {}
+
+Status CastingRecordBatchReader::Init(std::shared_ptr<RecordBatchReader> parent,
+                                      std::shared_ptr<Schema> schema) {
+  std::shared_ptr<Schema> src = parent->schema();
+
+  // Check for conformable number of columns
+  int num_fields = schema->num_fields();
+  if (src->num_fields() != num_fields) {
+    return Status::Invalid("Source has ", src->num_fields(), " but requested schema has ",
+                           num_fields);
+  }
+
+  // Try to cast an empty version of all the columns before succceeding
+  compute::CastOptions options;
+  for (int i = 0; i < num_fields; i++) {
+    ARROW_ASSIGN_OR_RAISE(auto empty_array, MakeEmptyArray(src->field(i)->type()));

Review Comment:
   Instead, you can probably call `CanCast` on the pairs of types?

##########
python/pyarrow/src/arrow/python/ipc.cc:
##########
@@ -19,6 +19,7 @@
 
 #include <memory>
 
+#include "arrow/compute/api.h"

Review Comment:
   Nit: `arrow/compute/cast.h` is probably sufficient and will pull in fewer headers.

##########
python/pyarrow/src/arrow/python/ipc.cc:
##########
@@ -63,5 +64,59 @@ Result<std::shared_ptr<RecordBatchReader>> PyRecordBatchReader::Make(
   return reader;
 }
 
+CastingRecordBatchReader::CastingRecordBatchReader() {}
+
+Status CastingRecordBatchReader::Init(std::shared_ptr<RecordBatchReader> parent,
+                                      std::shared_ptr<Schema> schema) {
+  std::shared_ptr<Schema> src = parent->schema();
+
+  // Check for conformable number of columns
+  int num_fields = schema->num_fields();
+  if (src->num_fields() != num_fields) {
+    return Status::Invalid("Source has ", src->num_fields(), " but requested schema has ",
+                           num_fields);
+  }
+
+  // Try to cast an empty version of all the columns before succceeding
+  compute::CastOptions options;
+  for (int i = 0; i < num_fields; i++) {
+    ARROW_ASSIGN_OR_RAISE(auto empty_array, MakeEmptyArray(src->field(i)->type()));
+    options.to_type = schema->field(i)->type();
+    ARROW_ASSIGN_OR_RAISE(auto emtpy_array_dst, compute::Cast(empty_array, options));
+  }
+
+  parent_ = std::move(parent);
+  schema_ = std::move(schema);
+
+  return Status::OK();
+}
+
+std::shared_ptr<Schema> CastingRecordBatchReader::schema() const { return schema_; }
+
+Status CastingRecordBatchReader::ReadNext(std::shared_ptr<RecordBatch>* batch) {
+  std::shared_ptr<RecordBatch> out;
+  ARROW_RETURN_NOT_OK(parent_->ReadNext(&out));
+  if (!out) {
+    return Status::OK();
+  }
+
+  auto num_columns = out->num_columns();
+  ArrayVector columns(num_columns);
+  for (int i = 0; i < num_columns; i++) {
+    ARROW_ASSIGN_OR_RAISE(columns[i],
+                          compute::Cast(*out->column(i), schema_->field(i)->type()));

Review Comment:
   Do we want to check for nulls if the destination field is non-nullable?

##########
python/pyarrow/tests/test_cffi.py:
##########
@@ -591,6 +590,24 @@ def test_roundtrip_reader_capsule(constructor):
         assert batch.equals(expected)
 
+
+def test_roundtrip_batch_reader_capsule_requested_schema():
+    batch = make_batch()
+    requested_schema = pa.schema([('ints', pa.list_(pa.int64()))])
+    requested_capsule = requested_schema.__arrow_c_schema__()
+    # RecordBatch has no cast() method

Review Comment:
   Do we have a GH issue open for this?
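For reference, a minimal sketch of how the two ipc.cc suggestions above might be addressed: replacing the empty-array probe in `Init` with `compute::CanCast`, and checking for nulls in `ReadNext` when the destination field is non-nullable. This is not the PR's code; the helper names `CheckCastable` and `CheckNullability` are invented for illustration, and note that `CanCast` can return true for casts that may still fail on actual data.

```cpp
// Sketch only: standalone helpers, not part of the PR.
#include "arrow/compute/cast.h"  // compute::CanCast
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/type.h"

using arrow::RecordBatch;
using arrow::Schema;
using arrow::Status;

// Init()-time check: verify each source/destination type pair is castable
// without materializing and casting empty arrays.
Status CheckCastable(const Schema& src, const Schema& dst) {
  if (src.num_fields() != dst.num_fields()) {
    return Status::Invalid("Source has ", src.num_fields(),
                           " fields but requested schema has ", dst.num_fields());
  }
  for (int i = 0; i < src.num_fields(); i++) {
    if (!arrow::compute::CanCast(*src.field(i)->type(), *dst.field(i)->type())) {
      return Status::TypeError("Field ", i, " (", src.field(i)->name(),
                               ") cannot be cast from ",
                               src.field(i)->type()->ToString(), " to ",
                               dst.field(i)->type()->ToString());
    }
  }
  return Status::OK();
}

// ReadNext()-time check: reject a batch whose column contains nulls while the
// requested field is declared non-nullable.
Status CheckNullability(const RecordBatch& batch, const Schema& dst) {
  for (int i = 0; i < batch.num_columns(); i++) {
    if (!dst.field(i)->nullable() && batch.column(i)->null_count() > 0) {
      return Status::Invalid("Field ", i, " (", dst.field(i)->name(),
                             ") is non-nullable but the batch contains ",
                             batch.column(i)->null_count(), " null value(s)");
    }
  }
  return Status::OK();
}
```

Whether such a nullability check belongs in `ReadNext`, up front in `Init` (rejecting nullable-to-non-nullable field mappings before reading), or nowhere at all is a design choice the thread leaves open.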
##########
python/pyarrow/ipc.pxi:
##########
@@ -772,6 +772,28 @@ cdef class RecordBatchReader(_Weakrefable):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()
 
+    def cast(self, schema):
+        """
+        Wraps this reader with one that casts each batch lazily as it is pulled.

Review Comment:
   Nit: infinitive
   ```suggestion
        Wrap this reader with one that casts each batch lazily as it is pulled.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org