Thanks a lot for the reply! The conversion works as you wrote it. I am still unsure how to send the buffer written in the IPC format. I tried getting a pointer to the data and its length so that I can simply send the data over the network, but the buffer created from that data pointer and length is not the same.

    gboolean test = garrow_record_batch_writer_write_record_batch(
            GARROW_RECORD_BATCH_WRITER(sw), rb, &error);

    GBytes *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
    gint64 length = garrow_buffer_get_size(GARROW_BUFFER(buffer));

    g_object_unref(sw);

    // Receiving side
    GArrowBuffer *receivingBuffer = garrow_buffer_new(data, length);
    GArrowBufferInputStream *inputStream = garrow_buffer_input_stream_new(GARROW_BUFFER(receivingBuffer));


On 06.07.22 09:35, Sutou Kouhei wrote:
Hi,

You need to use GArrowRecordBatchStreamWriter instead of
garrow_output_stream_write_record_batch() if you want to read
the data with GArrowRecordBatchStreamReader.

GArrowRecordBatchStreamWriter and
GArrowRecordBatchStreamReader assume the
IPC Streaming Format
https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
but garrow_output_stream_write_record_batch() just writes a
single RecordBatch message, without the preceding Schema message
that the stream reader expects
https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message
.
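To see the framing difference in raw bytes, here is a small dependency-free sketch (plain C, no Arrow calls). It assumes the current IPC framing (Arrow 0.15 and later), where every encapsulated message starts with the continuation token 0xFFFFFFFF followed by a little-endian int32 metadata length, and a stream ends with the 8-byte end-of-stream marker 0xFFFFFFFF 0x00000000. The function names are hypothetical, and the EOS check is only a heuristic: a stream written by GArrowRecordBatchStreamWriter should end with the marker once the writer has been closed with garrow_record_batch_writer_close(), while a lone RecordBatch message does not.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* End-of-stream marker of the IPC streaming format: the continuation
 * token 0xFFFFFFFF followed by a metadata length of 0. */
static const uint8_t ARROW_IPC_EOS[8] = {0xFF, 0xFF, 0xFF, 0xFF,
                                         0x00, 0x00, 0x00, 0x00};

/* Heuristic check: does the byte range end with the EOS marker? */
static bool
ends_with_ipc_eos(const uint8_t *data, size_t size)
{
  if (size < sizeof ARROW_IPC_EOS)
    return false;
  return memcmp(data + size - sizeof ARROW_IPC_EOS, ARROW_IPC_EOS,
                sizeof ARROW_IPC_EOS) == 0;
}

/* Reads the prefix of the first encapsulated message: continuation token
 * 0xFFFFFFFF, then a little-endian int32 metadata length. Returns that
 * length, or -1 if the prefix is missing or truncated. */
static int32_t
ipc_first_metadata_length(const uint8_t *data, size_t size)
{
  if (size < 8)
    return -1;
  for (size_t i = 0; i < 4; i++) {
    if (data[i] != 0xFF)
      return -1;
  }
  uint32_t length = (uint32_t)data[4] | ((uint32_t)data[5] << 8) |
                    ((uint32_t)data[6] << 16) | ((uint32_t)data[7] << 24);
  return (int32_t)length;
}
```

Applied to the bytes you are about to send, these checks give a quick sanity test that a complete stream, not just a single message, is going over the network.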

#include <stdio.h>
#include <arrow-glib/arrow-glib.h>

void testRecordbatchStream(GArrowRecordBatch *rb){
   GError *error = NULL;

   // Write Recordbatch
   GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
   GArrowBufferOutputStream *bufferStream =
     garrow_buffer_output_stream_new(buffer);
   GArrowSchema *schema = garrow_record_batch_get_schema(rb);
   GArrowRecordBatchStreamWriter *writer =
     garrow_record_batch_stream_writer_new(
       GARROW_OUTPUT_STREAM(bufferStream), schema, &error);
   g_object_unref(schema);
   g_object_unref(bufferStream);
   garrow_record_batch_writer_write_record_batch(
     GARROW_RECORD_BATCH_WRITER(writer), rb, &error);
   g_object_unref(writer);

   // Read RecordBatch from buffer
   GArrowBufferInputStream *inputStream =
     garrow_buffer_input_stream_new(GARROW_BUFFER(buffer));
   GArrowRecordBatchStreamReader *sr =
     garrow_record_batch_stream_reader_new(
       GARROW_INPUT_STREAM(inputStream), &error);
   g_object_unref(inputStream);
   GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(
     GARROW_RECORD_BATCH_READER(sr), &error);
   // garrow_record_batch_to_string() returns a newly allocated string
   gchar *rb2String = garrow_record_batch_to_string(rb2, &error);
   printf("Received RB: \n%s\n", rb2String);
   g_free(rb2String);
   g_object_unref(rb2);
   g_object_unref(sr);

   g_object_unref(buffer);
}

Your code is missing some g_object_unref() calls. You need to
call g_object_unref() once an object is no longer needed. If
you forget to call it, your program leaks memory.


Thanks,
