This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new cb89444d feat: add ipc RecordBatch encoding (#555)
cb89444d is described below

commit cb89444d34e391a3b5fdc06da51a720542f0dd27
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Tue Aug 6 19:56:45 2024 -0500

    feat: add ipc RecordBatch encoding (#555)
    
    - added `ArrowIpcEncoderEncodeRecordBatch()` which uses a callback for buffer encoding, but left it internal
    - added `ArrowIpcEncoderEncodeSimpleRecordBatch()` which simply concatenates a batch's buffers into one contiguous (properly aligned and padded) body buffer
    - testing uses the decoder tests, replacing arrow C++'s encoder with the new nanoarrow encoder
---
 CMakeLists.txt                    |   1 +
 src/nanoarrow/ipc/decoder.c       |   9 +-
 src/nanoarrow/ipc/decoder_test.cc |  78 ++++++++
 src/nanoarrow/ipc/encoder.c       | 402 ++++++++++++++++++++++++++------------
 src/nanoarrow/ipc/encoder_test.cc |   5 -
 src/nanoarrow/nanoarrow_ipc.h     |  36 ++--
 6 files changed, 380 insertions(+), 151 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index e73779a9..a954d215 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -510,6 +510,7 @@ if(NANOARROW_BUILD_TESTS)
 
     target_link_libraries(nanoarrow_ipc_files_test nanoarrow_testing ZLIB::ZLIB
                           nanoarrow_coverage_config)
+    target_link_libraries(nanoarrow_ipc_decoder_test gmock_main)
   endif()
 
   if(NANOARROW_DEVICE)
diff --git a/src/nanoarrow/ipc/decoder.c b/src/nanoarrow/ipc/decoder.c
index 2cd2a005..6e37e20d 100644
--- a/src/nanoarrow/ipc/decoder.c
+++ b/src/nanoarrow/ipc/decoder.c
@@ -1657,9 +1657,16 @@ static ArrowErrorCode 
ArrowIpcDecoderDecodeArrayViewInternal(
     return EINVAL;
   }
 
+  // RecordBatch messages don't count the root node but decoder->fields does
+  // (decoder->fields[0] is the root field)
+  if (field_i + 1 >= private_data->n_fields) {
+    ArrowErrorSet(error, "cannot decode column %" PRId64 "; there are only %" 
PRId64,
+                  field_i, private_data->n_fields - 1);
+    return EINVAL;
+  }
+
   ns(RecordBatch_table_t) batch = 
(ns(RecordBatch_table_t))private_data->last_message;
 
-  // RecordBatch messages don't count the root node but decoder->fields does
   struct ArrowIpcField* root = private_data->fields + field_i + 1;
 
   struct ArrowIpcArraySetter setter;
diff --git a/src/nanoarrow/ipc/decoder_test.cc 
b/src/nanoarrow/ipc/decoder_test.cc
index 1560a835..dd50201c 100644
--- a/src/nanoarrow/ipc/decoder_test.cc
+++ b/src/nanoarrow/ipc/decoder_test.cc
@@ -21,6 +21,7 @@
 #include <arrow/c/bridge.h>
 #include <arrow/ipc/api.h>
 #include <arrow/util/key_value_metadata.h>
+#include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 // For bswap32()
@@ -761,6 +762,83 @@ TEST_P(ArrowTypeParameterizedTestFixture, 
NanoarrowIpcArrowArrayRoundtrip) {
   ArrowIpcDecoderReset(&decoder);
 }
 
+void AssertArrayViewIdentical(const struct ArrowArrayView* actual,
+                              const struct ArrowArrayView* expected) {
+  NANOARROW_DCHECK(actual->dictionary == nullptr);
+  NANOARROW_DCHECK(expected->dictionary == nullptr);
+
+  ASSERT_EQ(actual->storage_type, expected->storage_type);
+  ASSERT_EQ(actual->offset, expected->offset);
+  ASSERT_EQ(actual->length, expected->length);
+  for (int i = 0; i < 3; i++) {
+    auto a_buf = actual->buffer_views[i];
+    auto e_buf = expected->buffer_views[i];
+    ASSERT_EQ(a_buf.size_bytes, e_buf.size_bytes);
+    if (a_buf.size_bytes != 0) {
+      ASSERT_EQ(memcmp(a_buf.data.data, e_buf.data.data, a_buf.size_bytes), 0);
+    }
+  }
+
+  ASSERT_EQ(actual->n_children, expected->n_children);
+  for (int i = 0; i < actual->n_children; i++) {
+    AssertArrayViewIdentical(actual->children[i], expected->children[i]);
+  }
+}
+
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcNanoarrowArrayRoundtrip) 
{
+  struct ArrowError error;
+  nanoarrow::UniqueSchema schema;
+  ASSERT_TRUE(
+      arrow::ExportSchema(arrow::Schema({arrow::field("", GetParam())}), 
schema.get())
+          .ok());
+
+  nanoarrow::UniqueArrayView array_view;
+  ASSERT_EQ(ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), 
&error),
+            NANOARROW_OK);
+
+  // now make one empty struct array with this schema and another with all 
zeroes
+  nanoarrow::UniqueArray empty_array, zero_array;
+  for (auto* array : {empty_array.get(), zero_array.get()}) {
+    ASSERT_EQ(ArrowArrayInitFromSchema(array, schema.get(), nullptr), 
NANOARROW_OK);
+    ASSERT_EQ(ArrowArrayStartAppending(array), NANOARROW_OK);
+    if (array == zero_array.get()) {
+      ASSERT_EQ(ArrowArrayAppendEmpty(array, 5), NANOARROW_OK);
+    }
+    ASSERT_EQ(ArrowArrayFinishBuildingDefault(array, nullptr), NANOARROW_OK);
+    ASSERT_EQ(ArrowArrayViewSetArray(array_view.get(), array, &error), 
NANOARROW_OK)
+        << error.message;
+
+    nanoarrow::ipc::UniqueEncoder encoder;
+    EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
+
+    nanoarrow::UniqueBuffer buffer, body_buffer;
+    EXPECT_EQ(ArrowIpcEncoderEncodeSimpleRecordBatch(encoder.get(), 
array_view.get(),
+                                                     body_buffer.get(), 
&error),
+              NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(
+        ArrowIpcEncoderFinalizeBuffer(encoder.get(), /*encapsulate=*/true, 
buffer.get()),
+        NANOARROW_OK);
+
+    nanoarrow::ipc::UniqueDecoder decoder;
+    ArrowIpcDecoderInit(decoder.get());
+    EXPECT_EQ(ArrowIpcDecoderSetSchema(decoder.get(), schema.get(), &error), 
NANOARROW_OK)
+        << error.message;
+    EXPECT_EQ(ArrowIpcDecoderDecodeHeader(decoder.get(),
+                                          {buffer->data, buffer->size_bytes}, 
&error),
+              NANOARROW_OK)
+        << error.message;
+
+    struct ArrowArrayView* roundtripped;
+    ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(decoder.get(),
+                                             {body_buffer->data, 
body_buffer->size_bytes},
+                                             -1, &roundtripped, nullptr),
+              NANOARROW_OK);
+
+    AssertArrayViewIdentical(roundtripped, array_view.get());
+  }
+}
+
 INSTANTIATE_TEST_SUITE_P(
     NanoarrowIpcTest, ArrowTypeParameterizedTestFixture,
     ::testing::Values(
diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c
index 4eebe36e..889f66ad 100644
--- a/src/nanoarrow/ipc/encoder.c
+++ b/src/nanoarrow/ipc/encoder.c
@@ -27,8 +27,19 @@
 
 #define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
 
-#define FLATCC_RETURN_UNLESS_0(x) \
-  if (ns(x) != 0) return ENOMEM;
+#define FLATCC_RETURN_UNLESS_0_NO_NS(x, error)                        \
+  if ((x) != 0) {                                                     \
+    ArrowErrorSet(error, "%s:%d: %s failed", __FILE__, __LINE__, #x); \
+    return ENOMEM;                                                    \
+  }
+
+#define FLATCC_RETURN_UNLESS_0(x, error) FLATCC_RETURN_UNLESS_0_NO_NS(ns(x), 
error)
+
+#define FLATCC_RETURN_IF_NULL(x, error)                                 \
+  if (!(x)) {                                                           \
+    ArrowErrorSet(error, "%s:%d: %s was null", __FILE__, __LINE__, #x); \
+    return ENOMEM;                                                      \
+  }
 
 struct ArrowIpcEncoderPrivate {
   flatcc_builder_t builder;
@@ -36,22 +47,15 @@ struct ArrowIpcEncoderPrivate {
   struct ArrowBuffer nodes;
 };
 
-static int32_t ArrowInt32ToLe(int32_t i) {
-  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
-    return bswap32(i);
-  }
-  return i;
-}
-
 ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) {
   NANOARROW_DCHECK(encoder != NULL);
   memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
-  encoder->encode_buffer = NULL;
-  encoder->encode_buffer_state = NULL;
-  encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
   encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate));
   struct ArrowIpcEncoderPrivate* private =
       (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+  if (private == NULL) {
+    return ENOMEM;
+  }
   if (flatcc_builder_init(&private->builder) == -1) {
     ArrowFree(private);
     return ESPIPE;
@@ -70,8 +74,8 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
     ArrowBufferReset(&private->nodes);
     ArrowBufferReset(&private->buffers);
     ArrowFree(private);
-    memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
   }
+  memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
 }
 
 ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
@@ -80,40 +84,41 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct 
ArrowIpcEncoder* encoder,
   struct ArrowIpcEncoderPrivate* private =
       (struct ArrowIpcEncoderPrivate*)encoder->private_data;
 
-  int64_t size = (int64_t)flatcc_builder_get_buffer_size(&private->builder);
-  int32_t header[] = {-1, ArrowInt32ToLe((int32_t)size)};
+  size_t size = flatcc_builder_get_buffer_size(&private->builder);
+  _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);
+
+  int32_t header[] = {-1, (int32_t)size};
+  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
+    header[1] = (int32_t)bswap32((uint32_t)size);
+  }
 
   if (size == 0) {
     // Finalizing an empty flatcc_builder_t triggers an assertion
     return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) : 
