Thanks for the pointer. You are right, "arrowBatch" was not valid: I was
passing the wrong number of rows in the "Make" call.

Best,
Rares

On Sun, Jun 14, 2020 at 12:16 PM Wes McKinney <wesmck...@gmail.com> wrote:

> It sounds like "arrowBatch" is invalid. What happens when you run
> ARROW_RETURN_NOT_OK(arrowBatch->Validate())?
>
> On Sun, Jun 14, 2020 at 2:09 PM Rares Vernica <rvern...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses
> > BufferOutputStream and RecordBatchWriter to serialize a set of Arrow
> > arrays. The *receiver* uses BufferReader and RecordBatchReader to
> > deserialize them. I get the runtime error *Array length did not match
> > record batch length (5)* when I do ReadNext on the RecordBatchReader. I
> > wonder what I am missing.
> >
> > To give more details, the *sender* does:
> >
> >         std::shared_ptr<arrow::RecordBatch> arrowBatch;
> >         arrowBatch = arrow::RecordBatch::Make(
> >             _arrowSchema, nCells, _arrowArrays);
> >
> >         std::shared_ptr<arrow::io::BufferOutputStream> arrowStream;
> >         ARROW_ASSIGN_OR_RAISE(
> >             arrowStream,
> >             arrow::io::BufferOutputStream::Create(
> >                 bytesCount * 2, _arrowPool));
> >
> >         std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
> >         ARROW_RETURN_NOT_OK(
> >             arrow::ipc::RecordBatchStreamWriter::Open(
> >                 &*arrowStream, _arrowSchema, &arrowWriter));
> >         ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch));
> >         ARROW_RETURN_NOT_OK(arrowWriter->Close());
> >
> >         std::shared_ptr<arrow::Buffer> arrowBuffer;
> >         ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish());
> >
> >         // Copy data to *receiver*
> >         builder.addData(
> >             reinterpret_cast<const char*>(arrowBuffer->data()),
> >             arrowBuffer->size());
> >
> > The *receiver* does:
> >
> >     std::shared_ptr<arrow::ipc::RecordBatchWriter> arrowWriter;
> >     std::shared_ptr<arrow::RecordBatch> arrowBatch;
> >     std::shared_ptr<arrow::RecordBatchReader> arrowReader;
> >
> >         std::shared_ptr<arrow::Schema> arrowSchema = attributes2ArrowSchema(
> >             inputSchema, settings.isAttsOnly());
> >
> >         ARROW_RETURN_NOT_OK(
> >             arrow::ipc::RecordBatchStreamWriter::Open(
> >                 arrowStream.get(), arrowSchema, &arrowWriter));
> >
> >             // Get data and size from *sender*
> >             uint32_t* sizePointer = (uint32_t*)(
> >                 ((char*)chunk.getConstData()) +
> >                 AioSaveSettings::chunkSizeOffset());
> >             uint32_t size = *sizePointer;
> >             char* data = ((char*)chunk.getConstData() +
> >                 AioSaveSettings::chunkDataOffset());
> >
> >             arrow::io::BufferReader arrowBufferReader(
> >                 reinterpret_cast<const uint8_t*>(data), size);
> >
> >             ARROW_RETURN_NOT_OK(
> >                 arrow::ipc::RecordBatchStreamReader::Open(
> >                     &arrowBufferReader, &arrowReader));
> >             ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch));
> >
> > The data transfer and size interpretation is the same as in the older
> > working version. The data buffer size from the *sender* matches the data
> > buffer size on the *receiver*.
> >
> > Thanks!
> > Rares
>
