bkietz commented on code in PR #571:
URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1705832564


##########
src/nanoarrow/ipc/writer.c:
##########
@@ -150,3 +155,160 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct 
ArrowIpcOutputStream* stream,
   stream->private_data = private_data;
   return NANOARROW_OK;
 }
+
+struct ArrowIpcArrayStreamWriterPrivate {
+  struct ArrowArrayStream in;
+  struct ArrowIpcOutputStream output_stream;
+  struct ArrowIpcEncoder encoder;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+  struct ArrowArrayView array_view;
+  struct ArrowBuffer buffer;
+  struct ArrowBuffer body_buffer;
+  int64_t buffer_cursor;
+  int64_t body_buffer_cursor;
+};
+
+ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter* 
writer,
+                                             struct ArrowArrayStream* in,
+                                             struct ArrowIpcOutputStream* 
output_stream) {
+  NANOARROW_DCHECK(writer != NULL && in != NULL && output_stream != NULL);
+
+  struct ArrowIpcArrayStreamWriterPrivate* private =
+      (struct ArrowIpcArrayStreamWriterPrivate*)ArrowMalloc(
+          sizeof(struct ArrowIpcArrayStreamWriterPrivate));
+
+  if (private == NULL) {
+    return ENOMEM;
+  }
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder));
+  ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+  ArrowArrayStreamMove(in, &private->in);
+  private->schema.release = NULL;
+  private->array.release = NULL;
+  ArrowArrayViewInitFromType(&private->array_view, 
NANOARROW_TYPE_UNINITIALIZED);
+  ArrowBufferInit(&private->buffer);
+  ArrowBufferInit(&private->body_buffer);
+  private->buffer_cursor = 0;
+  private->body_buffer_cursor = 0;
+  // It'd probably be better to push directly from encode_buffer to the 
output_stream
+  ArrowIpcEncoderBuildContiguousBodyBuffer(&private->encoder, 
&private->body_buffer);
+
+  writer->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
+  writer->finished = 0;
+  writer->private_data = private;
+  return NANOARROW_OK;
+}
+
+void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer) {
+  NANOARROW_DCHECK(writer != NULL);
+
+  struct ArrowIpcArrayStreamWriterPrivate* private =
+      (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data;
+
+  if (private != NULL) {
+    ArrowIpcEncoderReset(&private->encoder);
+    ArrowArrayStreamRelease(&private->in);
+    private->output_stream.release(&private->output_stream);
+    if (private->schema.release != NULL) {
+      ArrowSchemaRelease(&private->schema);
+    }
+    if (private->array.release != NULL) {
+      ArrowArrayRelease(&private->array);
+    }
+    ArrowArrayViewReset(&private->array_view);
+    ArrowBufferReset(&private->buffer);
+    ArrowBufferReset(&private->body_buffer);
+
+    ArrowFree(private);
+  }
+  memset(writer, 0, sizeof(struct ArrowIpcArrayStreamWriter));
+}
+
+static ArrowErrorCode ArrowIpcArrayStreamWriterPush(
+    struct ArrowIpcArrayStreamWriterPrivate* private, struct ArrowBuffer* 
buffer,
+    int* had_bytes_to_push, struct ArrowError* error) {

Review Comment:
   Simply calling `write()` doesn't handle the case where writing doesn't fail 
but also doesn't complete. We need to distinguish between "no bytes were sent 
from buffer because IO" and "no bytes were sent from buffer because all of its 
bytes have already been sent". In the former case we need to try again because 
those bytes still need to be sequenced next in the stream, in the latter case 
we should write from body_buffer or encode a new message.
   
   I expect that ordinarily callers will simply loop while calling WriteSome(), 
but they might know more about what kind of output stream we're writing to and 
decide to wait while doing other work, timeout, ... I think we *need* to return 
control to the caller so they can decide when or if we try again. However if 
you like we could add helpers which do the loop, calling WriteSome() until a 
message *is* completely written (or even until the entire stream is completely 
written).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to