NANOARROW_OK;
   }
 
-  const void* data = flatcc_builder_get_direct_buffer(&private->builder, NULL);
-  if (data == NULL) {
-    return ENOMEM;
-  }
-
-  int64_t i = out->size_bytes;
   if (encapsulate) {
     int64_t encapsulated_size =
         _ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
-    NANOARROW_RETURN_NOT_OK(
-        ArrowBufferResize(out, out->size_bytes + encapsulated_size, 0));
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
+    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
   } else {
-    NANOARROW_RETURN_NOT_OK(ArrowBufferResize(out, out->size_bytes + size, 0));
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
   }
 
+  void* data =
+      flatcc_builder_copy_buffer(&private->builder, out->data + 
out->size_bytes, size);
+  NANOARROW_DCHECK(data != NULL);
+  NANOARROW_UNUSED(data);
+  out->size_bytes += size;
+
   if (encapsulate) {
-    memcpy(out->data + i, &header, sizeof(header));
-    i += sizeof(header);
+    // zero padding bytes, if any
+    int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
+    memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
+    out->size_bytes = padded_size;
   }
 
-  memcpy(out->data + i, data, size);
-  i += size;
-
-  // zero padding bytes, if any
-  memset(out->data + i, 0, out->size_bytes - i);
-
   // don't deallocate yet, just wipe the builder's current Message
   flatcc_builder_reset(&private->builder);
   return NANOARROW_OK;
