paleolimbot commented on code in PR #571: URL: https://github.com/apache/arrow-nanoarrow/pull/571#discussion_r1707654816
########## src/nanoarrow/common/inline_types.h: ########## @@ -314,7 +314,7 @@ static inline void ArrowErrorSetString(struct ArrowError* error, const char* src #define NANOARROW_DCHECK(EXPR) _NANOARROW_DCHECK_IMPL(EXPR, #EXPR) #else #define NANOARROW_ASSERT_OK(EXPR) (void)(EXPR) -#define NANOARROW_DCHECK(EXPR) +#define NANOARROW_DCHECK(EXPR) (void)(EXPR) Review Comment: Does this cause the expression to actually execute/would we want that? I don't think Arrow C++'s results in execution: https://github.com/apache/arrow/blob/1f2479908323daff3b08d1d585517239cae637d2/cpp/src/arrow/util/logging.h#L93-L94 ########## src/nanoarrow/ipc/writer.c: ########## @@ -150,3 +155,188 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, stream->private_data = private_data; return NANOARROW_OK; } + +struct ArrowIpcWriterPrivate { + struct ArrowIpcOutputStream output_stream; + struct ArrowIpcEncoder encoder; + struct ArrowBuffer buffer; + struct ArrowBuffer body_buffer; + int64_t buffer_cursor; + int64_t body_buffer_cursor; +}; + +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error) { + while (data.size_bytes != 0) { + int64_t bytes_written = 0; + NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes, + &bytes_written, error)); + data.size_bytes -= bytes_written; + data.data.as_uint8 += bytes_written; + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcEncoder* encoder, + struct ArrowIpcOutputStream* output_stream) { + NANOARROW_DCHECK(writer != NULL && encoder != NULL && output_stream != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate)); + + if (private == NULL) { + return ENOMEM; + } + + ArrowIpcOutputStreamMove(output_stream, &private->output_stream); + + memcpy(&private->encoder, encoder, 
sizeof(struct ArrowIpcEncoder)); + encoder->private_data = NULL; + + ArrowBufferInit(&private->buffer); + ArrowBufferInit(&private->body_buffer); + private->buffer_cursor = 0; + private->body_buffer_cursor = 0; + + writer->private_data = private; + return NANOARROW_OK; +} + +void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) { + NANOARROW_DCHECK(writer != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (private != NULL) { + ArrowIpcEncoderReset(&private->encoder); + private->output_stream.release(&private->output_stream); + ArrowBufferReset(&private->buffer); + ArrowBufferReset(&private->body_buffer); + + ArrowFree(private); + } + memset(writer, 0, sizeof(struct ArrowIpcWriter)); +} + +static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) { + struct ArrowBufferView buffer_view = { + .data.as_uint8 = buffer->data, + .size_bytes = buffer->size_bytes, + }; + return buffer_view; +} + +// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on +// blocking writes (ArrowIpcOutputStreamWrite). For example an ArrowIpcOutputStream +// might wrap a socket which is not always able to transmit all bytes of a Message. In +// that case users of ArrowIpcWriter might prefer to do other work until a socket is +// ready rather than blocking, or timeout, or otherwise respond to partial transmission. +// +// This could be handled by: +// - keeping partially sent buffers internal and signalling incomplete transmission by +// raising EAGAIN, returning "bytes actually written", ... 
+// - when the caller is ready to try again, call ArrowIpcWriterWriteSome() +// - exposing internal buffers which have not been completely sent, deferring +// follow-up transmission to the caller + +ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer, + const struct ArrowSchema* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error)); + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, + &private->buffer), + error); + + return ArrowIpcOutputStreamWrite(&private->output_stream, + ArrowBufferToBufferView(&private->buffer), error); +} + +ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, + const struct ArrowArrayView* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (in == NULL) { + int32_t eos[] = {-1, 0}; + struct ArrowBufferView buffer_view = { + .data.as_int32 = eos, + .size_bytes = sizeof(eos), + }; + return ArrowIpcOutputStreamWrite(&private->output_stream, buffer_view, error); + } + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( + &private->encoder, in, &private->body_buffer, error)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->buffer), error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->body_buffer), error)); 
+ return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, + struct ArrowArrayStream* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + + ArrowErrorCode error_code = NANOARROW_OK; + + struct ArrowSchema schema; + ArrowSchemaInit(&schema); Review Comment: ```suggestion struct ArrowSchema schema = {.release = NULL}; ``` (`ArrowSchemaInit()` does not currently involve private data that will leak when overwritten by the stream's schema, but it might do so in the future to reduce the number of allocations required to build them) ########## src/nanoarrow/ipc/writer.c: ########## @@ -150,3 +155,188 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, stream->private_data = private_data; return NANOARROW_OK; } + +struct ArrowIpcWriterPrivate { + struct ArrowIpcOutputStream output_stream; + struct ArrowIpcEncoder encoder; + struct ArrowBuffer buffer; + struct ArrowBuffer body_buffer; + int64_t buffer_cursor; + int64_t body_buffer_cursor; +}; + +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error) { + while (data.size_bytes != 0) { + int64_t bytes_written = 0; + NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes, + &bytes_written, error)); + data.size_bytes -= bytes_written; + data.data.as_uint8 += bytes_written; + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcEncoder* encoder, + struct ArrowIpcOutputStream* output_stream) { + NANOARROW_DCHECK(writer != NULL && encoder != NULL && output_stream != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate)); + + if (private == NULL) { + return ENOMEM; + } + + ArrowIpcOutputStreamMove(output_stream, &private->output_stream); + + 
memcpy(&private->encoder, encoder, sizeof(struct ArrowIpcEncoder)); + encoder->private_data = NULL; + + ArrowBufferInit(&private->buffer); + ArrowBufferInit(&private->body_buffer); + private->buffer_cursor = 0; + private->body_buffer_cursor = 0; + + writer->private_data = private; + return NANOARROW_OK; +} + +void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) { + NANOARROW_DCHECK(writer != NULL); + + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (private != NULL) { + ArrowIpcEncoderReset(&private->encoder); + private->output_stream.release(&private->output_stream); + ArrowBufferReset(&private->buffer); + ArrowBufferReset(&private->body_buffer); + + ArrowFree(private); + } + memset(writer, 0, sizeof(struct ArrowIpcWriter)); +} + +static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) { + struct ArrowBufferView buffer_view = { + .data.as_uint8 = buffer->data, + .size_bytes = buffer->size_bytes, + }; + return buffer_view; +} + +// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on +// blocking writes (ArrowIpcOutputStreamWrite). For example an ArrowIpcOutputStream +// might wrap a socket which is not always able to transmit all bytes of a Message. In +// that case users of ArrowIpcWriter might prefer to do other work until a socket is +// ready rather than blocking, or timeout, or otherwise respond to partial transmission. +// +// This could be handled by: +// - keeping partially sent buffers internal and signalling incomplete transmission by +// raising EAGAIN, returning "bytes actually written", ... 
+// - when the caller is ready to try again, call ArrowIpcWriterWriteSome() +// - exposing internal buffers which have not been completely sent, deferring +// follow-up transmission to the caller + +ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer, + const struct ArrowSchema* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error)); + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, + &private->buffer), + error); + + return ArrowIpcOutputStreamWrite(&private->output_stream, + ArrowBufferToBufferView(&private->buffer), error); +} + +ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, + const struct ArrowArrayView* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (in == NULL) { + int32_t eos[] = {-1, 0}; + struct ArrowBufferView buffer_view = { + .data.as_int32 = eos, + .size_bytes = sizeof(eos), + }; + return ArrowIpcOutputStreamWrite(&private->output_stream, buffer_view, error); + } + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( + &private->encoder, in, &private->body_buffer, error)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->buffer), error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->body_buffer), error)); 
+ return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, + struct ArrowArrayStream* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + + ArrowErrorCode error_code = NANOARROW_OK; + + struct ArrowSchema schema; + ArrowSchemaInit(&schema); + + struct ArrowArray array = {.release = NULL}; + + struct ArrowArrayView array_view; + ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED); + + while (!error_code) { +#define NANOARROW_BREAK_NOT_OK(expr) \ Review Comment: I don't particularly mind a single-purpose macro in general, but it might be easier to debug this using an internal `Impl` method (with the outer layer handling the things that need releasing all in one place). ```c static ArrowErrorCode ArrowIpcWriterWriteArrayStreamImpl(struct ArrowIpcWriter* writer, struct ArrowArrayStream* in, struct ArrowSchema* tmp_schema, struct ArrowArrayView* tmp_array_view, struct ArrowArray* tmp_array, struct ArrowError* error) { NANOARROW_RETURN_NOT_OK(...); } ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, struct ArrowArrayStream* in, struct ArrowError* error) { struct ArrowSchema schema; ArrowSchemaInit(&schema); struct ArrowArray array = {.release = NULL}; struct ArrowArrayView array_view; ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED); int result = ArrowIpcWriterWriteArrayStreamImpl(...); int stream_finished = array.release == NULL; // release all the things NANOARROW_RETURN_NOT_OK(result); // ...maybe write EOS } ``` ########## src/nanoarrow/ipc/files_test.cc: ########## @@ -105,6 +105,29 @@ class TestFile { return NANOARROW_OK; } + ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix, Review Comment: Thank you for cleaning this up 😬 ########## src/nanoarrow/nanoarrow_ipc.h: ########## @@ -491,8 +501,59 @@ ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* 
strea /// close_on_release and handle closing the file independently from stream. ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, void* file_ptr, int close_on_release); -/// @} +/// \brief Write to a stream, trying again until all are written or the stream errors. +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error); + +/// \brief A stream writer which encodes Schemas and ArrowArrays into an IPC byte stream +/// +/// This structure is intended to be allocated by the caller, +/// initialized using ArrowIpcWriterInit(), and released with +/// ArrowIpcWriterReset(). +struct ArrowIpcWriter { + /// \brief Private resources managed by this library + void* private_data; +}; + +/// \brief Initialize an output stream of bytes from an ArrowArrayStream +/// +/// Returns NANOARROW_OK on success. If NANOARROW_OK is returned the writer +/// takes ownership of the output byte stream and the encoder, and the caller is +/// responsible for releasing the writer by calling ArrowIpcWriterReset(). +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcEncoder* encoder, Review Comment: Would it make sense to initialize the encoder internally here instead of forcing the caller to initialize one and then immediately move it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
