Hello,

I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses
BufferOutputStream and RecordBatchWriter to serialize a set of Arrow
arrays. The *receiver* uses BufferReader and RecordBatchReader to
deserialize them. I get the runtime error *Array length did not match
record batch length (5)* when I call ReadNext on the RecordBatchReader.
What am I missing?

To give more details, the *sender* does:

        std::shared_ptr<arrow::RecordBatch> arrowBatch;
        arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells,
_arrowArrays);

        std::shared_ptr<arrow::io::BufferOutputStream> arrowStream;
        ARROW_ASSIGN_OR_RAISE(
            arrowStream,
            arrow::io::BufferOutputStream::Create(bytesCount * 2,
_arrowPool));

        std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
        ARROW_RETURN_NOT_OK(
            arrow::ipc::RecordBatchStreamWriter::Open(
                &*arrowStream, _arrowSchema, &arrowWriter));
        ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch));
        ARROW_RETURN_NOT_OK(arrowWriter->Close());

        std::shared_ptr<arrow::Buffer> arrowBuffer;
        ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish());

        // Copy data to *receiver*
        builder.addData(reinterpret_cast<const char*>(arrowBuffer->data()),
                        arrowBuffer->size());

The *receiver* does:

    std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
    std::shared_ptr<arrow::RecordBatch> arrowBatch;
    std::shared_ptr<arrow::RecordBatchReader> arrowReader;

        std::shared_ptr<arrow::Schema> arrowSchema = attributes2ArrowSchema(
            inputSchema, settings.isAttsOnly());

        ARROW_RETURN_NOT_OK(
            arrow::ipc::RecordBatchStreamWriter::Open(
                arrowStream.get(), arrowSchema, &arrowWriter));

            // Get data and size from *sender*
            uint32_t* sizePointer = (uint32_t*)
(((char*)chunk.getConstData()) +

 AioSaveSettings::chunkSizeOffset());
            uint32_t size = *sizePointer;
            char* data = ((char*)chunk.getConstData() +
AioSaveSettings::chunkDataOffset());

            arrow::io::BufferReader arrowBufferReader(
                reinterpret_cast<const uint8_t*>(data), size);

            ARROW_RETURN_NOT_OK(
                arrow::ipc::RecordBatchStreamReader::Open(
                    &arrowBufferReader, &arrowReader));
            ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch));

The data transfer and size interpretation are the same as in the older
working version. The data buffer size from the *sender* matches the data
buffer size on the *receiver*.

Thanks!
Rares

Reply via email to