@@ -124,178 +129,185 @@ static ArrowErrorCode 
ArrowIpcEncodeFieldType(flatcc_builder_t* builder,
                                               struct ArrowError* error) {
   switch (schema_view->type) {
     case NANOARROW_TYPE_NA:
-      FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_BOOL:
-      FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_UINT8:
     case NANOARROW_TYPE_INT8:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Int_create(builder, 8, schema_view->type == 
NANOARROW_TYPE_INT8));
+          Field_type_Int_create(builder, 8, schema_view->type == 
NANOARROW_TYPE_INT8),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_UINT16:
     case NANOARROW_TYPE_INT16:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Int_create(builder, 16, schema_view->type == 
NANOARROW_TYPE_INT16));
+          Field_type_Int_create(builder, 16, schema_view->type == 
NANOARROW_TYPE_INT16),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_UINT32:
     case NANOARROW_TYPE_INT32:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Int_create(builder, 32, schema_view->type == 
NANOARROW_TYPE_INT32));
+          Field_type_Int_create(builder, 32, schema_view->type == 
NANOARROW_TYPE_INT32),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_UINT64:
     case NANOARROW_TYPE_INT64:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Int_create(builder, 64, schema_view->type == 
NANOARROW_TYPE_INT64));
+          Field_type_Int_create(builder, 64, schema_view->type == 
NANOARROW_TYPE_INT64),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_HALF_FLOAT:
-      FLATCC_RETURN_UNLESS_0(
-          Field_type_FloatingPoint_create(builder, ns(Precision_HALF)));
+      FLATCC_RETURN_UNLESS_0(Field_type_FloatingPoint_create(builder, 
ns(Precision_HALF)),
+                             error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_FLOAT:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)));
+          Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DOUBLE:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)));
+          Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DECIMAL128:
     case NANOARROW_TYPE_DECIMAL256:
