Re: C++ IPC Array length did not match record batch length (5)
Thanks for the pointer. You are right, the "arrowBatch" was not valid. I was not using the correct number of rows in the "Make" call. Best, Rares On Sun, Jun 14, 2020 at 12:16 PM Wes McKinney wrote: > It sounds like "arrowBatch" is invalid. What happens when you run > ARROW_RETURN_NOT_OK(arrowBatch->Validate())? > > On Sun, Jun 14, 2020 at 2:09 PM Rares Vernica wrote: > > > > Hello, > > > > I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses > > BufferOutputStream and RecordBatchWriter to serialize a set of Arrow > > arrays. The *receiver* uses BufferReader and RecordBatchReader to > > deserialize them. I get the runtime error *Array length did not match > > record batch length (5) *when I do ReadNext on the RecordBatchReader. I > > wonder what am I missing? > > > > To give more details, the *sender* does: > > > > std::shared_ptr arrowBatch; > > arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells, > > _arrowArrays); > > > > std::shared_ptr arrowStream; > > ARROW_ASSIGN_OR_RAISE( > > arrowStream, > > arrow::io::BufferOutputStream::Create(bytesCount * 2, > > _arrowPool)); > > > > std::shared_ptr arrowWriter; > > ARROW_RETURN_NOT_OK( > > arrow::ipc::RecordBatchStreamWriter::Open( > > &*arrowStream, _arrowSchema, &arrowWriter)); > > ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch)); > > ARROW_RETURN_NOT_OK(arrowWriter->Close()); > > > > std::shared_ptr arrowBuffer; > > ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish()); > > > > // Copy data to *receiver* > > builder.addData(reinterpret_cast char*>(arrowBuffer->data()), > > arrowBuffer->size()); > > > > The *receiver* does: > > > > std::shared_ptr arrowWriter; > > std::shared_ptr arrowBatch; > > std::shared_ptr arrowReader; > > > > std::shared_ptr arrowSchema = > attributes2ArrowSchema( > > inputSchema, settings.isAttsOnly()); > > > > ARROW_RETURN_NOT_OK( > > arrow::ipc::RecordBatchStreamWriter::Open( > > arrowStream.get(), arrowSchema, &arrowWriter)); > > > > // Get data and size from *sender* > > uint32_t* sizePointer = (uint32_t*) > > (((char*)chunk.getConstData()) + > > > > AioSaveSettings::chunkSizeOffset()); > > uint32_t size = *sizePointer; > > char* data = ((char*)chunk.getConstData() + > > AioSaveSettings::chunkDataOffset()); > > > > arrow::io::BufferReader arrowBufferReader( > > reinterpret_cast(data), size); > > > > ARROW_RETURN_NOT_OK( > > arrow::ipc::RecordBatchStreamReader::Open( > > &arrowBufferReader, &arrowReader)); > > ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch)); > > > > The data transfer and size interpretation is the same as in the older > > working version. The data buffer size from the *sender* matches the data > > buffer size on the *receiver*. > > > > Thanks! > > Rares >
Re: C++ IPC Array length did not match record batch length (5)
It sounds like "arrowBatch" is invalid. What happens when you run ARROW_RETURN_NOT_OK(arrowBatch->Validate())? On Sun, Jun 14, 2020 at 2:09 PM Rares Vernica wrote: > > Hello, > > I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses > BufferOutputStream and RecordBatchWriter to serialize a set of Arrow > arrays. The *receiver* uses BufferReader and RecordBatchReader to > deserialize them. I get the runtime error *Array length did not match > record batch length (5) *when I do ReadNext on the RecordBatchReader. I > wonder what am I missing? > > To give more details, the *sender* does: > > std::shared_ptr arrowBatch; > arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells, > _arrowArrays); > > std::shared_ptr arrowStream; > ARROW_ASSIGN_OR_RAISE( > arrowStream, > arrow::io::BufferOutputStream::Create(bytesCount * 2, > _arrowPool)); > > std::shared_ptr arrowWriter; > ARROW_RETURN_NOT_OK( > arrow::ipc::RecordBatchStreamWriter::Open( > &*arrowStream, _arrowSchema, &arrowWriter)); > ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch)); > ARROW_RETURN_NOT_OK(arrowWriter->Close()); > > std::shared_ptr arrowBuffer; > ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish()); > > // Copy data to *receiver* > builder.addData(reinterpret_cast(arrowBuffer->data()), > arrowBuffer->size()); > > The *receiver* does: > > std::shared_ptr arrowWriter; > std::shared_ptr arrowBatch; > std::shared_ptr arrowReader; > > std::shared_ptr arrowSchema = attributes2ArrowSchema( > inputSchema, settings.isAttsOnly()); > > ARROW_RETURN_NOT_OK( > arrow::ipc::RecordBatchStreamWriter::Open( > arrowStream.get(), arrowSchema, &arrowWriter)); > > // Get data and size from *sender* > uint32_t* sizePointer = (uint32_t*) > (((char*)chunk.getConstData()) + > > AioSaveSettings::chunkSizeOffset()); > uint32_t size = *sizePointer; > char* data = ((char*)chunk.getConstData() + > AioSaveSettings::chunkDataOffset()); > > arrow::io::BufferReader arrowBufferReader( > reinterpret_cast(data), size); > > ARROW_RETURN_NOT_OK( > arrow::ipc::RecordBatchStreamReader::Open( > &arrowBufferReader, &arrowReader)); > ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch)); > > The data transfer and size interpretation is the same as in the older > working version. The data buffer size from the *sender* matches the data > buffer size on the *receiver*. > > Thanks! > Rares
C++ IPC Array length did not match record batch length (5)
Hello, I'm porting a C++ program from Arrow 0.9.0 to 0.16.0. The *sender* uses BufferOutputStream and RecordBatchWriter to serialize a set of Arrow arrays. The *receiver* uses BufferReader and RecordBatchReader to deserialize them. I get the runtime error *Array length did not match record batch length (5) *when I do ReadNext on the RecordBatchReader. I wonder what am I missing? To give more details, the *sender* does: std::shared_ptr arrowBatch; arrowBatch = arrow::RecordBatch::Make(_arrowSchema, nCells, _arrowArrays); std::shared_ptr arrowStream; ARROW_ASSIGN_OR_RAISE( arrowStream, arrow::io::BufferOutputStream::Create(bytesCount * 2, _arrowPool)); std::shared_ptr arrowWriter; ARROW_RETURN_NOT_OK( arrow::ipc::RecordBatchStreamWriter::Open( &*arrowStream, _arrowSchema, &arrowWriter)); ARROW_RETURN_NOT_OK(arrowWriter->WriteRecordBatch(*arrowBatch)); ARROW_RETURN_NOT_OK(arrowWriter->Close()); std::shared_ptr arrowBuffer; ARROW_ASSIGN_OR_RAISE(arrowBuffer, arrowStream->Finish()); // Copy data to *receiver* builder.addData(reinterpret_cast(arrowBuffer->data()), arrowBuffer->size()); The *receiver* does: std::shared_ptr arrowWriter; std::shared_ptr arrowBatch; std::shared_ptr arrowReader; std::shared_ptr arrowSchema = attributes2ArrowSchema( inputSchema, settings.isAttsOnly()); ARROW_RETURN_NOT_OK( arrow::ipc::RecordBatchStreamWriter::Open( arrowStream.get(), arrowSchema, &arrowWriter)); // Get data and size from *sender* uint32_t* sizePointer = (uint32_t*) (((char*)chunk.getConstData()) + AioSaveSettings::chunkSizeOffset()); uint32_t size = *sizePointer; char* data = ((char*)chunk.getConstData() + AioSaveSettings::chunkDataOffset()); arrow::io::BufferReader arrowBufferReader( reinterpret_cast(data), size); ARROW_RETURN_NOT_OK( arrow::ipc::RecordBatchStreamReader::Open( &arrowBufferReader, &arrowReader)); ARROW_RETURN_NOT_OK(arrowReader->ReadNext(&arrowBatch)); The data transfer and size interpretation is the same as in the older working version. The data buffer size from the *sender* matches the data buffer size on the *receiver*. Thanks! Rares