Hi,

You need to use GArrowRecordBatchStreamWriter instead of
garrow_output_stream_write_record_batch() to write data that
GArrowRecordBatchStreamReader can read.

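GArrowRecordBatchStreamWriter and
GArrowRecordBatchStreamReader use the
IPC Streaming Format
https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
but garrow_output_stream_write_record_batch() only writes a single
RecordBatch message
https://arrow.apache.org/docs/format/Columnar.html#recordbatch-message
without the Schema message that must come first in a stream. That is
why the reader reports "Expected IPC message of type schema but got
record batch". The buffer returned by garrow_record_batch_serialize()
is also a single RecordBatch message, so it fails in the same way.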
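For reference, the format document linked above frames every message in an IPC stream the same way: a 32-bit continuation indicator 0xFFFFFFFF, followed by a 32-bit little-endian metadata length. The stream ends with the optional end-of-stream marker 0xFFFFFFFF 0x00000000. The framing alone cannot distinguish a Schema message from a RecordBatch message, because the message type lives in the Flatbuffers metadata, and the stream reader checks that type first. The sketch below is a minimal, hypothetical helper (ipc_read_message_prefix() is not part of Arrow). It only inspects this 8-byte prefix and assumes the post-0.15 framing with the continuation indicator.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper (not part of Arrow): inspect the 8-byte prefix
 * that precedes every encapsulated IPC message.
 * Returns  1 and stores the metadata length for a regular message,
 *          0 for the end-of-stream marker (0xFFFFFFFF 0x00000000),
 *         -1 if the prefix is truncated or malformed. */
static int ipc_read_message_prefix(const uint8_t *data, size_t size,
                                   int32_t *metadata_length) {
  assert(data != NULL || size == 0);
  if (size < 8)
    return -1;
  /* 32-bit continuation indicator: must be 0xFFFFFFFF. */
  for (int i = 0; i < 4; i++) {
    if (data[i] != 0xFF)
      return -1;
  }
  /* 32-bit little-endian metadata length, decoded byte by byte so the
   * result does not depend on the host's endianness. */
  uint32_t len = (uint32_t)data[4] | ((uint32_t)data[5] << 8) |
                 ((uint32_t)data[6] << 16) | ((uint32_t)data[7] << 24);
  if (len == 0)
    return 0;              /* end-of-stream marker */
  if (len > INT32_MAX)
    return -1;             /* negative as an int32: not a valid length */
  *metadata_length = (int32_t)len;
  return 1;
}
```

A stream reader can read these 8 bytes after each message to decide whether another message follows. This sketch does not attempt to decode the Flatbuffers metadata that carries the Schema/RecordBatch type.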

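A corrected version of your function:

#include <stdio.h>
#include <arrow-glib/arrow-glib.h>

void testRecordbatchStream(GArrowRecordBatch *rb){
  // Error checking is omitted for brevity.
  GError *error = NULL;

  // Write the RecordBatch in IPC streaming format
  // (Schema message, RecordBatch message, end-of-stream marker)
  GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
  GArrowBufferOutputStream *bufferStream =
    garrow_buffer_output_stream_new(buffer);
  GArrowSchema *schema = garrow_record_batch_get_schema(rb);
  GArrowRecordBatchStreamWriter *writer =
    garrow_record_batch_stream_writer_new(
      GARROW_OUTPUT_STREAM(bufferStream), schema, &error);
  g_object_unref(schema);
  garrow_record_batch_writer_write_record_batch(
    GARROW_RECORD_BATCH_WRITER(writer), rb, &error);
  // Closing the writer writes the end-of-stream marker
  garrow_record_batch_writer_close(GARROW_RECORD_BATCH_WRITER(writer), &error);
  g_object_unref(writer);
  g_object_unref(bufferStream);

  // Read RecordBatch from buffer
  GArrowBufferInputStream *inputStream =
    garrow_buffer_input_stream_new(GARROW_BUFFER(buffer));
  GArrowRecordBatchStreamReader *sr =
    garrow_record_batch_stream_reader_new(
      GARROW_INPUT_STREAM(inputStream), &error);
  g_object_unref(inputStream);
  GArrowRecordBatch *rb2 =
    garrow_record_batch_reader_read_next(
      GARROW_RECORD_BATCH_READER(sr), &error);
  // garrow_record_batch_to_string() returns a newly allocated string
  gchar *rb2String = garrow_record_batch_to_string(rb2, &error);
  printf("Received RB: \n%s\n", rb2String);
  g_free(rb2String);
  g_object_unref(rb2);
  g_object_unref(sr);

  g_object_unref(buffer);
}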

Your code is also missing g_object_unref() calls. You need to call
g_object_unref() on each object once it is no longer needed, and
g_free() on strings such as the one returned by
garrow_record_batch_to_string(). If you forget to release them, your
program leaks memory.
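The underlying rule is ordinary reference counting. A function that returns a new reference (such as garrow_record_batch_get_schema() above) gives you one reference, you release it with g_object_unref(), and the object is freed when its last reference is dropped. The toy model below is a hypothetical illustration in plain C. ToyObject and its functions are made up and are not the GObject API. It counts live objects so that a missing unref becomes visible.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy reference-counted object, loosely modelled on the
 * g_object_ref()/g_object_unref() contract. NOT the GObject API. */
typedef struct {
  int ref_count;
} ToyObject;

/* Number of objects allocated but not yet freed. */
static int toy_live_objects = 0;

static ToyObject *toy_object_new(void) {
  ToyObject *obj = malloc(sizeof *obj);
  if (obj == NULL)
    return NULL;
  obj->ref_count = 1;  /* the caller owns the initial reference */
  toy_live_objects++;
  return obj;
}

static ToyObject *toy_object_ref(ToyObject *obj) {
  obj->ref_count++;
  return obj;
}

static void toy_object_unref(ToyObject *obj) {
  assert(obj->ref_count > 0);
  if (--obj->ref_count == 0) {
    free(obj);  /* last reference released: memory is reclaimed */
    toy_live_objects--;
  }
}
```

Every toy_object_new() or toy_object_ref() must be balanced by exactly one toy_object_unref(). If the final unref is skipped, toy_live_objects never returns to zero, which is the same kind of leak that a missing g_object_unref() causes.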


Thanks,
-- 
kou

In <deca8e53-52ff-e1b7-4141-4feb05a9d...@freenet.de>
  "[C/GLib] Trying (and failing) to send RecordBatches between Client and Server in C" on Tue, 5 Jul 2022 17:04:33 +0200,
  Joel Ziegler <cod...@freenet.de> wrote:

> Hi folks,
> 
> I read some data from a PostgreSQL database, convert it into
> RecordBatches and try to send the data to a client. But I fail to
> properly understand the usage of Apache Arrow C/GLib.
> 
> My information sources are the [C++ docs][1], [the Apache Arrow C/GLib
> reference manual][2] and [the C/GLib Github files][3].
> 
> By following the usage description of Apache Arrow C++ and
> experimenting with the wrapper classes in C, I built this minimal
> example of writing out a RecordBatch into a buffer and (after
> theoretically sending and receiving the buffer) trying to read that
> buffer back into a RecordBatch. But it fails, and I would be glad if
> you could point out my mistakes!
> 
> I omitted the error checking for readability. The code errors out at
> the creation of the GArrowRecordBatchStreamReader. If I use the
> arrowbuffer or the buffer from the top to create the InputStream,
> the error reads:
> 
> `[record-batch-stream-reader][open]: IOError: Expected IPC message of type schema but got record batch`
> 
> If I use the testBuffer, the error complains about an invalid IPC
> stream, so the data is just corrupt.
> 
> 
> ```
> void testRecordbatchStream(GArrowRecordBatch *rb){
>     GError *error = NULL;
> 
>     // Write Recordbatch
>     GArrowResizableBuffer *buffer = garrow_resizable_buffer_new(300, &error);
>     GArrowBufferOutputStream *bufferStream =
>         garrow_buffer_output_stream_new(buffer);
>     long written =
>         garrow_output_stream_write_record_batch(GARROW_OUTPUT_STREAM(bufferStream),
>                                                 rb, NULL, &error);
> 
>     // Use buffer as plain bytes
>     void *data = garrow_buffer_get_data(GARROW_BUFFER(buffer));
>     size_t length = garrow_buffer_get_size(GARROW_BUFFER(buffer));
> 
>     // Read plain bytes and test serialize function
>     GArrowBuffer *testBuffer = garrow_buffer_new(data, length);
>     GArrowBuffer *arrowbuffer = garrow_record_batch_serialize(rb, NULL, &error);
> 
>     // Read RecordBatch from buffer
>     GArrowBufferInputStream *inputStream =
>         garrow_buffer_input_stream_new(arrowbuffer);
>     GArrowRecordBatchStreamReader *sr =
>         garrow_record_batch_stream_reader_new(GARROW_INPUT_STREAM(inputStream),
>                                               &error);
>     GArrowRecordBatch *rb2 = garrow_record_batch_reader_read_next(sr, &error);
> 
> 
>     printf("Received RB: \n%s\n", garrow_record_batch_to_string(rb2, &error));
> }
> ```
> 
> 
>   [1]: https://arrow.apache.org/docs/cpp/index.html
>   [2]: https://arrow.apache.org/docs/c_glib/arrow-glib/
>   [3]: https://github.com/apache/arrow/tree/master/c_glib
> 