-      FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create(
-          builder, schema_view->decimal_precision, schema_view->decimal_scale,
-          schema_view->decimal_bitwidth));
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Decimal_create(builder, schema_view->decimal_precision,
+                                    schema_view->decimal_scale,
+                                    schema_view->decimal_bitwidth),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_STRING:
-      FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_LARGE_STRING:
-      FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_BINARY:
-      FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_LARGE_BINARY:
-      FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DATE32:
-      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, 
ns(DateUnit_DAY)));
+      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, 
ns(DateUnit_DAY)), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DATE64:
-      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, 
ns(DateUnit_MILLISECOND)));
+      FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, 
ns(DateUnit_MILLISECOND)),
+                             error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_INTERVAL_MONTHS:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)));
+          Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_INTERVAL_DAY_TIME:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)));
+          Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_Interval_create(builder, 
ns(IntervalUnit_MONTH_DAY_NANO)));
+          Field_type_Interval_create(builder, 
ns(IntervalUnit_MONTH_DAY_NANO)), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_TIMESTAMP:
-      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error);
+      FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, 
schema_view->time_unit), error);
       FLATCC_RETURN_UNLESS_0(
-          Timestamp_unit_add(builder, 
(ns(TimeUnit_enum_t))schema_view->time_unit));
-      FLATCC_RETURN_UNLESS_0(
-          Timestamp_timezone_create_str(builder, schema_view->timezone));
-      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder));
+          Timestamp_timezone_create_str(builder, schema_view->timezone), 
error);
+      FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_TIME32:
-      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(
-          builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 32));
+      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, 
schema_view->time_unit, 32),
+                             error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_TIME64:
-      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(
-          builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 64));
+      FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, 
schema_view->time_unit, 64),
+                             error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DURATION:
-      FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(
-          builder, (ns(TimeUnit_enum_t))schema_view->time_unit));
+      FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, 
schema_view->time_unit),
+                             error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_FIXED_SIZE_BINARY:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size));
+          Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_LIST:
-      FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_LARGE_LIST:
-      FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_FIXED_SIZE_LIST:
       FLATCC_RETURN_UNLESS_0(
-          Field_type_FixedSizeList_create(builder, schema_view->fixed_size));
+          Field_type_FixedSizeList_create(builder, schema_view->fixed_size), 
error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_RUN_END_ENCODED:
-      FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_STRUCT:
-      FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder), error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_SPARSE_UNION:
     case NANOARROW_TYPE_DENSE_UNION: {
-      FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder), error);
 
       FLATCC_RETURN_UNLESS_0(
-          Union_mode_add(builder, schema_view->type == 
NANOARROW_TYPE_DENSE_UNION));
+          Union_mode_add(builder, schema_view->type == 
NANOARROW_TYPE_DENSE_UNION),
+          error);
       if (schema_view->union_type_ids) {
         int8_t type_ids[128];
         int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids);
         if (n != 0) {
-          FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder));
+          FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder), error);
           int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, 
