paleolimbot commented on code in PR #555: URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1676570323
########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + 
+#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; + encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; Review Comment: nit: This is possibly a better way to do this than what we currently do, but every other nanoarrow function uses `NANOARROW_DCHECK()` for this (if we check arguments at all). In other words, we currently crash for programming errors and reserve EINVAL for inappropriate values (mostly). ########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct 
ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; + encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? 
NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; + + int32_t continuation = -1, message_size = (int32_t)buffer->size_bytes; + + int64_t encapsulated_size = _ArrowRoundUpToMultipleOf8( + sizeof(continuation) + sizeof(message_size) + buffer->size_bytes); + int64_t padding_size = + encapsulated_size - sizeof(continuation) - sizeof(message_size) - message_size; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, encapsulated_size, 0)); + + memset(buffer->data + sizeof(continuation) + sizeof(message_size) + message_size, 0, + padding_size); + memmove(buffer->data + sizeof(continuation) + sizeof(message_size), buffer->data, + message_size); + memcpy(buffer->data + sizeof(continuation), &message_size, sizeof(message_size)); + memcpy(buffer->data, &continuation, sizeof(continuation)); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( + struct ArrowBufferView buffer_view, int64_t* offset, struct ArrowIpcEncoder* encoder, + struct ArrowError* error) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize( + body_buffer, _ArrowRoundUpToMultipleOf8(body_buffer->size_bytes), 0)); + *offset = body_buffer->size_bytes; + NANOARROW_RETURN_NOT_OK( + ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); + encoder->body_length = body_buffer->size_bytes; + return NANOARROW_OK; +} + +void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* body_buffer) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback; + encoder->encode_buffer_state = body_buffer; + ArrowBufferResize(body_buffer, 
0, 0); +} + +static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, + const struct ArrowSchemaView* schema_view, + struct ArrowError* error) { + switch (schema_view->type) { + case NANOARROW_TYPE_NA: + FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BOOL: + FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_INT8: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_INT16: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_INT32: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT64: + case NANOARROW_TYPE_INT64: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_HALF_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_HALF))); + return NANOARROW_OK; + + case NANOARROW_TYPE_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DOUBLE: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create( + builder, schema_view->decimal_precision, schema_view->decimal_scale, + schema_view->decimal_bitwidth)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder)); + return NANOARROW_OK; + + case 
NANOARROW_TYPE_LARGE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE32: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE64: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTHS: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_DAY_TIME: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_MONTH_DAY_NANO))); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIMESTAMP: + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder)); + FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit)); + FLATCC_RETURN_UNLESS_0( + Timestamp_timezone_create_str(builder, schema_view->timezone)); + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME32: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME64: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DURATION: + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + FLATCC_RETURN_UNLESS_0( + 
Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_LIST: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeList_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_RUN_END_ENCODED: + FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRUCT: + FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_SPARSE_UNION: + case NANOARROW_TYPE_DENSE_UNION: { + FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder)); + + FLATCC_RETURN_UNLESS_0( + Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION)); + if (schema_view->union_type_ids) { + int8_t type_ids[128]; + int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); + if (n != 0) { + FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder)); + int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); + if (!type_ids_32) return ENOMEM; Review Comment: nit: the Google style guide seems to allow these types of statements (i.e., an `if` without curly braces) for "historical reasons", but all of our existing code (I think!) uses curly braces with the body on separate lines. ########## src/nanoarrow/ipc/encoder_test.cc: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/array.h> +#include <arrow/c/bridge.h> +#include <arrow/compute/api.h> +#include <arrow/io/memory.h> +#include <arrow/ipc/api.h> +#include <arrow/util/key_value_metadata.h> +#include <gtest/gtest.h> + +#include "nanoarrow/nanoarrow.hpp" +#include "nanoarrow/nanoarrow_ipc.h" + +using namespace arrow; + +static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { + uint32_t check = 1; + char first_byte; + memcpy(&first_byte, &check, sizeof(char)); + if (first_byte) { + return NANOARROW_IPC_ENDIANNESS_LITTLE; + } else { + return NANOARROW_IPC_ENDIANNESS_BIG; + } +} + +struct TestDecoder : ArrowIpcDecoder { + TestDecoder() { ArrowIpcDecoderInit(this); } + ~TestDecoder() { ArrowIpcDecoderReset(this); } +}; + +struct TestEncoder : ArrowIpcEncoder { + TestEncoder() { ArrowIpcEncoderInit(this); } + ~TestEncoder() { ArrowIpcEncoderReset(this); } +}; + +nanoarrow::UniqueSchema to_unique(const Schema& schema) { + nanoarrow::UniqueSchema exported; + auto st = ExportSchema(schema, exported.get()); + EXPECT_TRUE(st.ok()) << st; + return exported; +} + +nanoarrow::UniqueArray to_unique_array(const RecordBatch& batch) { + nanoarrow::UniqueArray exported; + auto st = ExportRecordBatch(batch, exported.get()); + EXPECT_TRUE(st.ok()) << st; + return exported; +} + +auto read_schema(const struct ArrowBuffer* buffer) { + io::BufferReader reader{std::make_shared<Buffer>(buffer->data, 
buffer->size_bytes)}; + auto maybe_schema = ipc::ReadSchema(&reader, nullptr); + EXPECT_TRUE(maybe_schema.ok()) << maybe_schema.status(); + return std::move(maybe_schema).ValueOr(nullptr); +} + +auto read_batch(const Schema& schema, const struct ArrowBuffer* metadata, + const struct ArrowBuffer* body) { + Buffer metadata_buf{metadata->data, metadata->size_bytes}; + io::BufferReader body_reader{std::make_shared<Buffer>(body->data, body->size_bytes)}; + auto maybe_batch = ipc::ReadRecordBatch( + metadata_buf, arrow::schema(schema.fields(), schema.metadata()), nullptr, {}, + &body_reader); + EXPECT_TRUE(maybe_batch.ok()) << maybe_batch.status(); + return std::move(maybe_batch).ValueOr(nullptr); +} + +auto random_batch(const Schema& schema, int64_t num_rows) { Review Comment: I am skeptical of adding additional Arrow C++ surface since the dependency in our tests is something I'd like to remove in the near future (although I'm more than happy to leave this and replace it later when I go through and do that refactor). The integration tests will take care of sending random batches through IPC; here I think you can get full coverage with an array with a zero-size buffer, an array with more than one non-empty buffer, and an array with more than one non-empty child. ########## src/nanoarrow/ipc/encoder_test.cc: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <arrow/array.h> +#include <arrow/c/bridge.h> +#include <arrow/compute/api.h> +#include <arrow/io/memory.h> +#include <arrow/ipc/api.h> +#include <arrow/util/key_value_metadata.h> +#include <gtest/gtest.h> + +#include "nanoarrow/nanoarrow.hpp" +#include "nanoarrow/nanoarrow_ipc.h" + +using namespace arrow; + +static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { + uint32_t check = 1; + char first_byte; + memcpy(&first_byte, &check, sizeof(char)); + if (first_byte) { + return NANOARROW_IPC_ENDIANNESS_LITTLE; + } else { + return NANOARROW_IPC_ENDIANNESS_BIG; + } +} + +struct TestDecoder : ArrowIpcDecoder { Review Comment: I think there is already a `nanoarrow::UniqueIpcDecoder`? https://github.com/apache/arrow-nanoarrow/blob/2aa2e697fd5d0ef3cd88961bb030f9e760db581b/src/nanoarrow/nanoarrow_ipc.hpp#L78 ########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; + encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct 
ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; + + int32_t continuation = -1, message_size = (int32_t)buffer->size_bytes; + + int64_t encapsulated_size = _ArrowRoundUpToMultipleOf8( + sizeof(continuation) + sizeof(message_size) + buffer->size_bytes); + int64_t padding_size = + encapsulated_size - sizeof(continuation) - sizeof(message_size) - message_size; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, encapsulated_size, 0)); + + memset(buffer->data + sizeof(continuation) + sizeof(message_size) + message_size, 0, + padding_size); + memmove(buffer->data + sizeof(continuation) + sizeof(message_size), buffer->data, + message_size); + memcpy(buffer->data + sizeof(continuation), &message_size, sizeof(message_size)); + memcpy(buffer->data, &continuation, sizeof(continuation)); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( + struct ArrowBufferView buffer_view, int64_t* offset, struct ArrowIpcEncoder* encoder, + struct ArrowError* error) { + 
struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize( + body_buffer, _ArrowRoundUpToMultipleOf8(body_buffer->size_bytes), 0)); + *offset = body_buffer->size_bytes; + NANOARROW_RETURN_NOT_OK( + ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); + encoder->body_length = body_buffer->size_bytes; + return NANOARROW_OK; +} + +void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* body_buffer) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback; + encoder->encode_buffer_state = body_buffer; + ArrowBufferResize(body_buffer, 0, 0); +} + +static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, + const struct ArrowSchemaView* schema_view, + struct ArrowError* error) { + switch (schema_view->type) { + case NANOARROW_TYPE_NA: + FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BOOL: + FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_INT8: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_INT16: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_INT32: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT64: + case NANOARROW_TYPE_INT64: + FLATCC_RETURN_UNLESS_0( + 
Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_HALF_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_HALF))); + return NANOARROW_OK; + + case NANOARROW_TYPE_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DOUBLE: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create( + builder, schema_view->decimal_precision, schema_view->decimal_scale, + schema_view->decimal_bitwidth)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE32: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE64: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTHS: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_DAY_TIME: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, 
ns(IntervalUnit_MONTH_DAY_NANO))); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIMESTAMP: + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder)); + FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit)); + FLATCC_RETURN_UNLESS_0( + Timestamp_timezone_create_str(builder, schema_view->timezone)); + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME32: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME64: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DURATION: + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_LIST: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeList_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_RUN_END_ENCODED: + FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRUCT: + FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_SPARSE_UNION: + case NANOARROW_TYPE_DENSE_UNION: { + FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder)); + + FLATCC_RETURN_UNLESS_0( + Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION)); + if (schema_view->union_type_ids) { + int8_t type_ids[128]; + int n = 
_ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); + if (n != 0) { + FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder)); + int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); + if (!type_ids_32) return ENOMEM; + + for (int i = 0; i < n; ++i) { + type_ids_32[i] = type_ids[i]; + } + FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder)); + } + } + + FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder)); + return NANOARROW_OK; + } + + case NANOARROW_TYPE_MAP: + FLATCC_RETURN_UNLESS_0(Field_type_Map_create( + builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DICTIONARY: + ArrowErrorSet(error, "IPC encoding of dictionary types unsupported"); + return ENOTSUP; + + default: + ArrowErrorSet(error, "Expected a valid enum ArrowType value but found %d", + schema_view->type); + return EINVAL; + } +} + +static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error); + +static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + ns(KeyValue_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + struct ArrowMetadataReader metadata; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata, schema->metadata)); + while (metadata.remaining_keys > 0) { + struct ArrowStringView key, value; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value)); + if (push_start(builder) != 0) return ENOMEM; + FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes)); + FLATCC_RETURN_UNLESS_0( + KeyValue_value_create_strn(builder, value.data, value.size_bytes)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + 
ns(Field_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + for (int i = 0; i < schema->n_children; ++i) { + if (push_start(builder) != 0) return ENOMEM; + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name)); + FLATCC_RETURN_UNLESS_0( + Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE)); + + struct ArrowSchemaView schema_view; + NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, error)); + + if (schema->n_children != 0) { + FLATCC_RETURN_UNLESS_0(Field_children_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Field_children_push_start), + &ns(Field_children_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_children_end(builder)); + } + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Field_custom_metadata_push_start), + &ns(Field_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder)); + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + if (!encoder || !encoder->private_data || !schema || !error) return EINVAL; + + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + + flatcc_builder_t* builder = &private->builder; + + FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder)); + FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5))); + + 
FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder)); + + FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Schema_fields_push_start), + &ns(Schema_fields_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder)); + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Schema_custom_metadata_push_start), + &ns(Schema_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder)); + } + + FLATCC_RETURN_UNLESS_0(Schema_features_start(builder)); + ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1)); + if (!features) return ENOMEM; + features[0] = ns(Feature_COMPRESSED_BODY); + FLATCC_RETURN_UNLESS_0(Schema_features_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0)); + return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl( Review Comment: ```suggestion static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl( ``` ########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; + encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct 
ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; + + int32_t continuation = -1, message_size = (int32_t)buffer->size_bytes; + + int64_t encapsulated_size = _ArrowRoundUpToMultipleOf8( + sizeof(continuation) + sizeof(message_size) + buffer->size_bytes); + int64_t padding_size = + encapsulated_size - sizeof(continuation) - sizeof(message_size) - message_size; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, encapsulated_size, 0)); + + memset(buffer->data + sizeof(continuation) + sizeof(message_size) + message_size, 0, + padding_size); + memmove(buffer->data + sizeof(continuation) + sizeof(message_size), buffer->data, + message_size); + memcpy(buffer->data + sizeof(continuation), &message_size, sizeof(message_size)); + memcpy(buffer->data, &continuation, sizeof(continuation)); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( + struct ArrowBufferView buffer_view, int64_t* offset, struct ArrowIpcEncoder* encoder, + struct ArrowError* error) { + 
struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize( + body_buffer, _ArrowRoundUpToMultipleOf8(body_buffer->size_bytes), 0)); + *offset = body_buffer->size_bytes; + NANOARROW_RETURN_NOT_OK( + ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); + encoder->body_length = body_buffer->size_bytes; + return NANOARROW_OK; +} + +void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* body_buffer) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback; + encoder->encode_buffer_state = body_buffer; + ArrowBufferResize(body_buffer, 0, 0); +} + +static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, + const struct ArrowSchemaView* schema_view, + struct ArrowError* error) { + switch (schema_view->type) { + case NANOARROW_TYPE_NA: + FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BOOL: + FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_INT8: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_INT16: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_INT32: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT64: + case NANOARROW_TYPE_INT64: + FLATCC_RETURN_UNLESS_0( + 
Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_HALF_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_HALF))); + return NANOARROW_OK; + + case NANOARROW_TYPE_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DOUBLE: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create( + builder, schema_view->decimal_precision, schema_view->decimal_scale, + schema_view->decimal_bitwidth)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE32: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE64: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTHS: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_DAY_TIME: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, 
ns(IntervalUnit_MONTH_DAY_NANO))); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIMESTAMP: + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder)); + FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit)); + FLATCC_RETURN_UNLESS_0( + Timestamp_timezone_create_str(builder, schema_view->timezone)); + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME32: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME64: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DURATION: + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_LIST: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeList_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_RUN_END_ENCODED: + FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRUCT: + FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_SPARSE_UNION: + case NANOARROW_TYPE_DENSE_UNION: { + FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder)); + + FLATCC_RETURN_UNLESS_0( + Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION)); + if (schema_view->union_type_ids) { + int8_t type_ids[128]; + int n = 
_ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); + if (n != 0) { + FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder)); + int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); + if (!type_ids_32) return ENOMEM; + + for (int i = 0; i < n; ++i) { + type_ids_32[i] = type_ids[i]; + } + FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder)); + } + } + + FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder)); + return NANOARROW_OK; + } + + case NANOARROW_TYPE_MAP: + FLATCC_RETURN_UNLESS_0(Field_type_Map_create( + builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DICTIONARY: + ArrowErrorSet(error, "IPC encoding of dictionary types unsupported"); + return ENOTSUP; + + default: + ArrowErrorSet(error, "Expected a valid enum ArrowType value but found %d", + schema_view->type); + return EINVAL; + } +} + +static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error); + +static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + ns(KeyValue_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + struct ArrowMetadataReader metadata; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata, schema->metadata)); + while (metadata.remaining_keys > 0) { + struct ArrowStringView key, value; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value)); + if (push_start(builder) != 0) return ENOMEM; + FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes)); + FLATCC_RETURN_UNLESS_0( + KeyValue_value_create_strn(builder, value.data, value.size_bytes)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + 
ns(Field_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + for (int i = 0; i < schema->n_children; ++i) { + if (push_start(builder) != 0) return ENOMEM; + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name)); + FLATCC_RETURN_UNLESS_0( + Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE)); + + struct ArrowSchemaView schema_view; + NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, error)); + + if (schema->n_children != 0) { + FLATCC_RETURN_UNLESS_0(Field_children_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Field_children_push_start), + &ns(Field_children_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_children_end(builder)); + } + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Field_custom_metadata_push_start), + &ns(Field_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder)); + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + if (!encoder || !encoder->private_data || !schema || !error) return EINVAL; + + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + + flatcc_builder_t* builder = &private->builder; + + FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder)); + FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5))); + + 
FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder)); + + FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Schema_fields_push_start), + &ns(Schema_fields_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder)); + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Schema_custom_metadata_push_start), + &ns(Schema_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder)); + } + + FLATCC_RETURN_UNLESS_0(Schema_features_start(builder)); + ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1)); + if (!features) return ENOMEM; + features[0] = ns(Feature_COMPRESSED_BODY); + FLATCC_RETURN_UNLESS_0(Schema_features_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0)); + return ns(Message_end_as_root(builder)) ? 
NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl( + struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view, + struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* error) { + if (array_view->offset != 0) { + ArrowErrorSet(error, "Cannot encode arrays with nonzero offset"); + return ENOTSUP; + } + + for (int c = 0; c < array_view->n_children; ++c) { + const struct ArrowArrayView* child = array_view->children[c]; + + struct ns(FieldNode) node = {child->length, child->null_count}; + NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(nodes, &node, sizeof(node))); + + for (int b = 0; b < child->array->n_buffers; ++b) { + struct ns(Buffer) buffer = {.length = child->buffer_views[b].size_bytes}; + NANOARROW_RETURN_NOT_OK( + encoder->encode_buffer(child->buffer_views[b], &buffer.offset, encoder, error)); + NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffers, &buffer, sizeof(buffer))); + } + + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncoderEncodeRecordBatchImpl(encoder, child, buffers, nodes, error)); + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(struct ArrowIpcEncoder* encoder, + const struct ArrowArrayView* array_view, + struct ArrowError* error) { + if (!encoder || !encoder->private_data || !array_view || !error) return EINVAL; + + if (array_view->null_count != 0) { Review Comment: We should probably make a function for this (I think there's one in Python that should maybe move to C), but we might have to compute the null count here if `null_count` is -1. For the purposes of this PR, checking for a NULL validity buffer will work for almost all record batch implementations (maybe not C#). ########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; 
+ encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? 
NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; + + int32_t continuation = -1, message_size = (int32_t)buffer->size_bytes; + + int64_t encapsulated_size = _ArrowRoundUpToMultipleOf8( + sizeof(continuation) + sizeof(message_size) + buffer->size_bytes); + int64_t padding_size = + encapsulated_size - sizeof(continuation) - sizeof(message_size) - message_size; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, encapsulated_size, 0)); + + memset(buffer->data + sizeof(continuation) + sizeof(message_size) + message_size, 0, + padding_size); + memmove(buffer->data + sizeof(continuation) + sizeof(message_size), buffer->data, + message_size); + memcpy(buffer->data + sizeof(continuation), &message_size, sizeof(message_size)); + memcpy(buffer->data, &continuation, sizeof(continuation)); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( + struct ArrowBufferView buffer_view, int64_t* offset, struct ArrowIpcEncoder* encoder, + struct ArrowError* error) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize( + body_buffer, _ArrowRoundUpToMultipleOf8(body_buffer->size_bytes), 0)); + *offset = body_buffer->size_bytes; + NANOARROW_RETURN_NOT_OK( + ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); + encoder->body_length = body_buffer->size_bytes; + return NANOARROW_OK; +} + +void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* body_buffer) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback; + encoder->encode_buffer_state = body_buffer; + ArrowBufferResize(body_buffer, 
0, 0); +} + +static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, + const struct ArrowSchemaView* schema_view, + struct ArrowError* error) { + switch (schema_view->type) { + case NANOARROW_TYPE_NA: + FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BOOL: + FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_INT8: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_INT16: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_INT32: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT64: + case NANOARROW_TYPE_INT64: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_HALF_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_HALF))); + return NANOARROW_OK; + + case NANOARROW_TYPE_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DOUBLE: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create( + builder, schema_view->decimal_precision, schema_view->decimal_scale, + schema_view->decimal_bitwidth)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder)); + return NANOARROW_OK; + + case 
NANOARROW_TYPE_LARGE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE32: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE64: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTHS: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_DAY_TIME: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_MONTH_DAY_NANO))); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIMESTAMP: + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder)); + FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit)); + FLATCC_RETURN_UNLESS_0( + Timestamp_timezone_create_str(builder, schema_view->timezone)); + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME32: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME64: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DURATION: + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + FLATCC_RETURN_UNLESS_0( + 
Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_LIST: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeList_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_RUN_END_ENCODED: + FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRUCT: + FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_SPARSE_UNION: + case NANOARROW_TYPE_DENSE_UNION: { + FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder)); + + FLATCC_RETURN_UNLESS_0( + Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION)); + if (schema_view->union_type_ids) { + int8_t type_ids[128]; + int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); + if (n != 0) { + FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder)); + int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); + if (!type_ids_32) return ENOMEM; + + for (int i = 0; i < n; ++i) { + type_ids_32[i] = type_ids[i]; + } + FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder)); + } + } + + FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder)); + return NANOARROW_OK; + } + + case NANOARROW_TYPE_MAP: + FLATCC_RETURN_UNLESS_0(Field_type_Map_create( + builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DICTIONARY: + ArrowErrorSet(error, "IPC encoding of dictionary types unsupported"); + return ENOTSUP; + + default: + ArrowErrorSet(error, "Expected a valid enum ArrowType value but found %d", + schema_view->type); + return EINVAL; + } +} + +static ArrowErrorCode 
ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error); + +static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + ns(KeyValue_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + struct ArrowMetadataReader metadata; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderInit(&metadata, schema->metadata)); + while (metadata.remaining_keys > 0) { + struct ArrowStringView key, value; + NANOARROW_RETURN_NOT_OK(ArrowMetadataReaderRead(&metadata, &key, &value)); + if (push_start(builder) != 0) return ENOMEM; + FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes)); + FLATCC_RETURN_UNLESS_0( + KeyValue_value_create_strn(builder, value.data, value.size_bytes)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + int (*push_start)(flatcc_builder_t*), + ns(Field_ref_t) * + (*push_end)(flatcc_builder_t*), + struct ArrowError* error) { + for (int i = 0; i < schema->n_children; ++i) { + if (push_start(builder) != 0) return ENOMEM; + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error)); + if (!push_end(builder)) return ENOMEM; + } + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name)); + FLATCC_RETURN_UNLESS_0( + Field_nullable_add(builder, schema->flags & ARROW_FLAG_NULLABLE)); + + struct ArrowSchemaView schema_view; + NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, error)); + + if (schema->n_children != 0) { + 
FLATCC_RETURN_UNLESS_0(Field_children_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Field_children_push_start), + &ns(Field_children_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_children_end(builder)); + } + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Field_custom_metadata_push_start), + &ns(Field_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder)); + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder, + const struct ArrowSchema* schema, + struct ArrowError* error) { + if (!encoder || !encoder->private_data || !schema || !error) return EINVAL; + + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + + flatcc_builder_t* builder = &private->builder; + + FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder)); + FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5))); + + FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder)); + + FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, + &ns(Schema_fields_push_start), + &ns(Schema_fields_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder)); + + if (schema->metadata) { + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncodeMetadata(builder, schema, &ns(Schema_custom_metadata_push_start), + &ns(Schema_custom_metadata_push_end), error)); + FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder)); + } + + FLATCC_RETURN_UNLESS_0(Schema_features_start(builder)); + ns(Feature_enum_t)* features = ns(Schema_features_extend(builder, 1)); + if (!features) return ENOMEM; + features[0] = ns(Feature_COMPRESSED_BODY); + 
FLATCC_RETURN_UNLESS_0(Schema_features_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder)); + + FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0)); + return ns(Message_end_as_root(builder)) ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl( + struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view, + struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* error) { + if (array_view->offset != 0) { + ArrowErrorSet(error, "Cannot encode arrays with nonzero offset"); + return ENOTSUP; + } + + for (int c = 0; c < array_view->n_children; ++c) { Review Comment: ```suggestion for (int64_t c = 0; c < array_view->n_children; ++c) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
