Thanks for the pointer. You are right, "arrowBatch" was not valid: I was not passing the correct number of rows to the RecordBatch::Make call.
Best,
Rares

On Sun, Jun 14, 2020 at 12:16 PM Wes McKinney <wesmck...@gmail.com> wrote:

> It sounds like "arrowBatch" is invalid. What happens when you run
> ARROW_RETURN_NOT_OK(arrowBatch->Validate())?
>
> On Sun, Jun 14, 2020 at 2:09 PM Rares Vernica <rvern...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses
> > BufferOutputStream and RecordBatchWriter to serialize a set of Arrow
> > arrays. The *receiver* uses BufferReader and RecordBatchReader to
> > deserialize them. I get the runtime error *Array length did not match
> > record batch length (5)* when I do ReadNext on the RecordBatchReader. I
> > wonder what I am missing?
> >
> > To give more details, the *sender* does:
> >
> >     std::shared_ptr<arrow::RecordBatch> arrowBatch;
> >     arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells,
> >                                           _arrowArrays);
> >
> >     std::shared_ptr<arrow::io::BufferOutputStream> arrowStream;
> >     ARROW_ASSIGN_OR_RAISE(
> >         arrowStream,
> >         arrow::io::BufferOutputStream::Create(bytesCount * 2,
> >                                               _arrowPool));
> >
> >     std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
> >     ARROW_RETURN_NOT_OK(
> >         arrow::ipc::RecordBatchStreamWriter::Open(
> >             &*arrowStream, _arrowSchema, &arrowWriter));
> >     ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch));
> >     ARROW_RETURN_NOT_OK(arrowWriter->Close());
> >
> >     std::shared_ptr<arrow::Buffer> arrowBuffer;
> >     ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish());
> >
> >     // Copy data to *receiver*
> >     builder.addData(reinterpret_cast<const char*>(arrowBuffer->data()),
> >                     arrowBuffer->size());
> >
> > The *receiver* does:
> >
> >     std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
> >     std::shared_ptr<arrow::RecordBatch> arrowBatch;
> >     std::shared_ptr<arrow::RecordBatchReader> arrowReader;
> >
> >     std::shared_ptr<arrow::Schema> arrowSchema = attributes2ArrowSchema(
> >         inputSchema, settings.isAttsOnly());
> >
> >     ARROW_RETURN_NOT_OK(
> >         arrow::ipc::RecordBatchStreamWriter::Open(
> >             arrowStream.get(), arrowSchema, &arrowWriter));
> >
> >     // Get data and size from *sender*
> >     uint32_t* sizePointer = (uint32_t*)
> >         (((char*)chunk.getConstData()) +
> >          AioSaveSettings::chunkSizeOffset());
> >     uint32_t size = *sizePointer;
> >     char* data = ((char*)chunk.getConstData() +
> >                   AioSaveSettings::chunkDataOffset());
> >
> >     arrow::io::BufferReader arrowBufferReader(
> >         reinterpret_cast<const uint8_t*>(data), size);
> >
> >     ARROW_RETURN_NOT_OK(
> >         arrow::ipc::RecordBatchStreamReader::Open(
> >             &arrowBufferReader, &arrowReader));
> >     ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch));
> >
> > The data transfer and size interpretation is the same as in the older
> > working version. The data buffer size from the *sender* matches the data
> > buffer size on the *receiver*.
> >
> > Thanks!
> > Rares