n));
-          if (!type_ids_32) {
-            return ENOMEM;
-          }
+          FLATCC_RETURN_IF_NULL(type_ids_32, error);
 
-          for (int i = 0; i < n; ++i) {
+          for (int i = 0; i < n; i++) {
             type_ids_32[i] = type_ids[i];
           }
-          FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder));
+          FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder), error);
         }
       }
 
-      FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder));
+      FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder), error);
       return NANOARROW_OK;
     }
 
     case NANOARROW_TYPE_MAP:
-      FLATCC_RETURN_UNLESS_0(Field_type_Map_create(
-          builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED));
+      FLATCC_RETURN_UNLESS_0(
+          Field_type_Map_create(builder,
+                                schema_view->schema->flags & 
ARROW_FLAG_MAP_KEYS_SORTED),
+          error);
       return NANOARROW_OK;
 
     case NANOARROW_TYPE_DICTIONARY:
@@ -326,15 +338,12 @@ static ArrowErrorCode 
ArrowIpcEncodeMetadata(flatcc_builder_t* builder,
     struct ArrowStringView key, value;
     NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowMetadataReaderRead(&metadata, 
&key, &value),
                                        error);
-    if (push_start(builder) != 0) {
-      return ENOMEM;
-    }
-    FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, 
key.size_bytes));
+    FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error);
+    FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, 
key.size_bytes),
+                           error);
     FLATCC_RETURN_UNLESS_0(
-        KeyValue_value_create_strn(builder, value.data, value.size_bytes));
-    if (!push_end(builder)) {
-      return ENOMEM;
-    }
+        KeyValue_value_create_strn(builder, value.data, value.size_bytes), 
error);
+    FLATCC_RETURN_IF_NULL(push_end(builder), error);
   }
   return NANOARROW_OK;
 }
@@ -345,14 +354,10 @@ static ArrowErrorCode 
ArrowIpcEncodeFields(flatcc_builder_t* builder,
                                            ns(Field_ref_t) *
                                                (*push_end)(flatcc_builder_t*),
                                            struct ArrowError* error) {
-  for (int i = 0; i < schema->n_children; ++i) {
-    if (push_start(builder) != 0) {
-      return ENOMEM;
-    }
+  for (int i = 0; i < schema->n_children; i++) {
+    FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error);
     NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], 
error));
-    if (!push_end(builder)) {
-      return ENOMEM;
-    }
+    FLATCC_RETURN_IF_NULL(push_end(builder), error);
   }
   return NANOARROW_OK;
 }
@@ -360,28 +365,28 @@ static ArrowErrorCode 
ArrowIpcEncodeFields(flatcc_builder_t* builder,
 static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder,
                                           const struct ArrowSchema* schema,
                                           struct ArrowError* error) {
-  FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name));
-  FLATCC_RETURN_UNLESS_0(
-      Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE));
+  FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name), error);
+  FLATCC_RETURN_UNLESS_0(Field_nullable_add(builder, schema->flags & 
ARROW_FLAG_NULLABLE),
+                         error);
 
   struct ArrowSchemaView schema_view;
   NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error));
   NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, 
