bkietz commented on code in PR #555: URL: https://github.com/apache/arrow-nanoarrow/pull/555#discussion_r1676845872
########## src/nanoarrow/ipc/encoder.c: ########## @@ -0,0 +1,518 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <inttypes.h> +#include <stdio.h> +#include <string.h> + +// For thread safe shared buffers we need C11 + stdatomic.h +// Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override +// automatic detection +#if !defined(NANOARROW_IPC_USE_STDATOMIC) +#define NANOARROW_IPC_USE_STDATOMIC 0 + +// Check for C11 +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L + +// Check for GCC 4.8, which doesn't include stdatomic.h but does +// not define __STDC_NO_ATOMICS__ +#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 + +#if !defined(__STDC_NO_ATOMICS__) +#include <stdatomic.h> +#undef NANOARROW_IPC_USE_STDATOMIC +#define NANOARROW_IPC_USE_STDATOMIC 1 +#endif +#endif +#endif + +#endif + +#include "nanoarrow/ipc/flatcc_generated.h" +#include "nanoarrow/nanoarrow.h" +#include "nanoarrow/nanoarrow_ipc.h" + +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + +#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) + +#define FLATCC_RETURN_UNLESS_0(x) \ + if (ns(x) != 0) return ENOMEM; + +struct ArrowIpcEncoderPrivate { + flatcc_builder_t builder; +}; + +ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); + encoder->encode_buffer = NULL; + encoder->encode_buffer_state = NULL; + encoder->private_data = malloc(sizeof(struct ArrowIpcEncoderPrivate)); + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + if (flatcc_builder_init(&private->builder) == -1) { + return ESPIPE; + } + encoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; + return NANOARROW_OK; +} + +void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + flatcc_builder_clear(&private->builder); + free(private); + memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); +} + +ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* out) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + ArrowBufferReset(out); + size_t size = 0; + out->data = (uint8_t*)flatcc_builder_finalize_buffer(&private->builder, &size); + out->size_bytes = out->capacity_bytes = (int64_t)size; + return out->data ? NANOARROW_OK : ENOMEM; +} + +ArrowErrorCode ArrowIpcEncodeEncapsulatedMessage(struct ArrowBuffer* buffer) { + if (!buffer) return EINVAL; + + int32_t continuation = -1, message_size = (int32_t)buffer->size_bytes; + + int64_t encapsulated_size = _ArrowRoundUpToMultipleOf8( + sizeof(continuation) + sizeof(message_size) + buffer->size_bytes); + int64_t padding_size = + encapsulated_size - sizeof(continuation) - sizeof(message_size) - message_size; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, encapsulated_size, 0)); + + memset(buffer->data + sizeof(continuation) + sizeof(message_size) + message_size, 0, + padding_size); + memmove(buffer->data + sizeof(continuation) + sizeof(message_size), buffer->data, + message_size); + memcpy(buffer->data + sizeof(continuation), &message_size, sizeof(message_size)); + memcpy(buffer->data, &continuation, sizeof(continuation)); + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( + struct ArrowBufferView buffer_view, int64_t* offset, struct ArrowIpcEncoder* encoder, + struct ArrowError* error) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)encoder->encode_buffer_state; + NANOARROW_RETURN_NOT_OK(ArrowBufferResize( + body_buffer, _ArrowRoundUpToMultipleOf8(body_buffer->size_bytes), 0)); + *offset = body_buffer->size_bytes; + NANOARROW_RETURN_NOT_OK( + ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); + encoder->body_length = body_buffer->size_bytes; + return NANOARROW_OK; +} + +void ArrowIpcEncoderBuildContiguousBodyBuffer(struct ArrowIpcEncoder* encoder, + struct ArrowBuffer* body_buffer) { + struct ArrowIpcEncoderPrivate* private = + (struct ArrowIpcEncoderPrivate*)encoder->private_data; + encoder->encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback; + encoder->encode_buffer_state = body_buffer; + ArrowBufferResize(body_buffer, 0, 0); +} + +static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, + const struct ArrowSchemaView* schema_view, + struct ArrowError* error) { + switch (schema_view->type) { + case NANOARROW_TYPE_NA: + FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BOOL: + FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT8: + case NANOARROW_TYPE_INT8: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT16: + case NANOARROW_TYPE_INT16: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT32: + case NANOARROW_TYPE_INT32: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_UINT64: + case NANOARROW_TYPE_INT64: + FLATCC_RETURN_UNLESS_0( + Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_HALF_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_HALF))); + return NANOARROW_OK; + + case NANOARROW_TYPE_FLOAT: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DOUBLE: + FLATCC_RETURN_UNLESS_0( + Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DECIMAL128: + case NANOARROW_TYPE_DECIMAL256: + FLATCC_RETURN_UNLESS_0(Field_type_Decimal_create( + builder, schema_view->decimal_precision, schema_view->decimal_scale, + schema_view->decimal_bitwidth)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_STRING: + FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_BINARY: + FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE32: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY))); + return NANOARROW_OK; + + case NANOARROW_TYPE_DATE64: + FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTHS: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_DAY_TIME: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME))); + return NANOARROW_OK; + + case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: + FLATCC_RETURN_UNLESS_0( + Field_type_Interval_create(builder, ns(IntervalUnit_MONTH_DAY_NANO))); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIMESTAMP: + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder)); + FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit)); + FLATCC_RETURN_UNLESS_0( + Timestamp_timezone_create_str(builder, schema_view->timezone)); + FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME32: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32)); + return NANOARROW_OK; + + case NANOARROW_TYPE_TIME64: + FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64)); + return NANOARROW_OK; + + case NANOARROW_TYPE_DURATION: + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_BINARY: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_LARGE_LIST: + FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_FIXED_SIZE_LIST: + FLATCC_RETURN_UNLESS_0( + Field_type_FixedSizeList_create(builder, schema_view->fixed_size)); + return NANOARROW_OK; + + case NANOARROW_TYPE_RUN_END_ENCODED: + FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_STRUCT: + FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder)); + return NANOARROW_OK; + + case NANOARROW_TYPE_SPARSE_UNION: + case NANOARROW_TYPE_DENSE_UNION: { + FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder)); + + FLATCC_RETURN_UNLESS_0( + Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION)); + if (schema_view->union_type_ids) { + int8_t type_ids[128]; + int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); + if (n != 0) { + FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder)); + int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); + if (!type_ids_32) return ENOMEM; Review Comment: I'll add braces -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
