Gumix commented on code in PR #571:
URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1712730976
##########
src/nanoarrow/ipc/writer.c:
##########
@@ -150,3 +155,173 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct
ArrowIpcOutputStream* stream,
stream->private_data = private_data;
return NANOARROW_OK;
}
+
+struct ArrowIpcWriterPrivate {
+ struct ArrowIpcEncoder encoder;
+ struct ArrowIpcOutputStream output_stream;
+ struct ArrowBuffer buffer;
+ struct ArrowBuffer body_buffer;
+};
+
+ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream,
+ struct ArrowBufferView data,
+ struct ArrowError* error) {
+ while (data.size_bytes != 0) {
+ int64_t bytes_written = 0;
+ NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8,
data.size_bytes,
+ &bytes_written, error));
+ data.size_bytes -= bytes_written;
+ data.data.as_uint8 += bytes_written;
+ }
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
+ struct ArrowIpcOutputStream* output_stream) {
+ NANOARROW_DCHECK(writer != NULL && output_stream != NULL);
+
+ struct ArrowIpcWriterPrivate* private =
+ (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct
ArrowIpcWriterPrivate));
+
+ if (private == NULL) {
+ return ENOMEM;
+ }
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder));
+ ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+
+ ArrowBufferInit(&private->buffer);
+ ArrowBufferInit(&private->body_buffer);
+
+ writer->private_data = private;
+ return NANOARROW_OK;
+}
+
+void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) {
+ NANOARROW_DCHECK(writer != NULL);
+
+ struct ArrowIpcWriterPrivate* private =
+ (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+ if (private != NULL) {
+ ArrowIpcEncoderReset(&private->encoder);
+ private->output_stream.release(&private->output_stream);
+ ArrowBufferReset(&private->buffer);
+ ArrowBufferReset(&private->body_buffer);
+
+ ArrowFree(private);
+ }
+ memset(writer, 0, sizeof(struct ArrowIpcWriter));
+}
+
+static struct ArrowBufferView ArrowBufferToBufferView(const struct
ArrowBuffer* buffer) {
+ struct ArrowBufferView buffer_view = {
+ .data.as_uint8 = buffer->data,
+ .size_bytes = buffer->size_bytes,
+ };
+ return buffer_view;
+}
+
+// Eventually, it may be necessary to construct an ArrowIpcWriter which
doesn't rely on
+// blocking writes (ArrowIpcOutputStreamWrite). For example an
ArrowIpcOutputStream
+// might wrap a socket which is not always able to transmit all bytes of a
Message. In
+// that case users of ArrowIpcWriter might prefer to do other work until a
socket is
+// ready rather than blocking, or timeout, or otherwise respond to partial
transmission.
+//
+// This could be handled by:
+// - keeping partially sent buffers internal and signalling incomplete
transmission by
+// raising EAGAIN, returning "bytes actually written", ...
+// - when the caller is ready to try again, call ArrowIpcWriterWriteSome()
+// - exposing internal buffers which have not been completely sent, deferring
+// follow-up transmission to the caller
+
+ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer,
+ const struct ArrowSchema* in,
+ struct ArrowError* error) {
+ NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in !=
NULL);
+ struct ArrowIpcWriterPrivate* private =
+ (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+ NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in,
error));
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
+ &private->buffer),
+ error);
+
+ return ArrowIpcOutputStreamWrite(&private->output_stream,
+ ArrowBufferToBufferView(&private->buffer),
error);
+}
+
+ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,
+ const struct ArrowArrayView* in,
+ struct ArrowError* error) {
+ NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
+ struct ArrowIpcWriterPrivate* private =
+ (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+ if (in == NULL) {
+ int32_t eos[] = {-1, 0};
+ struct ArrowBufferView data = {.data.as_int32 = eos, .size_bytes =
sizeof(eos)};
+ return ArrowIpcOutputStreamWrite(&private->output_stream, data, error);
+ }
+
+ NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
+ NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0));
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
+ &private->encoder, in, &private->body_buffer, error));
+
Review Comment:
Hello,
Shouldn't `private->buffer` be finalized here, right after the record batch is
encoded? Otherwise the message header is never written to the output stream.
Something like this:
```c
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
&private->buffer),
error);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]