This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new cb89444d feat: add ipc RecordBatch encoding (#555)
cb89444d is described below
commit cb89444d34e391a3b5fdc06da51a720542f0dd27
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Tue Aug 6 19:56:45 2024 -0500
feat: add ipc RecordBatch encoding (#555)
- added `ArrowIpcEncoderEncodeRecordBatch()`, which uses a callback for
buffer encoding; it is left internal for now
- added `ArrowIpcEncoderEncodeSimpleRecordBatch()` which simply
concatenates a batch's buffers into one contiguous (properly aligned and
padded) body buffer
- testing uses the decoder tests, replacing Arrow C++'s encoder with the
new nanoarrow encoder
---
CMakeLists.txt | 1 +
src/nanoarrow/ipc/decoder.c | 9 +-
src/nanoarrow/ipc/decoder_test.cc | 78 ++++++++
src/nanoarrow/ipc/encoder.c | 402 ++++++++++++++++++++++++++------------
src/nanoarrow/ipc/encoder_test.cc | 5 -
src/nanoarrow/nanoarrow_ipc.h | 36 ++--
6 files changed, 380 insertions(+), 151 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e73779a9..a954d215 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -510,6 +510,7 @@ if(NANOARROW_BUILD_TESTS)
target_link_libraries(nanoarrow_ipc_files_test nanoarrow_testing ZLIB::ZLIB
nanoarrow_coverage_config)
+ target_link_libraries(nanoarrow_ipc_decoder_test gmock_main)
endif()
if(NANOARROW_DEVICE)
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 2cd2a005..6e37e20d 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -1657,9 +1657,16 @@ static ArrowErrorCode
ArrowIpcDecoderDecodeArrayViewInternal(
return EINVAL;
}
+ // RecordBatch messages don't count the root node but decoder->fields does
+ // (decoder->fields[0] is the root field)
+ if (field_i + 1 >= private_data->n_fields) {
+ ArrowErrorSet(error, "cannot decode column %" PRId64 "; there are only %"
PRId64,
+ field_i, private_data->n_fields - 1);
+ return EINVAL;
+ }
+
ns(RecordBatch_table_t) batch =
(ns(RecordBatch_table_t))private_data->last_message;
- // RecordBatch messages don't count the root node but decoder->fields does
struct ArrowIpcField* root = private_data->fields + field_i + 1;
struct ArrowIpcArraySetter setter;
diff --git a/src/nanoarrow/ipc/decoder_test.cc
b/src/nanoarrow/ipc/decoder_test.cc
index 1560a835..dd50201c 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -21,6 +21,7 @@
#include <arrow/c/bridge.h>
#include <arrow/ipc/api.h>
#include <arrow/util/key_value_metadata.h>
+#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
// For bswap32()
@@ -761,6 +762,83 @@ TEST_P(ArrowTypeParameterizedTestFixture,
NanoarrowIpcArrowArrayRoundtrip) {
ArrowIpcDecoderReset(&decoder);
}
+void AssertArrayViewIdentical(const struct ArrowArrayView* actual,
+ const struct ArrowArrayView* expected) {
+ NANOARROW_DCHECK(actual->dictionary == nullptr);
+ NANOARROW_DCHECK(expected->dictionary == nullptr);
+
+ ASSERT_EQ(actual->storage_type, expected->storage_type);
+ ASSERT_EQ(actual->offset, expected->offset);
+ ASSERT_EQ(actual->length, expected->length);
+ for (int i = 0; i < 3; i++) {
+ auto a_buf = actual->buffer_views[i];
+ auto e_buf = expected->buffer_views[i];
+ ASSERT_EQ(a_buf.size_bytes, e_buf.size_bytes);
+ if (a_buf.size_bytes != 0) {
+ ASSERT_EQ(memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes), 0);
+ }
+ }
+
+ ASSERT_EQ(actual->n_children, expected->n_children);
+ for (int i = 0; i < actual->n_children; i++) {
+ AssertArrayViewIdentical(actual->children[i], expected->children[i]);
+ }
+}
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip)
{
+ struct ArrowError error;
+ nanoarrow::UniqueSchema schema;
+ ASSERT_TRUE(
+ arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}),
schema.get())
+ .ok());
+
+ nanoarrow::UniqueArrayView array_view;
+ ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(),
&error),
+ NANOARROW_OK);
+
+ // now make one empty struct array with this schema and another with all
zeroes
+ nanoarrow::UniqueArray empty_array, zero_array;
+ for (auto* array : {empty_array.get(), zero_array.get()}) {
+ ASSERT_EQ(ArrowArrayInitFromSchema(array, schema.get(), nullptr),
NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK);
+ if (array == zero_array.get()) {
+ ASSERT_EQ(ArrowArrayAppendEmpty(array, 5), NANOARROW_OK);
+ }
+ ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK);
+ ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array, &error),
NANOARROW_OK)
+ << error.message;
+
+ nanoarrow::ipc::UniqueEncoder encoder;
+ EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+
+ nanoarrow::UniqueBuffer buffer, body_buffer;
+ EXPECT_EQ(ArrowIpcEncoderEncodeSimpleRecordBatch(encoder.get(),
array_view.get(),
+ body_buffer.get(),
&error),
+ NANOARROW_OK)
+ << error.message;
+ EXPECT_EQ(
+ ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true,
buffer.get()),
+ NANOARROW_OK);
+
+ nanoarrow::ipc::UniqueDecoder decoder;
+ ArrowIpcDecoderInit(decoder.get());
+ EXPECT_EQ(ArrowIpcDecoderSetSchema(decoder.get(), schema.get(), &error),
NANOARROW_OK)
+ << error.message;
+ EXPECT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(),
+ {buffer->data, buffer->size_bytes},
&error),
+ NANOARROW_OK)
+ << error.message;
+
+ struct ArrowArrayView* roundtripped;
+ ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(decoder.get(),
+ {body_buffer->data,
body_buffer->size_bytes},
+ -1, &roundtripped, nullptr),
+ NANOARROW_OK);
+
+ AssertArrayViewIdentical(roundtripped, array_view.get());
+ }
+}
+
INSTANTIATE_TEST_SUITE_P(
NanoarrowIpcTest, ArrowTypeParameterizedTestFixture,
::testing::Values(
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 4eebe36e..889f66ad 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -27,8 +27,19 @@
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
-#define FLATCC_RETURN_UNLESS_0(x) \
- if (ns(x) != 0) return ENOMEM;
+#define FLATCC_RETURN_UNLESS_0_NO_NS(x, error) \
+ if ((x) != 0) { \
+ ArrowErrorSet(error, "%s:%d: %s failed", __FILE__, __LINE__, #x); \
+ return ENOMEM; \
+ }
+
+#define FLATCC_RETURN_UNLESS_0(x, error) FLATCC_RETURN_UNLESS_0_NO_NS(ns(x),
error)
+
+#define FLATCC_RETURN_IF_NULL(x, error) \
+ if (!(x)) { \
+ ArrowErrorSet(error, "%s:%d: %s was null", __FILE__, __LINE__, #x); \
+ return ENOMEM; \
+ }
struct ArrowIpcEncoderPrivate {
flatcc_builder_t builder;
@@ -36,22 +47,15 @@ struct ArrowIpcEncoderPrivate {
struct ArrowBuffer nodes;
};
-static int32_t ArrowInt32ToLe(int32_t i) {
- if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
- return bswap32(i);
- }
- return i;
-}
-
ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
NANOARROW_DCHECK(encoder != NULL);
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
- encoder->encode_buffer = NULL;
- encoder->encode_buffer_state = NULL;
- encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
+ if (private == NULL) {
+ return ENOMEM;
+ }
if (flatcc_builder_init(&private->builder) == -1) {
ArrowFree(private);
return ESPIPE;
@@ -70,8 +74,8 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
ArrowBufferReset(&private->nodes);
ArrowBufferReset(&private->buffers);
ArrowFree(private);
- memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}
+ memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}
ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
@@ -80,40 +84,41 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct
ArrowIpcEncoder* encoder,
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;
- int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
- int32_t header[] = {-1, ArrowInt32ToLe((int32_t)size)};
+ size_t size = flatcc_builder_get_buffer_size(&private->builder);
+ _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
+
+ int32_t header[] = {-1, (int32_t)size};
+ if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
+ header[1] = (int32_t)bswap32((uint32_t)size);
+ }
if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) :
NANOARROW_OK;
}
- const void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
- if (data == NULL) {
- return ENOMEM;
- }
-
- int64_t i = out->size_bytes;
if (encapsulate) {
int64_t encapsulated_size =
_ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
- NANOARROW_RETURN_NOT_OK(
- ArrowBufferResize(out, out->size_bytes + encapsulated_size, 0));
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
} else {
- NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, out->size_bytes + size, 0));
+ NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
}
+ void* data =
+ flatcc_builder_copy_buffer(&private->builder, out->data +
out->size_bytes, size);
+ NANOARROW_DCHECK(data != NULL);
+ NANOARROW_UNUSED(data);
+ out->size_bytes += size;
+
if (encapsulate) {
- memcpy(out->data + i, &header, sizeof(header));
- i += sizeof(header);
+ // zero padding bytes, if any
+ int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
+ memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
+ out->size_bytes = padded_size;
}
- memcpy(out->data + i, data, size);
- i += size;
-
- // zero padding bytes, if any
- memset(out->data + i, 0, out->size_bytes - i);
-
// don't deallocate yet, just wipe the builder's current Message
flatcc_builder_reset(&private->builder);
return NANOARROW_OK;
@@ -124,178 +129,185 @@ static ArrowErrorCode
ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
struct ArrowError* error) {
switch (schema_view->type) {
case NANOARROW_TYPE_NA:
- FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_BOOL:
- FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_UINT8:
case NANOARROW_TYPE_INT8:
FLATCC_RETURN_UNLESS_0(
- Field_type_Int_create(builder, 8, schema_view->type ==
NANOARROW_TYPE_INT8));
+ Field_type_Int_create(builder, 8, schema_view->type ==
NANOARROW_TYPE_INT8),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_UINT16:
case NANOARROW_TYPE_INT16:
FLATCC_RETURN_UNLESS_0(
- Field_type_Int_create(builder, 16, schema_view->type ==
NANOARROW_TYPE_INT16));
+ Field_type_Int_create(builder, 16, schema_view->type ==
NANOARROW_TYPE_INT16),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_UINT32:
case NANOARROW_TYPE_INT32:
FLATCC_RETURN_UNLESS_0(
- Field_type_Int_create(builder, 32, schema_view->type ==
NANOARROW_TYPE_INT32));
+ Field_type_Int_create(builder, 32, schema_view->type ==
NANOARROW_TYPE_INT32),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_UINT64:
case NANOARROW_TYPE_INT64:
FLATCC_RETURN_UNLESS_0(
- Field_type_Int_create(builder, 64, schema_view->type ==
NANOARROW_TYPE_INT64));
+ Field_type_Int_create(builder, 64, schema_view->type ==
NANOARROW_TYPE_INT64),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_HALF_FLOAT:
- FLATCC_RETURN_UNLESS_0(
- Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+ FLATCC_RETURN_UNLESS_0(Field_type_FloatingPoint_create(builder,
ns(Precision_HALF)),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_FLOAT:
FLATCC_RETURN_UNLESS_0(
- Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+ Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_DOUBLE:
FLATCC_RETURN_UNLESS_0(
- Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+ Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_DECIMAL128:
case NANOARROW_TYPE_DECIMAL256:
- FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
- builder, schema_view->decimal_precision, schema_view->decimal_scale,
- schema_view->decimal_bitwidth));
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Decimal_create(builder, schema_view->decimal_precision,
+ schema_view->decimal_scale,
+ schema_view->decimal_bitwidth),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_STRING:
- FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_LARGE_STRING:
- FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_BINARY:
- FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_LARGE_BINARY:
- FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_DATE32:
- FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_DAY)));
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_DAY)), error);
return NANOARROW_OK;
case NANOARROW_TYPE_DATE64:
- FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_MILLISECOND)));
+ FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder,
ns(DateUnit_MILLISECOND)),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_INTERVAL_MONTHS:
FLATCC_RETURN_UNLESS_0(
- Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+ Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_INTERVAL_DAY_TIME:
FLATCC_RETURN_UNLESS_0(
- Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+ Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
FLATCC_RETURN_UNLESS_0(
- Field_type_Interval_create(builder,
ns(IntervalUnit_MONTH_DAY_NANO)));
+ Field_type_Interval_create(builder,
ns(IntervalUnit_MONTH_DAY_NANO)), error);
return NANOARROW_OK;
case NANOARROW_TYPE_TIMESTAMP:
- FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error);
+ FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder,
schema_view->time_unit), error);
FLATCC_RETURN_UNLESS_0(
- Timestamp_unit_add(builder,
(ns(TimeUnit_enum_t))schema_view->time_unit));
- FLATCC_RETURN_UNLESS_0(
- Timestamp_timezone_create_str(builder, schema_view->timezone));
- FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+ Timestamp_timezone_create_str(builder, schema_view->timezone),
error);
+ FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_TIME32:
- FLATCC_RETURN_UNLESS_0(Field_type_Time_create(
- builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 32));
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 32),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_TIME64:
- FLATCC_RETURN_UNLESS_0(Field_type_Time_create(
- builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 64));
+ FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder,
schema_view->time_unit, 64),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_DURATION:
- FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(
- builder, (ns(TimeUnit_enum_t))schema_view->time_unit));
+ FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder,
schema_view->time_unit),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_FIXED_SIZE_BINARY:
FLATCC_RETURN_UNLESS_0(
- Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+ Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_LIST:
- FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_LARGE_LIST:
- FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_FIXED_SIZE_LIST:
FLATCC_RETURN_UNLESS_0(
- Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+ Field_type_FixedSizeList_create(builder, schema_view->fixed_size),
error);
return NANOARROW_OK;
case NANOARROW_TYPE_RUN_END_ENCODED:
- FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_STRUCT:
- FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder), error);
return NANOARROW_OK;
case NANOARROW_TYPE_SPARSE_UNION:
case NANOARROW_TYPE_DENSE_UNION: {
- FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder), error);
FLATCC_RETURN_UNLESS_0(
- Union_mode_add(builder, schema_view->type ==
NANOARROW_TYPE_DENSE_UNION));
+ Union_mode_add(builder, schema_view->type ==
NANOARROW_TYPE_DENSE_UNION),
+ error);
if (schema_view->union_type_ids) {
int8_t type_ids[128];
int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
if (n != 0) {
- FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder), error);
int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder,
n));
- if (!type_ids_32) {
- return ENOMEM;
- }
+ FLATCC_RETURN_IF_NULL(type_ids_32, error);
- for (int i = 0; i < n; ++i) {
+ for (int i = 0; i < n; i++) {
type_ids_32[i] = type_ids[i];
}
- FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+ FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder), error);
}
}
- FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+ FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder), error);
return NANOARROW_OK;
}
case NANOARROW_TYPE_MAP:
- FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
- builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED));
+ FLATCC_RETURN_UNLESS_0(
+ Field_type_Map_create(builder,
+ schema_view->schema->flags &
ARROW_FLAG_MAP_KEYS_SORTED),
+ error);
return NANOARROW_OK;
case NANOARROW_TYPE_DICTIONARY:
@@ -326,15 +338,12 @@ static ArrowErrorCode
ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
struct ArrowStringView key, value;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowMetadataReaderRead(&metadata,
&key, &value),
error);
- if (push_start(builder) != 0) {
- return ENOMEM;
- }
- FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data,
key.size_bytes));
+ FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error);
+ FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data,
key.size_bytes),
+ error);
FLATCC_RETURN_UNLESS_0(
- KeyValue_value_create_strn(builder, value.data, value.size_bytes));
- if (!push_end(builder)) {
- return ENOMEM;
- }
+ KeyValue_value_create_strn(builder, value.data, value.size_bytes),
error);
+ FLATCC_RETURN_IF_NULL(push_end(builder), error);
}
return NANOARROW_OK;
}
@@ -345,14 +354,10 @@ static ArrowErrorCode
ArrowIpcEncodeFields(flatcc_builder_t* builder,
ns(Field_ref_t) *
(*push_end)(flatcc_builder_t*),
struct ArrowError* error) {
- for (int i = 0; i < schema->n_children; ++i) {
- if (push_start(builder) != 0) {
- return ENOMEM;
- }
+ for (int i = 0; i < schema->n_children; i++) {
+ FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error);
NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i],
error));
- if (!push_end(builder)) {
- return ENOMEM;
- }
+ FLATCC_RETURN_IF_NULL(push_end(builder), error);
}
return NANOARROW_OK;
}
@@ -360,28 +365,28 @@ static ArrowErrorCode
ArrowIpcEncodeFields(flatcc_builder_t* builder,
static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
const struct ArrowSchema* schema,
struct ArrowError* error) {
- FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
- FLATCC_RETURN_UNLESS_0(
- Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE));
+ FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name), error);
+ FLATCC_RETURN_UNLESS_0(Field_nullable_add(builder, schema->flags &
ARROW_FLAG_NULLABLE),
+ error);
struct ArrowSchemaView schema_view;
NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view,
error));
if (schema->n_children != 0) {
- FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+ FLATCC_RETURN_UNLESS_0(Field_children_start(builder), error);
NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
&ns(Field_children_push_start),
&ns(Field_children_push_end),
error));
- FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+ FLATCC_RETURN_UNLESS_0(Field_children_end(builder), error);
}
if (schema->metadata) {
- FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder), error);
NANOARROW_RETURN_NOT_OK(
ArrowIpcEncodeMetadata(builder, schema,
&ns(Field_custom_metadata_push_start),
&ns(Field_custom_metadata_push_end), error));
- FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+ FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder), error);
}
return NANOARROW_OK;
}
@@ -396,41 +401,194 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
flatcc_builder_t* builder = &private->builder;
- FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
- FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error);
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)),
error);
- FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder));
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder), error);
if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) {
- FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder,
ns(Endianness_Little)));
+ FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder,
ns(Endianness_Little)), error);
} else {
- FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Big)));
+ FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Big)),
error);
}
- FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder));
+ FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder), error);
NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
&ns(Schema_fields_push_start),
&ns(Schema_fields_push_end),
error));
- FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder));
+ FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder), error);
if (schema->metadata) {
- FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder));
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder), error);
NANOARROW_RETURN_NOT_OK(
ArrowIpcEncodeMetadata(builder, schema,
&ns(Schema_custom_metadata_push_start),
&ns(Schema_custom_metadata_push_end), error));
- FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder));
+ FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder), error);
}
- FLATCC_RETURN_UNLESS_0(Schema_features_start(builder));
- ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1));
- if (!features) {
- return ENOMEM;
+ FLATCC_RETURN_UNLESS_0(Schema_features_start(builder), error);
+ FLATCC_RETURN_UNLESS_0(Schema_features_end(builder), error);
+
+ FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder), error);
+
+ FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0), error);
+ FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error);
+ return NANOARROW_OK;
+}
+
+struct ArrowIpcBufferEncoder {
+ /// \brief Callback invoked against each buffer to be encoded
+ ///
+ /// Encoding of buffers is left as a callback to accommodate dissociated
data storage.
+ /// One implementation of this callback might copy all buffers into a
contiguous body
+ /// for use in an arrow IPC stream, another implementation might store
offsets and
+ /// lengths relative to a known arena.
+ ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
+ struct ArrowIpcEncoder* encoder,
+ struct ArrowIpcBufferEncoder* buffer_encoder,
+ int64_t* offset, int64_t* length,
+ struct ArrowError* error);
+
+ /// \brief Pointer to arbitrary data used by encode_buffer()
+ void* encode_buffer_state;
+
+ /// \brief Finalized body length of the most recently encoded RecordBatch
message
+ ///
+ /// encode_buffer() is expected to update this while encoding each buffer.
After all
+ /// buffers are encoded, this will be written to the RecordBatch's
.bodyLength
+ int64_t body_length;
+};
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+ struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
+ struct ArrowIpcBufferEncoder* buffer_encoder, int64_t* offset, int64_t*
length,
+ struct ArrowError* error) {
+ NANOARROW_UNUSED(encoder);
+
+ struct ArrowBuffer* body_buffer =
+ (struct ArrowBuffer*)buffer_encoder->encode_buffer_state;
+
+ int64_t old_size = body_buffer->size_bytes;
+ int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+ int64_t buffer_end = buffer_begin + buffer_view.size_bytes;
+ int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+ // reserve all the memory we'll need now
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(body_buffer, new_size
- old_size),
+ error);
+
+ // zero padding up to the start of the buffer
+ NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin -
old_size));
+
+ // store offset and length of the buffer
+ *offset = buffer_begin;
+ *length = buffer_view.size_bytes;
+
+ NANOARROW_ASSERT_OK(
+ ArrowBufferAppend(body_buffer, buffer_view.data.data,
buffer_view.size_bytes));
+
+ // zero padding after writing the buffer
+ NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
+ NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size -
buffer_end));
+
+ buffer_encoder->body_length = body_buffer->size_bytes;
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
+ struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder*
buffer_encoder,
+ const struct ArrowArrayView* array_view, struct ArrowBuffer* buffers,
+ struct ArrowBuffer* nodes, struct ArrowError* error) {
+ if (array_view->offset != 0) {
+ ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
+ return ENOTSUP;
+ }
+
+ for (int64_t c = 0; c < array_view->n_children; ++c) {
+ const struct ArrowArrayView* child = array_view->children[c];
+
+ struct ns(FieldNode) node = {child->length, child->null_count};
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferAppend(nodes, &node,
sizeof(node)),
+ error);
+
+ for (int64_t b = 0; b < child->array->n_buffers; ++b) {
+ struct ns(Buffer) buffer;
+ NANOARROW_RETURN_NOT_OK(
+ buffer_encoder->encode_buffer(child->buffer_views[b], encoder,
buffer_encoder,
+ &buffer.offset, &buffer.length,
error));
+ NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+ ArrowBufferAppend(buffers, &buffer, sizeof(buffer)), error);
+ }
+
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+ encoder, buffer_encoder, child, buffers, nodes, error));
}
- features[0] = ns(Feature_COMPRESSED_BODY);
- FLATCC_RETURN_UNLESS_0(Schema_features_end(builder));
+ return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(
+ struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder*
buffer_encoder,
+ const struct ArrowArrayView* array_view, struct ArrowError* error) {
+ NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+ buffer_encoder != NULL && buffer_encoder->encode_buffer !=
NULL);
+
+ if (array_view->null_count != 0 &&
ArrowArrayViewComputeNullCount(array_view) != 0) {
+ ArrowErrorSet(error,
+ "RecordBatches cannot be constructed from arrays with top
level nulls");
+ return EINVAL;
+ }
+
+ if (array_view->storage_type != NANOARROW_TYPE_STRUCT) {
+ ArrowErrorSet(
+ error,
+ "RecordBatches cannot be constructed from arrays of type other than
struct");
+ return EINVAL;
+ }
+
+ struct ArrowIpcEncoderPrivate* private =
+ (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+ flatcc_builder_t* builder = &private->builder;
+
+ FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error);
+ FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)),
error);
+
+ FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder), error);
+ FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder, array_view->length),
error);
+
+ NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffers, 0, 0));
+ NANOARROW_ASSERT_OK(ArrowBufferResize(&private->nodes, 0, 0));
+ NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+ encoder, buffer_encoder, array_view, &private->buffers, &private->nodes,
error));
+
+ FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create( //
+ builder, (struct
ns(FieldNode)*)private->nodes.data,
+ private->nodes.size_bytes / sizeof(struct
ns(FieldNode))),
+ error);
+ FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create( //
+ builder, (struct
ns(Buffer)*)private->buffers.data,
+ private->buffers.size_bytes / sizeof(struct
ns(Buffer))),
+ error);
+
+ FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_end(builder), error);
+
+ FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder,
buffer_encoder->body_length),
+ error);
+ FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error);
+ return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(
+ struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+ struct ArrowBuffer* body_buffer, struct ArrowError* error) {
+ NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+ body_buffer != NULL);
- FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));
+ struct ArrowIpcBufferEncoder buffer_encoder = {
+ .encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback,
+ .encode_buffer_state = body_buffer,
+ .body_length = 0,
+ };
- FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
- return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
+ return ArrowIpcEncoderEncodeRecordBatch(encoder, &buffer_encoder,
array_view, error);
}
diff --git a/src/nanoarrow/ipc/encoder_test.cc
b/src/nanoarrow/ipc/encoder_test.cc
index f5e6659f..78ca0978 100644
--- a/src/nanoarrow/ipc/encoder_test.cc
+++ b/src/nanoarrow/ipc/encoder_test.cc
@@ -35,11 +35,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
- EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
- EXPECT_EQ(encoder->body_length, 0);
- EXPECT_EQ(encoder->encode_buffer, nullptr);
- EXPECT_EQ(encoder->encode_buffer_state, nullptr);
-
auto* p = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
ASSERT_NE(p, nullptr);
for (auto* b : {&p->buffers, &p->nodes}) {
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index 2695ada9..e4856d07 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -63,6 +63,8 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
#define ArrowIpcEncoderEncodeSchema \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSchema)
+#define ArrowIpcEncoderEncodeSimpleRecordBatch \
+ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSimpleRecordBatch)
#define ArrowIpcOutputStreamInitBuffer \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer)
#define ArrowIpcOutputStreamInitFile \
@@ -214,7 +216,7 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
/// \brief Peek at a message header
///
-/// The first 8 bytes of an Arrow IPC message are 0xFFFFFF followed by the size
+/// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the
size
/// of the header as a little-endian 32-bit integer.
ArrowIpcDecoderPeekHeader() reads
/// these bytes and returns ESPIPE if there are not enough remaining bytes in
data to read
/// the entire header message, EINVAL if the first 8 bytes are not valid,
ENODATA if the
@@ -411,28 +413,6 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
/// initialized using ArrowIpcEncoderInit(), and released with
/// ArrowIpcEncoderReset().
struct ArrowIpcEncoder {
- /// \brief Compression to encode in the next RecordBatch message.
- enum ArrowIpcCompressionType codec;
-
- /// \brief Callback invoked against each buffer to be encoded
- ///
- /// Encoding of buffers is left as a callback to accommodate dissociated
data storage.
- /// One implementation of this callback might copy all buffers into a
contiguous body
- /// for use in an arrow IPC stream, another implementation might store
offsets and
- /// lengths relative to a known arena.
- ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
- struct ArrowIpcEncoder* encoder, int64_t*
offset,
- int64_t* length, struct ArrowError* error);
-
- /// \brief Pointer to arbitrary data used by encode_buffer()
- void* encode_buffer_state;
-
- /// \brief Finalized body length of the most recently encoded RecordBatch
message
- ///
- /// (This is initially 0 and encode_buffer() is expected to update it. After
all
- /// buffers are encoded, this will be written to the RecordBatch's
.bodyLength)
- int64_t body_length;
-
/// \brief Private resources managed by this library
void* private_data;
};
@@ -462,6 +442,16 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct
ArrowIpcEncoder* encoder,
const struct ArrowSchema* schema,
struct ArrowError* error);
+/// \brief Encode a struct typed ArrayView to a flatbuffer RecordBatch,
embedded in a
+/// Message.
+///
+/// Body buffers are concatenated into a contiguous, padded body_buffer.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(
+ struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+ struct ArrowBuffer* body_buffer, struct ArrowError* error);
+
/// \brief A user-extensible output data sink
struct ArrowIpcOutputStream {
/// \brief Write up to buf_size_bytes from stream into buf