This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 162dcbd9 feat: Add IPC stream writing (#571)
162dcbd9 is described below
commit 162dcbd9da387f62aae3f75cfe006def21aaee42
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Thu Aug 8 13:28:23 2024 -0500
feat: Add IPC stream writing (#571)
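- adds ArrowIpcWriter along with ArrowIpcWriterInit and ArrowIpcWriterReset
- adds ArrowIpcWriterWriteSchema, ArrowIpcWriterWriteArrayView and
ArrowIpcWriterWriteArrayStream; each call encodes a complete message (or, for
ArrowIpcWriterWriteArrayStream, a whole stream) and blocks until all of its
bytes have been written via the new ArrowIpcOutputStreamWrite helper. A
non-blocking variant that writes as many contiguous bytes as the stream
accepts and resumes later is sketched in a comment in writer.c
- sends an explicit EOS when ArrowIpcWriterWriteArrayView is given a NULL
array view, and at the end of ArrowIpcWriterWriteArrayStream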
---
src/nanoarrow/common/inline_buffer.h | 4 +-
src/nanoarrow/ipc/encoder.c | 20 ++--
src/nanoarrow/ipc/files_test.cc | 141 ++++++++++++++++++----------
src/nanoarrow/ipc/writer.c | 175 +++++++++++++++++++++++++++++++++++
src/nanoarrow/nanoarrow_ipc.h | 62 ++++++++++++-
src/nanoarrow/nanoarrow_ipc.hpp | 19 ++++
valgrind.supp | 7 ++
7 files changed, 370 insertions(+), 58 deletions(-)
diff --git a/src/nanoarrow/common/inline_buffer.h b/src/nanoarrow/common/inline_buffer.h
index caa6be4a..875bfeaa 100644
--- a/src/nanoarrow/common/inline_buffer.h
+++ b/src/nanoarrow/common/inline_buffer.h
@@ -451,8 +451,8 @@ static inline void ArrowBitClear(uint8_t* bits, int64_t i) {
 }
 static inline void ArrowBitSetTo(uint8_t* bits, int64_t i, uint8_t bit_is_set) {
-  bits[i / 8] ^=
-      ((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) & _ArrowkBitmask[i % 8];
+  bits[i / 8] ^= (uint8_t)(((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) &
+                           _ArrowkBitmask[i % 8]);
 }
 static inline void ArrowBitsSetTo(uint8_t* bits, int64_t start_offset, int64_t length,
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 889f66ad..53e3610e 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -230,24 +230,31 @@ static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
     case NANOARROW_TYPE_TIMESTAMP:
       FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error);
-      FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit), error);
+      FLATCC_RETURN_UNLESS_0(
+          Timestamp_unit_add(builder, (ns(TimeUnit_enum_t))schema_view->time_unit),
+          error);
       FLATCC_RETURN_UNLESS_0(
           Timestamp_timezone_create_str(builder, schema_view->timezone),
           error);
       FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error);
       return NANOARROW_OK;
     case NANOARROW_TYPE_TIME32:
-      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32),
-                             error);
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit,
+                                 32),
+          error);
       return NANOARROW_OK;
     case NANOARROW_TYPE_TIME64:
-      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64),
-                             error);
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit,
+                                 64),
+          error);
       return NANOARROW_OK;
     case NANOARROW_TYPE_DURATION:
-      FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit),
+      FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(
+                                 builder, (ns(TimeUnit_enum_t))schema_view->time_unit),
                              error);
       return NANOARROW_OK;
@@ -531,7 +538,6 @@ static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(
     const struct ArrowArrayView* array_view, struct ArrowError* error) {
   NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
                    buffer_encoder != NULL && buffer_encoder->encode_buffer != NULL);
-
   if (array_view->null_count != 0 && ArrowArrayViewComputeNullCount(array_view) != 0) {
     ArrowErrorSet(error,
                   "RecordBatches cannot be constructed from arrays with top level nulls");
diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc
index 76582327..9f8f65ac 100644
--- a/src/nanoarrow/ipc/files_test.cc
+++ b/src/nanoarrow/ipc/files_test.cc
@@ -29,7 +29,7 @@
 #include <gtest/gtest.h>
 #include "nanoarrow/nanoarrow.hpp"
-#include "nanoarrow/nanoarrow_ipc.h"
+#include "nanoarrow/nanoarrow_ipc.hpp"
 #include "nanoarrow/nanoarrow_testing.hpp"
 #include "flatcc/portable/pendian_detect.h"
@@ -105,6 +105,29 @@ class TestFile {
     return NANOARROW_OK;
   }
+  // Read this test file's IPC stream (opened via GetArrowArrayStreamIPC) with
+  // nanoarrow: export its schema into `schema` and append every batch to `arrays`.
+  ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
+                                         struct ArrowSchema* schema,
+                                         std::vector<nanoarrow::UniqueArray>* arrays,
+                                         ArrowError* error) {
+    nanoarrow::UniqueArrayStream ipc_stream;
+    NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, ipc_stream.get(), error));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(ipc_stream.get(), schema, error));
+
+    for (;;) {
+      nanoarrow::UniqueArray next_batch;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayStreamGetNext(ipc_stream.get(), next_batch.get(), error));
+
+      // A released array marks the end of the stream.
+      if (next_batch->release == nullptr) {
+        return NANOARROW_OK;
+      }
+
+      arrays->push_back(std::move(next_batch));
+    }
+  }
+
   ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix,
                                               ArrowArrayStream* out,
                                               ArrowError* error) {
     std::stringstream path_builder;
@@ -171,6 +194,31 @@ class TestFile {
     return std::make_shared<io::BufferReader>(content_copy_wrapped);
   }
+  // Serialize `schema` followed by every batch in `arrays` into `buffer` as an IPC
+  // stream using nanoarrow's writer, terminated by an end-of-stream marker.
+  ArrowErrorCode WriteNanoarrowStream(const nanoarrow::UniqueSchema& schema,
+                                      const std::vector<nanoarrow::UniqueArray>& arrays,
+                                      struct ArrowBuffer* buffer,
+                                      struct ArrowError* error) {
+    nanoarrow::ipc::UniqueOutputStream sink;
+    NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamInitBuffer(sink.get(), buffer));
+
+    // The writer takes ownership of the output stream on success.
+    nanoarrow::ipc::UniqueWriter ipc_writer;
+    NANOARROW_RETURN_NOT_OK(ArrowIpcWriterInit(ipc_writer.get(), sink.get()));
+
+    nanoarrow::UniqueArrayView view;
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromSchema(view.get(), schema.get(), error));
+
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcWriterWriteSchema(ipc_writer.get(), schema.get(), error));
+    for (const nanoarrow::UniqueArray& batch : arrays) {
+      NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(view.get(), batch.get(), error));
+      NANOARROW_RETURN_NOT_OK(
+          ArrowIpcWriterWriteArrayView(ipc_writer.get(), view.get(), error));
+    }
+
+    // A null array view asks the writer to emit the end-of-stream marker.
+    return ArrowIpcWriterWriteArrayView(ipc_writer.get(), nullptr, error);
+  }
+
   void TestEqualsArrowCpp(const std::string& dir_prefix) {
     std::stringstream path_builder;
     path_builder << dir_prefix << "/" << path_;
@@ -179,76 +227,72 @@ class TestFile {
     ArrowErrorInit(&error);
     // Read using nanoarrow_ipc
-    nanoarrow::UniqueArrayStream stream;
-    ASSERT_EQ(GetArrowArrayStreamIPC(dir_prefix, stream.get(), &error), NANOARROW_OK)
-        << error.message;
-
     nanoarrow::UniqueSchema schema;
-    int result = ArrowArrayStreamGetSchema(stream.get(), schema.get(), &error);
+    std::vector<nanoarrow::UniqueArray> arrays;
+    int result = ReadArrowArrayStreamIPC(dir_prefix, schema.get(), &arrays, &error);
     if (result != NANOARROW_OK) {
       if (Check(result, error.message)) {
         return;
       }
-
       GTEST_FAIL() << MakeError(result, error.message);
     }
-    std::vector<nanoarrow::UniqueArray> arrays;
-    while (true) {
-      nanoarrow::UniqueArray array;
-
-      result = ArrowArrayStreamGetNext(stream.get(), array.get(), &error);
-      if (result != NANOARROW_OK) {
-        if (Check(result, error.message)) {
-          return;
-        }
-
-        GTEST_FAIL() << MakeError(result, error.message);
-      }
-
-      if (array->release == nullptr) {
-        break;
-      }
-
-      arrays.push_back(std::move(array));
-    }
-
     // If the file was supposed to fail the read but did not, fail here
     if (expected_return_code_ != NANOARROW_OK) {
       GTEST_FAIL() << MakeError(NANOARROW_OK, "");
     }
-    // Read the same file with Arrow C++
-    nanoarrow::UniqueBuffer content_copy;
-    ASSERT_EQ(ReadFileBuffer(path_builder.str(), content_copy.get(), &error),
+    // Write back to a buffer using nanoarrow
+    nanoarrow::UniqueBuffer roundtripped;
+    ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error),
               NANOARROW_OK)
         << error.message;
-    std::shared_ptr<io::InputStream> input_stream = BufferInputStream(content_copy.get());
-
-    auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream);
-    FAIL_RESULT_NOT_OK(maybe_reader);
-    auto maybe_table_arrow = maybe_reader.ValueUnsafe()->ToTable();
+    // Read the same file with Arrow C++
+    auto maybe_table_arrow = ReadTable(io::ReadableFile::Open(path_builder.str()));
     FAIL_RESULT_NOT_OK(maybe_table_arrow);
-    // Make a Table from the our vector of arrays
-    auto maybe_schema = ImportSchema(schema.get());
-    FAIL_RESULT_NOT_OK(maybe_schema);
+    // nanoarrow's read of the file must match Arrow C++'s read of the file
+    AssertEqualsTable(std::move(schema), std::move(arrays),
+                      maybe_table_arrow.ValueUnsafe());
-
-    ASSERT_TRUE(maybe_table_arrow.ValueUnsafe()->schema()->Equals(**maybe_schema, true));
+    // Read the buffer nanoarrow just wrote (not the original file!) with Arrow C++
+    // and compare it with Arrow C++'s read of the original file. Reading it back
+    // with nanoarrow alone could hide writer bugs that nanoarrow's reader shares.
+    auto maybe_table_roundtripped = ReadTable(BufferInputStream(roundtripped.get()));
+    FAIL_RESULT_NOT_OK(maybe_table_roundtripped);
+    ASSERT_TRUE(maybe_table_arrow.ValueUnsafe()->schema()->Equals(
+        maybe_table_roundtripped.ValueUnsafe()->schema(), true));
-    std::vector<std::shared_ptr<RecordBatch>> batches;
-    for (auto& array : arrays) {
-      auto maybe_batch = ImportRecordBatch(array.get(), *maybe_schema);
-      FAIL_RESULT_NOT_OK(maybe_batch);
+    EXPECT_TRUE(maybe_table_roundtripped.ValueUnsafe()->Equals(
+        *maybe_table_arrow.ValueUnsafe(), true));
+  }
-      batches.push_back(std::move(*maybe_batch));
+  // Read an entire IPC stream with Arrow C++ into a Table.
+  Result<std::shared_ptr<Table>> ReadTable(
+      Result<std::shared_ptr<io::InputStream>> maybe_input_stream) {
+    ARROW_ASSIGN_OR_RAISE(auto input_stream, maybe_input_stream);
+    ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchStreamReader::Open(input_stream));
+    return reader->ToTable();
+  }
+
+  // Import a nanoarrow schema and batches (consuming them) into an Arrow C++ Table.
+  Result<std::shared_ptr<Table>> ToTable(nanoarrow::UniqueSchema schema,
+                                         std::vector<nanoarrow::UniqueArray> arrays) {
+    ARROW_ASSIGN_OR_RAISE(auto arrow_schema, ImportSchema(schema.get()));
+
+    std::vector<std::shared_ptr<RecordBatch>> batches(arrays.size());
+    size_t i = 0;
+    for (auto& array : arrays) {
+      ARROW_ASSIGN_OR_RAISE(auto batch, ImportRecordBatch(array.get(), arrow_schema));
+      batches[i++] = std::move(batch);
     }
+    return Table::FromRecordBatches(std::move(arrow_schema), std::move(batches));
+  }
-    auto maybe_table = Table::FromRecordBatches(*maybe_schema, batches);
+  // Assert that the table built from (schema, arrays) equals `expected`, including
+  // metadata (check_metadata = true).
+  void AssertEqualsTable(nanoarrow::UniqueSchema schema,
+                         std::vector<nanoarrow::UniqueArray> arrays,
+                         const std::shared_ptr<Table>& expected) {
+    auto maybe_table = ToTable(std::move(schema), std::move(arrays));
     FAIL_RESULT_NOT_OK(maybe_table);
-
-    EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(**maybe_table_arrow, true));
+    ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(), true));
+    EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true));
   }
   void TestIPCCheckJSON(const std::string& dir_prefix) {
@@ -382,7 +426,8 @@ INSTANTIATE_TEST_SUITE_P(
     NanoarrowIpcTest, TestFileFixture,
     ::testing::Values(
         // Files in data/arrow-ipc-stream/integration/1.0.0-(little|big)endian/
-        // should read without error and the data should match Arrow C++'s read
+        // should read without error and the data should match Arrow C++'s read.
+        // Also write the stream to a buffer and check Arrow C++'s read of that.
         TestFile::OK("generated_custom_metadata.stream"),
         TestFile::OK("generated_datetime.stream"),
         TestFile::OK("generated_decimal.stream"),
diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c
index 3227d20a..776a22cd 100644
--- a/src/nanoarrow/ipc/writer.c
+++ b/src/nanoarrow/ipc/writer.c
@@ -25,6 +25,8 @@
 void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
                               struct ArrowIpcOutputStream* dst) {
+  NANOARROW_DCHECK(src != NULL && dst != NULL);
+
   memcpy(dst, src, sizeof(struct ArrowIpcOutputStream));
   src->release = NULL;
 }
@@ -55,6 +57,8 @@ static void ArrowIpcOutputStreamBufferRelease(struct ArrowIpcOutputStream* strea
 ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream,
                                               struct ArrowBuffer* output) {
+  NANOARROW_DCHECK(stream != NULL && output != NULL);
+
   struct ArrowIpcOutputStreamBufferPrivate* private_data =
       (struct ArrowIpcOutputStreamBufferPrivate*)ArrowMalloc(
           sizeof(struct ArrowIpcOutputStreamBufferPrivate));
@@ -130,6 +134,7 @@ static ArrowErrorCode ArrowIpcOutputStreamFileWrite(struct ArrowIpcOutputStream*
 ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
                                             void* file_ptr, int close_on_release) {
+  NANOARROW_DCHECK(stream != NULL);
   if (file_ptr == NULL) {
     return EINVAL;
   }
@@ -150,3 +155,173 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
   stream->private_data = private_data;
   return NANOARROW_OK;
 }
+
+// State owned by an ArrowIpcWriter, stored in writer->private_data.
+struct ArrowIpcWriterPrivate {
+  // Builds the flatbuffer Schema / RecordBatch messages.
+  struct ArrowIpcEncoder encoder;
+  // Destination of every encoded byte; moved in by ArrowIpcWriterInit().
+  struct ArrowIpcOutputStream output_stream;
+  // Scratch space for the encapsulated (finalized) message.
+  struct ArrowBuffer buffer;
+  // Scratch space for the record batch body from ArrowIpcEncoderEncodeSimpleRecordBatch().
+  struct ArrowBuffer body_buffer;
+};
+
+// Write all of `data` to `stream`, calling stream->write() repeatedly until every
+// byte has been accepted or the stream reports an error. This may block for as long
+// as the underlying stream blocks. A write that reports success but makes no progress
+// (or claims more bytes than were offered) is reported as EIO instead of being
+// retried forever or corrupting the remaining size.
+ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error) {
+  while (data.size_bytes != 0) {
+    int64_t bytes_written = 0;
+    NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes,
+                                          &bytes_written, error));
+
+    if (bytes_written <= 0 || bytes_written > data.size_bytes) {
+      ArrowErrorSet(error,
+                    "ArrowIpcOutputStream write() reported success but wrote an "
+                    "invalid number of bytes");
+      return EIO;
+    }
+
+    data.size_bytes -= bytes_written;
+    data.data.as_uint8 += bytes_written;
+  }
+  return NANOARROW_OK;
+}
+
+// Initialize `writer`. On success the writer owns `output_stream` (it is moved into
+// the writer's private state); on failure `output_stream` is left untouched and
+// nothing is leaked.
+ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
+                                  struct ArrowIpcOutputStream* output_stream) {
+  NANOARROW_DCHECK(writer != NULL && output_stream != NULL);
+
+  struct ArrowIpcWriterPrivate* private =
+      (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate));
+
+  if (private == NULL) {
+    return ENOMEM;
+  }
+
+  ArrowErrorCode result = ArrowIpcEncoderInit(&private->encoder);
+  if (result != NANOARROW_OK) {
+    // Nothing else has been acquired yet, so only the allocation needs freeing
+    // (assumes ArrowIpcEncoderInit() cleans up after itself on failure).
+    ArrowFree(private);
+    return result;
+  }
+
+  ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
+
+  ArrowBufferInit(&private->buffer);
+  ArrowBufferInit(&private->body_buffer);
+
+  writer->private_data = private;
+  return NANOARROW_OK;
+}
+
+// Release the encoder, output stream, and scratch buffers owned by `writer`, then
+// zero the struct. A writer whose private_data is NULL (e.g. one that was already
+// reset) is only zeroed.
+void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) {
+  NANOARROW_DCHECK(writer != NULL);
+
+  struct ArrowIpcWriterPrivate* state =
+      (struct ArrowIpcWriterPrivate*)writer->private_data;
+  if (state != NULL) {
+    ArrowIpcEncoderReset(&state->encoder);
+
+    struct ArrowIpcOutputStream* sink = &state->output_stream;
+    sink->release(sink);
+
+    ArrowBufferReset(&state->buffer);
+    ArrowBufferReset(&state->body_buffer);
+    ArrowFree(state);
+  }
+
+  memset(writer, 0, sizeof(struct ArrowIpcWriter));
+}
+
+// Borrow the bytes [data, data + size_bytes) of `buffer` as a view; nothing is copied
+// and the view is only valid while `buffer` is unchanged.
+static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) {
+  struct ArrowBufferView view;
+  view.data.as_uint8 = buffer->data;
+  view.size_bytes = buffer->size_bytes;
+  return view;
+}
+
+// Eventually, it may be necessary to construct an ArrowIpcWriter which
doesn't rely on
+// blocking writes (ArrowIpcOutputStreamWrite). For example an
ArrowIpcOutputStream
+// might wrap a socket which is not always able to transmit all bytes of a
Message. In
+// that case users of ArrowIpcWriter might prefer to do other work until a
socket is
+// ready rather than blocking, or timeout, or otherwise respond to partial
transmission.
+//
+// This could be handled by:
+// - keeping partially sent buffers internal and signalling incomplete
transmission by
+// raising EAGAIN, returning "bytes actually written", ...
+// - when the caller is ready to try again, call ArrowIpcWriterWriteSome()
+// - exposing internal buffers which have not been completely sent, deferring
+// follow-up transmission to the caller
+
+// Encode `in` as an encapsulated Schema message and write all of it to the output
+// stream.
+ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer,
+                                         const struct ArrowSchema* in,
+                                         struct ArrowError* error) {
+  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL);
+  struct ArrowIpcWriterPrivate* state =
+      (struct ArrowIpcWriterPrivate*)writer->private_data;
+  struct ArrowBuffer* message = &state->buffer;
+
+  // Resizing down without shrink_to_fit is expected never to fail (hence the assert).
+  NANOARROW_ASSERT_OK(ArrowBufferResize(message, 0, 0));
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&state->encoder, in, error));
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+      ArrowIpcEncoderFinalizeBuffer(&state->encoder, /*encapsulate=*/1, message),
+      error);
+
+  struct ArrowBufferView encoded = ArrowBufferToBufferView(message);
+  return ArrowIpcOutputStreamWrite(&state->output_stream, encoded, error);
+}
+
+// Write one record batch to the output stream as an encapsulated RecordBatch message
+// followed by its body. If `in` is NULL, write the end-of-stream marker instead.
+ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,
+                                            const struct ArrowArrayView* in,
+                                            struct ArrowError* error) {
+  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
+  struct ArrowIpcWriterPrivate* private =
+      (struct ArrowIpcWriterPrivate*)writer->private_data;
+
+  if (in == NULL) {
+    // End of stream: continuation marker 0xFFFFFFFF followed by a zero length.
+    int32_t eos[] = {-1, 0};
+    struct ArrowBufferView data = {.data.as_int32 = eos, .size_bytes = sizeof(eos)};
+    return ArrowIpcOutputStreamWrite(&private->output_stream, data, error);
+  }
+
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0));
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
+      &private->encoder, in, &private->body_buffer, error));
+
+  // Serialize the RecordBatch message the encoder just built (same pattern as
+  // ArrowIpcWriterWriteSchema). Without this, private->buffer stays empty and only
+  // the body bytes would reach the stream, with no message header.
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+      ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
+                                    &private->buffer),
+      error);
+
+  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
+      &private->output_stream, ArrowBufferToBufferView(&private->buffer), error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
+      &private->output_stream, ArrowBufferToBufferView(&private->body_buffer), error));
+  return NANOARROW_OK;
+}
+
+// Fallible body of ArrowIpcWriterWriteArrayStream(): write the stream's schema, each
+// of its arrays, and then EOS. The output parameters (schema, array, array_view) are
+// owned by the caller.
+static ArrowErrorCode ArrowIpcWriterWriteArrayStreamImpl(
+    struct ArrowIpcWriter* writer, struct ArrowArrayStream* in,
+    struct ArrowSchema* schema, struct ArrowArray* array,
+    struct ArrowArrayView* array_view, struct ArrowError* error) {
+  NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(in, schema, error));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteSchema(writer, schema, error));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromSchema(array_view, schema, error));
+
+  for (;;) {
+    NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(in, array, error));
+    // A released array marks the end of the input stream.
+    if (array->release == NULL) {
+      break;
+    }
+
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view, array, error));
+    NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteArrayView(writer, array_view, error));
+    ArrowArrayRelease(array);
+  }
+
+  // Input exhausted: a NULL view makes the writer emit the end-of-stream marker.
+  return ArrowIpcWriterWriteArrayView(writer, NULL, error);
+}
+
+// Write an entire array stream (schema, every array, then EOS) to the output stream.
+// Temporary schema/array/view resources are released on both success and failure.
+ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer,
+                                              struct ArrowArrayStream* in,
+                                              struct ArrowError* error) {
+  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL);
+
+  struct ArrowSchema schema = {.release = NULL};
+  struct ArrowArray array = {.release = NULL};
+  struct ArrowArrayView array_view;
+  ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED);
+
+  // Capture the result instead of returning early so the cleanup below also runs
+  // when the schema, an array, or a write fails.
+  ArrowErrorCode result = ArrowIpcWriterWriteArrayStreamImpl(writer, in, &schema,
+                                                             &array, &array_view, error);
+
+  if (schema.release != NULL) {
+    ArrowSchemaRelease(&schema);
+  }
+
+  // Non-NULL only if the Impl failed while holding an array from the stream.
+  if (array.release != NULL) {
+    ArrowArrayRelease(&array);
+  }
+
+  ArrowArrayViewReset(&array_view);
+  return result;
+}
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index e4856d07..1e58c8dc 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -69,8 +69,18 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer)
 #define ArrowIpcOutputStreamInitFile \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitFile)
+#define ArrowIpcOutputStreamWrite \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamWrite)
 #define ArrowIpcOutputStreamMove \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamMove)
+#define ArrowIpcWriterInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterInit)
+#define ArrowIpcWriterReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterReset)
+#define ArrowIpcWriterWriteSchema \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteSchema)
+#define ArrowIpcWriterWriteArrayView \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayView)
+#define ArrowIpcWriterWriteArrayStream \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayStream)
 #endif
@@ -491,8 +501,58 @@ ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* strea
 /// close_on_release and handle closing the file independently from stream.
 ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
                                             void* file_ptr, int close_on_release);
-/// @}
+/// \brief Write all bytes of data to a stream
+///
+/// Calls the stream's write() repeatedly, retrying partial writes, until every
+/// byte has been written or the stream returns an error. This call may block for
+/// as long as the underlying stream blocks.
+ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream,
+                                         struct ArrowBufferView data,
+                                         struct ArrowError* error);
+
+/// \brief A stream writer which encodes Schemas and ArrowArrays into an IPC byte stream
+///
+/// This structure is intended to be allocated by the caller,
+/// initialized using ArrowIpcWriterInit(), and released with
+/// ArrowIpcWriterReset().
+struct ArrowIpcWriter {
+  /// \brief Private resources managed by this library
+  void* private_data;
+};
+
+/// \brief Initialize a writer that encodes IPC messages to an output byte stream
+///
+/// Returns NANOARROW_OK on success. If NANOARROW_OK is returned the writer
+/// takes ownership of the output byte stream, and the caller is
+/// responsible for releasing the writer by calling ArrowIpcWriterReset().
+ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
+                                  struct ArrowIpcOutputStream* output_stream);
+
+/// \brief Release all resources attached to a writer
+void ArrowIpcWriterReset(struct ArrowIpcWriter* writer);
+
+/// \brief Write a schema to the output byte stream
+///
+/// Errors are propagated from the underlying encoder and output byte stream.
+ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer,
+                                         const struct ArrowSchema* in,
+                                         struct ArrowError* error);
+
+/// \brief Write an array view to the output byte stream
+///
+/// The array view may be NULL, in which case an EOS will be written.
+/// The writer does not check that a schema was already written.
+///
+/// Errors are propagated from the underlying encoder and output byte stream.
+ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,
+                                            const struct ArrowArrayView* in,
+                                            struct ArrowError* error);
+
+/// \brief Write an entire stream (including EOS) to the output byte stream
+///
+/// The stream's schema is written first, followed by each of its arrays and
+/// finally an EOS marker.
+///
+/// Errors are propagated from the underlying encoder, array stream, and output byte
+/// stream.
+ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer,
+                                              struct ArrowArrayStream* in,
+                                              struct ArrowError* error);
+/// @}
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp
index 8000ca05..6cf7bc22 100644
--- a/src/nanoarrow/nanoarrow_ipc.hpp
+++ b/src/nanoarrow/nanoarrow_ipc.hpp
@@ -95,6 +95,22 @@ inline void release_pointer(struct ArrowIpcOutputStream* data) {
   }
 }
+// A default-constructed (empty) writer is marked by a NULL private_data.
+template <>
+inline void init_pointer(struct ArrowIpcWriter* data) {
+  data->private_data = nullptr;
+}
+
+// Transfer ownership: copy the handle into dst and leave src empty.
+template <>
+inline void move_pointer(struct ArrowIpcWriter* src, struct ArrowIpcWriter* dst) {
+  memcpy(dst, src, sizeof(struct ArrowIpcWriter));
+  src->private_data = nullptr;
+}
+
+// ArrowIpcWriterReset() tolerates a NULL private_data, so it is safe to call on an
+// empty or moved-from writer.
+template <>
+inline void release_pointer(struct ArrowIpcWriter* data) {
+  ArrowIpcWriterReset(data);
+}
+
 } // namespace internal
 } // namespace nanoarrow
@@ -121,6 +137,9 @@ using UniqueInputStream = internal::Unique<struct ArrowIpcInputStream>;
 /// \brief Class wrapping a unique struct ArrowIpcOutputStream
 using UniqueOutputStream = internal::Unique<struct ArrowIpcOutputStream>;
+/// \brief Class wrapping a unique struct ArrowIpcWriter
+using UniqueWriter = internal::Unique<struct ArrowIpcWriter>;
+
 /// @}
 } // namespace ipc
diff --git a/valgrind.supp b/valgrind.supp
index c1753041..ddd0c064 100644
--- a/valgrind.supp
+++ b/valgrind.supp
@@ -66,3 +66,10 @@
fun:base64_encode
fun:R_base64_encode
}
+
+# TODO https://github.com/apache/arrow-nanoarrow/issues/579 remove this
+{
+ <flatcc>:flatcc uses realloc() and valgrind thinks something was free'd
+ Memcheck:Addr4
+ fun:flatcc_builder_create_cached_vtable
+}