error));
 
   if (schema->n_children != 0) {
-    FLATCC_RETURN_UNLESS_0(Field_children_start(builder));
+    FLATCC_RETURN_UNLESS_0(Field_children_start(builder), error);
     NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
                                                  
&ns(Field_children_push_start),
                                                  &ns(Field_children_push_end), 
error));
-    FLATCC_RETURN_UNLESS_0(Field_children_end(builder));
+    FLATCC_RETURN_UNLESS_0(Field_children_end(builder), error);
   }
 
   if (schema->metadata) {
-    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder));
+    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder), error);
     NANOARROW_RETURN_NOT_OK(
         ArrowIpcEncodeMetadata(builder, schema, 
&ns(Field_custom_metadata_push_start),
                                &ns(Field_custom_metadata_push_end), error));
-    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder));
+    FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder), error);
   }
   return NANOARROW_OK;
 }
@@ -396,41 +401,194 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
 
   flatcc_builder_t* builder = &private->builder;
 
-  FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder));
-  FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)));
+  FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error);
+  FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)), 
error);
 
-  FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder));
+  FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder), error);
 
   if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) {
-    FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, 
ns(Endianness_Little)));
+    FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, 
ns(Endianness_Little)), error);
   } else {
-    FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Big)));
+    FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Big)), 
error);
   }
 
-  FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder));
+  FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder), error);
   NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema,
                                                &ns(Schema_fields_push_start),
                                                &ns(Schema_fields_push_end), 
error));
-  FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder));
+  FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder), error);
 
   if (schema->metadata) {
-    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder));
+    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder), error);
     NANOARROW_RETURN_NOT_OK(
         ArrowIpcEncodeMetadata(builder, schema, 
&ns(Schema_custom_metadata_push_start),
                                &ns(Schema_custom_metadata_push_end), error));
-    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder));
+    FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder), error);
   }
 
-  FLATCC_RETURN_UNLESS_0(Schema_features_start(builder));
-  ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1));
-  if (!features) {
-    return ENOMEM;
+  FLATCC_RETURN_UNLESS_0(Schema_features_start(builder), error);
+  FLATCC_RETURN_UNLESS_0(Schema_features_end(builder), error);
+
+  FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder), error);
+
+  FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0), error);
+  FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error);
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcBufferEncoder {
+  /// \brief Callback invoked against each buffer to be encoded
+  ///
+  /// Encoding of buffers is left as a callback to accommodate dissociated 
data storage.
+  /// One implementation of this callback might copy all buffers into a 
contiguous body
+  /// for use in an arrow IPC stream, another implementation might store 
offsets and
+  /// lengths relative to a known arena.
+  ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
+                                  struct ArrowIpcEncoder* encoder,
+                                  struct ArrowIpcBufferEncoder* buffer_encoder,
+                                  int64_t* offset, int64_t* length,
+                                  struct ArrowError* error);
+
+  /// \brief Pointer to arbitrary data used by encode_buffer()
+  void* encode_buffer_state;
+
+  /// \brief Finalized body length of the most recently encoded RecordBatch 
message
+  ///
+  /// encode_buffer() is expected to update this while encoding each buffer. 
After all
+  /// buffers are encoded, this will be written to the RecordBatch's 
.bodyLength
+  int64_t body_length;
+};
+
+static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback(
+    struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder,
+    struct ArrowIpcBufferEncoder* buffer_encoder, int64_t* offset, int64_t* 
length,
+    struct ArrowError* error) {
+  NANOARROW_UNUSED(encoder);
+
+  struct ArrowBuffer* body_buffer =
+      (struct ArrowBuffer*)buffer_encoder->encode_buffer_state;
+
+  int64_t old_size = body_buffer->size_bytes;
+  int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size);
+  int64_t buffer_end = buffer_begin + buffer_view.size_bytes;
+  int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end);
+
+  // reserve all the memory we'll need now
+  NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(body_buffer, new_size 
- old_size),
+                                     error);
+
+  // zero padding up to the start of the buffer
+  NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin - 
old_size));
+
+  // store offset and length of the buffer
+  *offset = buffer_begin;
+  *length = buffer_view.size_bytes;
+
+  NANOARROW_ASSERT_OK(
+      ArrowBufferAppend(body_buffer, buffer_view.data.data, 
buffer_view.size_bytes));
+
+  // zero padding after writing the buffer
+  NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end);
+  NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size - 
buffer_end));
+
+  buffer_encoder->body_length = body_buffer->size_bytes;
+  return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl(
+    struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* 
buffer_encoder,
+    const struct ArrowArrayView* array_view, struct ArrowBuffer* buffers,
+    struct ArrowBuffer* nodes, struct ArrowError* error) {
+  if (array_view->offset != 0) {
+    ArrowErrorSet(error, "Cannot encode arrays with nonzero offset");
+    return ENOTSUP;
+  }
+
+  for (int64_t c = 0; c < array_view->n_children; ++c) {
+    const struct ArrowArrayView* child = array_view->children[c];
+
+    struct ns(FieldNode) node = {child->length, child->null_count};
+    NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferAppend(nodes, &node, 
sizeof(node)),
+                                       error);
+
+    for (int64_t b = 0; b < child->array->n_buffers; ++b) {
+      struct ns(Buffer) buffer;
+      NANOARROW_RETURN_NOT_OK(
+          buffer_encoder->encode_buffer(child->buffer_views[b], encoder, 
buffer_encoder,
+                                        &buffer.offset, &buffer.length, 
error));
+      NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+          ArrowBufferAppend(buffers, &buffer, sizeof(buffer)), error);
+    }
+
+    NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+        encoder, buffer_encoder, child, buffers, nodes, error));
   }
