paleolimbot commented on code in PR #571:
URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1702434332
##########
src/nanoarrow/ipc/writer.c:
##########
@@ -150,3 +155,160 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct
ArrowIpcOutputStream* stream,
stream->private_data = private_data;
return NANOARROW_OK;
}
+
+struct ArrowIpcArrayStreamWriterPrivate {
+ struct ArrowArrayStream in;
+ struct ArrowIpcOutputStream output_stream;
+ struct ArrowIpcEncoder encoder;
+ struct ArrowSchema schema;
+ struct ArrowArray array;
+ struct ArrowArrayView array_view;
+ struct ArrowBuffer buffer;
+ struct ArrowBuffer body_buffer;
+ int64_t buffer_cursor;
+ int64_t body_buffer_cursor;
+};
+
+ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter*
writer,
+ struct ArrowArrayStream* in,
+ struct ArrowIpcOutputStream*
output_stream) {
+ NANOARROW_DCHECK(writer != NULL && in != NULL && output_stream != NULL);
+
+ struct ArrowIpcArrayStreamWriterPrivate* private =
+ (struct ArrowIpcArrayStreamWriterPrivate*)ArrowMalloc(
+ sizeof(struct ArrowIpcArrayStreamWriterPrivate));
+
+ if (private == NULL) {
+ return ENOMEM;
+ }
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder));
+ ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+ ArrowArrayStreamMove(in, &private->in);
+ private->schema.release = NULL;
+ private->array.release = NULL;
+ ArrowArrayViewInitFromType(&private->array_view,
NANOARROW_TYPE_UNINITIALIZED);
+ ArrowBufferInit(&private->buffer);
+ ArrowBufferInit(&private->body_buffer);
+ private->buffer_cursor = 0;
+ private->body_buffer_cursor = 0;
+ // It'd probably be better to push directly from encode_buffer to the
output_stream
+ ArrowIpcEncoderBuildContiguousBodyBuffer(&private->encoder,
&private->body_buffer);
+
+ writer->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+ writer->finished = 0;
+ writer->private_data = private;
+ return NANOARROW_OK;
+}
+
+void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer) {
+ NANOARROW_DCHECK(writer != NULL);
+
+ struct ArrowIpcArrayStreamWriterPrivate* private =
+ (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data;
+
+ if (private != NULL) {
+ ArrowIpcEncoderReset(&private->encoder);
+ ArrowArrayStreamRelease(&private->in);
+ private->output_stream.release(&private->output_stream);
+ if (private->schema.release != NULL) {
+ ArrowSchemaRelease(&private->schema);
+ }
+ if (private->array.release != NULL) {
+ ArrowArrayRelease(&private->array);
+ }
+ ArrowArrayViewReset(&private->array_view);
+ ArrowBufferReset(&private->buffer);
+ ArrowBufferReset(&private->body_buffer);
+
+ ArrowFree(private);
+ }
+ memset(writer, 0, sizeof(struct ArrowIpcArrayStreamWriter));
+}
+
+static ArrowErrorCode ArrowIpcArrayStreamWriterPush(
+ struct ArrowIpcArrayStreamWriterPrivate* private, struct ArrowBuffer*
buffer,
+ int* had_bytes_to_push, struct ArrowError* error) {
Review Comment:
If there were an `ArrowIpcEncoderWriteMessage(ArrowIpcEncoder*,
ArrowIpcOutputStream*, ArrowError*)`, could the `had_bytes_to_push` indirection
be avoided? Maybe you could call
`ArrowIpcEncoderEncodeSimpleRecordBatch(encoder, view, NULL, error)` to build
the message and `ArrowIpcEncoderEncodeSimpleRecordBatch(encoder, view,
out_stream, error)` to actually write the payload?
##########
src/nanoarrow/ipc/writer.c:
##########
@@ -150,3 +155,160 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct
ArrowIpcOutputStream* stream,
stream->private_data = private_data;
return NANOARROW_OK;
}
+
+struct ArrowIpcArrayStreamWriterPrivate {
+ struct ArrowArrayStream in;
+ struct ArrowIpcOutputStream output_stream;
+ struct ArrowIpcEncoder encoder;
+ struct ArrowSchema schema;
+ struct ArrowArray array;
+ struct ArrowArrayView array_view;
+ struct ArrowBuffer buffer;
+ struct ArrowBuffer body_buffer;
+ int64_t buffer_cursor;
+ int64_t body_buffer_cursor;
+};
+
+ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter*
writer,
+ struct ArrowArrayStream* in,
+ struct ArrowIpcOutputStream*
output_stream) {
+ NANOARROW_DCHECK(writer != NULL && in != NULL && output_stream != NULL);
+
+ struct ArrowIpcArrayStreamWriterPrivate* private =
+ (struct ArrowIpcArrayStreamWriterPrivate*)ArrowMalloc(
+ sizeof(struct ArrowIpcArrayStreamWriterPrivate));
+
+ if (private == NULL) {
+ return ENOMEM;
+ }
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder));
+ ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+ ArrowArrayStreamMove(in, &private->in);
+ private->schema.release = NULL;
+ private->array.release = NULL;
+ ArrowArrayViewInitFromType(&private->array_view,
NANOARROW_TYPE_UNINITIALIZED);
+ ArrowBufferInit(&private->buffer);
+ ArrowBufferInit(&private->body_buffer);
+ private->buffer_cursor = 0;
+ private->body_buffer_cursor = 0;
+ // It'd probably be better to push directly from encode_buffer to the
output_stream
Review Comment:
Agreed! (There is no need to materialize a copy of every single buffer before
writing it.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]