It sounds like "arrowBatch" is invalid. What happens when you run ARROW_RETURN_NOT_OK(arrowBatch->Validate())?
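To spell out what I suspect: as far as I know, RecordBatch::Make does not itself check that every array has the row count you pass in (nCells in your sender code), so a mismatch can go unnoticed until something validates or reads the batch. Below is a library-free sketch of that kind of length check. BatchShape and checkBatchShape are hypothetical names made up for illustration; this is not Arrow's API or its implementation, only a model of the condition I would expect Validate() to report.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a record batch: the declared row count plus the
// length of each column. This is NOT an Arrow type.
struct BatchShape {
    std::int64_t numRows;
    std::vector<std::int64_t> columnLengths;
};

// Returns an empty string when every column has exactly numRows entries,
// otherwise a message naming the first column whose length differs.
std::string checkBatchShape(const BatchShape& batch) {
    for (std::size_t i = 0; i < batch.columnLengths.size(); ++i) {
        if (batch.columnLengths[i] != batch.numRows) {
            return "column " + std::to_string(i) + " has length " +
                   std::to_string(batch.columnLengths[i]) + ", expected " +
                   std::to_string(batch.numRows);
        }
    }
    return "";
}
```

If the real Validate() call fails on the sender side, comparing nCells against the length of each entry in _arrowArrays is probably the first thing to look at.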

On Sun, Jun 14, 2020 at 2:09 PM Rares Vernica <rvern...@gmail.com> wrote:
>
> Hello,
>
> I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses
> BufferOutputStream and RecordBatchWriter to serialize a set of Arrow
> arrays. The *receiver* uses BufferReader and RecordBatchReader to
> deserialize them. I get the runtime error *Array length did not match
> record batch length (5)* when I do ReadNext on the RecordBatchReader. I
> wonder what am I missing?
>
> To give more details, the *sender* does:
>
>     std::shared_ptr<arrow::RecordBatch> arrowBatch;
>     arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells,
>                                           _arrowArrays);
>
>     std::shared_ptr<arrow::io::BufferOutputStream> arrowStream;
>     ARROW_ASSIGN_OR_RAISE(
>         arrowStream,
>         arrow::io::BufferOutputStream::Create(bytesCount * 2,
>                                               _arrowPool));
>
>     std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
>     ARROW_RETURN_NOT_OK(
>         arrow::ipc::RecordBatchStreamWriter::Open(
>             &*arrowStream, _arrowSchema, &arrowWriter));
>     ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch));
>     ARROW_RETURN_NOT_OK(arrowWriter->Close());
>
>     std::shared_ptr<arrow::Buffer> arrowBuffer;
>     ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish());
>
>     // Copy data to *receiver*
>     builder.addData(reinterpret_cast<const char*>(arrowBuffer->data()),
>                     arrowBuffer->size());
>
> The *receiver* does:
>
>     std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
>     std::shared_ptr<arrow::RecordBatch> arrowBatch;
>     std::shared_ptr<arrow::RecordBatchReader> arrowReader;
>
>     std::shared_ptr<arrow::Schema> arrowSchema = attributes2ArrowSchema(
>         inputSchema, settings.isAttsOnly());
>
>     ARROW_RETURN_NOT_OK(
>         arrow::ipc::RecordBatchStreamWriter::Open(
>             arrowStream.get(), arrowSchema, &arrowWriter));
>
>     // Get data and size from *sender*
>     uint32_t* sizePointer = (uint32_t*)
>         (((char*)chunk.getConstData()) +
>          AioSaveSettings::chunkSizeOffset());
>     uint32_t size = *sizePointer;
>     char* data = ((char*)chunk.getConstData() +
>                   AioSaveSettings::chunkDataOffset());
>
>     arrow::io::BufferReader arrowBufferReader(
>         reinterpret_cast<const uint8_t*>(data), size);
>
>     ARROW_RETURN_NOT_OK(
>         arrow::ipc::RecordBatchStreamReader::Open(
>             &arrowBufferReader, &arrowReader));
>     ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch));
>
> The data transfer and size interpretation is the same as in the older
> working version. The data buffer size from the *sender* matches the data
> buffer size on the *receiver*.
>
> Thanks!
> Rares