-  features[0] = ns(Feature_COMPRESSED_BODY);
-  FLATCC_RETURN_UNLESS_0(Schema_features_end(builder));
+  return NANOARROW_OK;
+}
+
+static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(
+    struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* 
buffer_encoder,
+    const struct ArrowArrayView* array_view, struct ArrowError* error) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+                   buffer_encoder != NULL && buffer_encoder->encode_buffer != 
NULL);
+
+  if (array_view->null_count != 0 && 
ArrowArrayViewComputeNullCount(array_view) != 0) {
+    ArrowErrorSet(error,
+                  "RecordBatches cannot be constructed from arrays with top 
level nulls");
+    return EINVAL;
+  }
+
+  if (array_view->storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(
+        error,
+        "RecordBatches cannot be constructed from arrays of type other than 
struct");
+    return EINVAL;
+  }
+
+  struct ArrowIpcEncoderPrivate* private =
+      (struct ArrowIpcEncoderPrivate*)encoder->private_data;
+
+  flatcc_builder_t* builder = &private->builder;
+
+  FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error);
+  FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)), 
error);
+
+  FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder), error);
+  FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder, array_view->length), 
error);
+
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffers, 0, 0));
+  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->nodes, 0, 0));
+  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl(
+      encoder, buffer_encoder, array_view, &private->buffers, &private->nodes, 
error));
+
+  FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create(  //
+                             builder, (struct 
ns(FieldNode)*)private->nodes.data,
+                             private->nodes.size_bytes / sizeof(struct 
ns(FieldNode))),
+                         error);
+  FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create(  //
+                             builder, (struct 
ns(Buffer)*)private->buffers.data,
+                             private->buffers.size_bytes / sizeof(struct 
ns(Buffer))),
+                         error);
+
+  FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_end(builder), error);
+
+  FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 
buffer_encoder->body_length),
+                         error);
+  FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error);
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(
+    struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+    struct ArrowBuffer* body_buffer, struct ArrowError* error) {
+  NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
+                   body_buffer != NULL);
 
-  FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder));
+  struct ArrowIpcBufferEncoder buffer_encoder = {
+      .encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback,
+      .encode_buffer_state = body_buffer,
+      .body_length = 0,
+  };
 
-  FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0));
-  return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM;
+  return ArrowIpcEncoderEncodeRecordBatch(encoder, &buffer_encoder, 
array_view, error);
 }
diff --git a/src/nanoarrow/ipc/encoder_test.cc 
b/src/nanoarrow/ipc/encoder_test.cc
index f5e6659f..78ca0978 100644
--- a/src/nanoarrow/ipc/encoder_test.cc
+++ b/src/nanoarrow/ipc/encoder_test.cc
@@ -35,11 +35,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcEncoderConstruction) {
 
   EXPECT_EQ(ArrowIpcEncoderInit(encoder.get()), NANOARROW_OK);
 
-  EXPECT_EQ(encoder->codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
-  EXPECT_EQ(encoder->body_length, 0);
-  EXPECT_EQ(encoder->encode_buffer, nullptr);
-  EXPECT_EQ(encoder->encode_buffer_state, nullptr);
-
   auto* p = static_cast<struct ArrowIpcEncoderPrivate*>(encoder->private_data);
   ASSERT_NE(p, nullptr);
   for (auto* b : {&p->buffers, &p->nodes}) {
diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h
index 2695ada9..e4856d07 100644
--- a/src/nanoarrow/nanoarrow_ipc.h
+++ b/src/nanoarrow/nanoarrow_ipc.h
@@ -63,6 +63,8 @@
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderFinalizeBuffer)
 #define ArrowIpcEncoderEncodeSchema \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSchema)
+#define ArrowIpcEncoderEncodeSimpleRecordBatch \
+  NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcEncoderEncodeSimpleRecordBatch)
 #define ArrowIpcOutputStreamInitBuffer \
   NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer)
 #define ArrowIpcOutputStreamInitFile \
@@ -214,7 +216,7 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
 
 /// \brief Peek at a message header
 ///
-/// The first 8 bytes of an Arrow IPC message are 0xFFFFFF followed by the size
+/// The first 8 bytes of an Arrow IPC message are 0xFFFFFFFF followed by the size
 /// of the header as a little-endian 32-bit integer. ArrowIpcDecoderPeekHeader() reads
 /// these bytes and returns ESPIPE if there are not enough remaining bytes in data to read
 /// the entire header message, EINVAL if the first 8 bytes are not valid, ENODATA if the
@@ -411,28 +413,6 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit(
 /// initialized using ArrowIpcEncoderInit(), and released with
 /// ArrowIpcEncoderReset().
 struct ArrowIpcEncoder {
-  /// \brief Compression to encode in the next RecordBatch message.
-  enum ArrowIpcCompressionType codec;
-
-  /// \brief Callback invoked against each buffer to be encoded
-  ///
-  /// Encoding of buffers is left as a callback to accommodate dissociated data storage.
-  /// One implementation of this callback might copy all buffers into a contiguous body
-  /// for use in an arrow IPC stream, another implementation might store offsets and
-  /// lengths relative to a known arena.
-  ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view,
-                                  struct ArrowIpcEncoder* encoder, int64_t* 
offset,
-                                  int64_t* length, struct ArrowError* error);
-
-  /// \brief Pointer to arbitrary data used by encode_buffer()
-  void* encode_buffer_state;
-
-  /// \brief Finalized body length of the most recently encoded RecordBatch message
-  ///
-  /// (This is initially 0 and encode_buffer() is expected to update it. After all
-  /// buffers are encoded, this will be written to the RecordBatch's .bodyLength)
-  int64_t body_length;
-
   /// \brief Private resources managed by this library
   void* private_data;
 };
@@ -462,6 +442,16 @@ ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct 
ArrowIpcEncoder* encoder,
                                            const struct ArrowSchema* schema,
                                            struct ArrowError* error);
 
+/// \brief Encode a struct typed ArrayView to a flatbuffer RecordBatch, embedded in a
+/// Message.
+///
+/// Body buffers are concatenated into a contiguous, padded body_buffer.
+///
+/// Returns ENOMEM if allocation fails, NANOARROW_OK otherwise.
+ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch(
+    struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view,
+    struct ArrowBuffer* body_buffer, struct ArrowError* error);
+
 /// \brief An user-extensible output data sink
 struct ArrowIpcOutputStream {
   /// \brief Write up to buf_size_bytes from stream into buf


Reply via email to