http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc deleted file mode 100644 index be0d282..0000000 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ /dev/null @@ -1,597 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "arrow/ipc/metadata-internal.h" - -#include <cstdint> -#include <cstring> -#include <memory> -#include <sstream> -#include <string> - -#include "flatbuffers/flatbuffers.h" - -#include "arrow/array.h" -#include "arrow/buffer.h" -#include "arrow/ipc/Message_generated.h" -#include "arrow/schema.h" -#include "arrow/status.h" -#include "arrow/type.h" - -namespace arrow { - -namespace flatbuf = org::apache::arrow::flatbuf; - -namespace ipc { - -static Status IntFromFlatbuffer( - const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) { - if (int_data->bitWidth() > 64) { - return Status::NotImplemented("Integers with more than 64 bits not implemented"); - } - if (int_data->bitWidth() < 8) { - return Status::NotImplemented("Integers with less than 8 bits not implemented"); - } - - switch (int_data->bitWidth()) { - case 8: - *out = int_data->is_signed() ? int8() : uint8(); - break; - case 16: - *out = int_data->is_signed() ? int16() : uint16(); - break; - case 32: - *out = int_data->is_signed() ? int32() : uint32(); - break; - case 64: - *out = int_data->is_signed() ? 
int64() : uint64(); - break; - default: - return Status::NotImplemented("Integers not in cstdint are not implemented"); - } - return Status::OK(); -} - -static Status FloatFromFlatuffer( - const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) { - if (float_data->precision() == flatbuf::Precision_HALF) { - *out = float16(); - } else if (float_data->precision() == flatbuf::Precision_SINGLE) { - *out = float32(); - } else { - *out = float64(); - } - return Status::OK(); -} - -// Forward declaration -static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, - DictionaryMemo* dictionary_memo, FieldOffset* offset); - -static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { - return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); -} - -static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) { - return flatbuf::CreateFloatingPoint(fbb, precision).Union(); -} - -static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) { - FieldOffset field; - for (int i = 0; i < type->num_children(); ++i) { - RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field)); - out_children->push_back(field); - } - return Status::OK(); -} - -static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - *offset = flatbuf::CreateList(fbb).Union(); - return Status::OK(); -} - -static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - *offset = flatbuf::CreateStruct_(fbb).Union(); - return Status::OK(); -} - -// 
---------------------------------------------------------------------- -// Union implementation - -static Status UnionFromFlatbuffer(const flatbuf::Union* union_data, - const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { - UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE - : UnionMode::DENSE; - - std::vector<uint8_t> type_codes; - - const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds(); - if (fb_type_ids == nullptr) { - for (uint8_t i = 0; i < children.size(); ++i) { - type_codes.push_back(i); - } - } else { - for (int32_t id : (*fb_type_ids)) { - // TODO(wesm): can these values exceed 255? - type_codes.push_back(static_cast<uint8_t>(id)); - } - } - - *out = union_(children, type_codes, mode); - return Status::OK(); -} - -static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, - Offset* offset) { - RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); - - const auto& union_type = static_cast<const UnionType&>(*type); - - flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE - ? 
flatbuf::UnionMode_Sparse - : flatbuf::UnionMode_Dense; - - std::vector<int32_t> type_ids; - type_ids.reserve(union_type.type_codes.size()); - for (uint8_t code : union_type.type_codes) { - type_ids.push_back(code); - } - - auto fb_type_ids = fbb.CreateVector(type_ids); - - *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union(); - return Status::OK(); -} - -#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ - *out_type = flatbuf::Type_Int; \ - *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ - break; - -static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) { - switch (unit) { - case TimeUnit::SECOND: - return flatbuf::TimeUnit_SECOND; - case TimeUnit::MILLI: - return flatbuf::TimeUnit_MILLISECOND; - case TimeUnit::MICRO: - return flatbuf::TimeUnit_MICROSECOND; - case TimeUnit::NANO: - return flatbuf::TimeUnit_NANOSECOND; - default: - break; - } - return flatbuf::TimeUnit_MIN; -} - -static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) { - switch (unit) { - case flatbuf::TimeUnit_SECOND: - return TimeUnit::SECOND; - case flatbuf::TimeUnit_MILLISECOND: - return TimeUnit::MILLI; - case flatbuf::TimeUnit_MICROSECOND: - return TimeUnit::MICRO; - case flatbuf::TimeUnit_NANOSECOND: - return TimeUnit::NANO; - default: - break; - } - // cannot reach - return TimeUnit::SECOND; -} - -static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, - const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { - switch (type) { - case flatbuf::Type_NONE: - return Status::Invalid("Type metadata cannot be none"); - case flatbuf::Type_Int: - return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out); - case flatbuf::Type_FloatingPoint: - return FloatFromFlatuffer( - static_cast<const flatbuf::FloatingPoint*>(type_data), out); - case flatbuf::Type_Binary: - *out = binary(); - return Status::OK(); - case flatbuf::Type_FixedWidthBinary: { - auto fw_binary = static_cast<const 
flatbuf::FixedWidthBinary*>(type_data); - *out = fixed_width_binary(fw_binary->byteWidth()); - return Status::OK(); - } - case flatbuf::Type_Utf8: - *out = utf8(); - return Status::OK(); - case flatbuf::Type_Bool: - *out = boolean(); - return Status::OK(); - case flatbuf::Type_Decimal: - return Status::NotImplemented("Decimal"); - case flatbuf::Type_Date: - *out = date(); - return Status::OK(); - case flatbuf::Type_Time: { - auto time_type = static_cast<const flatbuf::Time*>(type_data); - *out = time(FromFlatbufferUnit(time_type->unit())); - return Status::OK(); - } - case flatbuf::Type_Timestamp: { - auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data); - *out = timestamp(FromFlatbufferUnit(ts_type->unit())); - return Status::OK(); - } - case flatbuf::Type_Interval: - return Status::NotImplemented("Interval"); - case flatbuf::Type_List: - if (children.size() != 1) { - return Status::Invalid("List must have exactly 1 child field"); - } - *out = std::make_shared<ListType>(children[0]); - return Status::OK(); - case flatbuf::Type_Struct_: - *out = std::make_shared<StructType>(children); - return Status::OK(); - case flatbuf::Type_Union: - return UnionFromFlatbuffer( - static_cast<const flatbuf::Union*>(type_data), children, out); - default: - return Status::Invalid("Unrecognized type"); - } -} - -// TODO(wesm): Convert this to visitor pattern -static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, - std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout, - flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) { - if (type->type == Type::DICTIONARY) { - // In this library, the dictionary "type" is a logical construct. 
Here we - // pass through to the value type, as we've already captured the index - // type in the DictionaryEncoding metadata in the parent field - const auto& dict_type = static_cast<const DictionaryType&>(*type); - return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout, - out_type, dictionary_memo, offset); - } - - std::vector<BufferDescr> buffer_layout = type->GetBufferLayout(); - for (const BufferDescr& descr : buffer_layout) { - flatbuf::VectorType vector_type; - switch (descr.type()) { - case BufferType::OFFSET: - vector_type = flatbuf::VectorType_OFFSET; - break; - case BufferType::DATA: - vector_type = flatbuf::VectorType_DATA; - break; - case BufferType::VALIDITY: - vector_type = flatbuf::VectorType_VALIDITY; - break; - case BufferType::TYPE: - vector_type = flatbuf::VectorType_TYPE; - break; - default: - vector_type = flatbuf::VectorType_DATA; - break; - } - auto offset = flatbuf::CreateVectorLayout( - fbb, static_cast<int16_t>(descr.bit_width()), vector_type); - layout->push_back(offset); - } - - switch (type->type) { - case Type::BOOL: - *out_type = flatbuf::Type_Bool; - *offset = flatbuf::CreateBool(fbb).Union(); - break; - case Type::UINT8: - INT_TO_FB_CASE(8, false); - case Type::INT8: - INT_TO_FB_CASE(8, true); - case Type::UINT16: - INT_TO_FB_CASE(16, false); - case Type::INT16: - INT_TO_FB_CASE(16, true); - case Type::UINT32: - INT_TO_FB_CASE(32, false); - case Type::INT32: - INT_TO_FB_CASE(32, true); - case Type::UINT64: - INT_TO_FB_CASE(64, false); - case Type::INT64: - INT_TO_FB_CASE(64, true); - case Type::FLOAT: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE); - break; - case Type::DOUBLE: - *out_type = flatbuf::Type_FloatingPoint; - *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE); - break; - case Type::FIXED_WIDTH_BINARY: { - const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type); - *out_type = flatbuf::Type_FixedWidthBinary; - *offset 
= flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union(); - } break; - case Type::BINARY: - *out_type = flatbuf::Type_Binary; - *offset = flatbuf::CreateBinary(fbb).Union(); - break; - case Type::STRING: - *out_type = flatbuf::Type_Utf8; - *offset = flatbuf::CreateUtf8(fbb).Union(); - break; - case Type::DATE: - *out_type = flatbuf::Type_Date; - *offset = flatbuf::CreateDate(fbb).Union(); - break; - case Type::TIME: { - const auto& time_type = static_cast<const TimeType&>(*type); - *out_type = flatbuf::Type_Time; - *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union(); - } break; - case Type::TIMESTAMP: { - const auto& ts_type = static_cast<const TimestampType&>(*type); - *out_type = flatbuf::Type_Timestamp; - *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union(); - } break; - case Type::LIST: - *out_type = flatbuf::Type_List; - return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset); - case Type::STRUCT: - *out_type = flatbuf::Type_Struct_; - return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset); - case Type::UNION: - *out_type = flatbuf::Type_Union; - return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset); - default: - *out_type = flatbuf::Type_NONE; // Make clang-tidy happy - std::stringstream ss; - ss << "Unable to convert type: " << type->ToString() << std::endl; - return Status::NotImplemented(ss.str()); - } - return Status::OK(); -} - -using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>; - -static DictionaryOffset GetDictionaryEncoding( - FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) { - int64_t dictionary_id = memo->GetId(type.dictionary()); - - // We assume that the dictionary index type (as an integer) has already been - // validated elsewhere, and can safely assume we are dealing with signed - // integers - const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type()); - - auto 
index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true); - - // TODO(wesm): ordered dictionaries - return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset); -} - -static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, - DictionaryMemo* dictionary_memo, FieldOffset* offset) { - auto fb_name = fbb.CreateString(field->name); - - flatbuf::Type type_enum; - Offset type_offset; - Offset type_layout; - std::vector<FieldOffset> children; - std::vector<VectorLayoutOffset> layout; - - RETURN_NOT_OK(TypeToFlatbuffer( - fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset)); - auto fb_children = fbb.CreateVector(children); - auto fb_layout = fbb.CreateVector(layout); - - DictionaryOffset dictionary = 0; - if (field->type->type == Type::DICTIONARY) { - dictionary = GetDictionaryEncoding( - fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo); - } - - // TODO: produce the list of VectorTypes - *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset, - dictionary, fb_children, fb_layout); - - return Status::OK(); -} - -Status FieldFromFlatbufferDictionary( - const flatbuf::Field* field, std::shared_ptr<Field>* out) { - // Need an empty memo to pass down for constructing children - DictionaryMemo dummy_memo; - - // Any DictionaryEncoding set is ignored here - - std::shared_ptr<DataType> type; - auto children = field->children(); - std::vector<std::shared_ptr<Field>> child_fields(children->size()); - for (int i = 0; i < static_cast<int>(children->size()); ++i) { - RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i])); - } - - RETURN_NOT_OK( - TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); - - *out = std::make_shared<Field>(field->name()->str(), type, field->nullable()); - return Status::OK(); -} - -Status FieldFromFlatbuffer(const flatbuf::Field* field, - const DictionaryMemo& 
dictionary_memo, std::shared_ptr<Field>* out) { - std::shared_ptr<DataType> type; - - const flatbuf::DictionaryEncoding* encoding = field->dictionary(); - - if (encoding == nullptr) { - // The field is not dictionary encoded. We must potentially visit its - // children to fully reconstruct the data type - auto children = field->children(); - std::vector<std::shared_ptr<Field>> child_fields(children->size()); - for (int i = 0; i < static_cast<int>(children->size()); ++i) { - RETURN_NOT_OK( - FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i])); - } - RETURN_NOT_OK( - TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); - } else { - // The field is dictionary encoded. The type of the dictionary values has - // been determined elsewhere, and is stored in the DictionaryMemo. Here we - // construct the logical DictionaryType object - - std::shared_ptr<Array> dictionary; - RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary)); - - std::shared_ptr<DataType> index_type; - RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type)); - type = std::make_shared<DictionaryType>(index_type, dictionary); - } - *out = std::make_shared<Field>(field->name()->str(), type, field->nullable()); - return Status::OK(); -} - -// Implement MessageBuilder - -// will return the endianness of the system we are running on -// based the NUMPY_API function. See NOTICE.txt -flatbuf::Endianness endianness() { - union { - uint32_t i; - char c[4]; - } bint = {0x01020304}; - - return bint.c[0] == 1 ? 
flatbuf::Endianness_Big : flatbuf::Endianness_Little; -} - -Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo, - flatbuffers::Offset<flatbuf::Schema>* out) { - std::vector<FieldOffset> field_offsets; - for (int i = 0; i < schema.num_fields(); ++i) { - std::shared_ptr<Field> field = schema.field(i); - FieldOffset offset; - RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset)); - field_offsets.push_back(offset); - } - - *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets)); - return Status::OK(); -} - -class MessageBuilder { - public: - Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) { - flatbuffers::Offset<flatbuf::Schema> fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema)); - - header_type_ = flatbuf::MessageHeader_Schema; - header_ = fb_schema.Union(); - body_length_ = 0; - return Status::OK(); - } - - Status SetRecordBatch(int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers) { - header_type_ = flatbuf::MessageHeader_RecordBatch; - header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes), - fbb_.CreateVectorOfStructs(buffers)) - .Union(); - body_length_ = body_length; - - return Status::OK(); - } - - Status SetDictionary(int64_t id, int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers) { - header_type_ = flatbuf::MessageHeader_DictionaryBatch; - - auto record_batch = flatbuf::CreateRecordBatch(fbb_, length, - fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers)); - - header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union(); - body_length_ = body_length; - return Status::OK(); - } - - Status Finish(); - - Status GetBuffer(std::shared_ptr<Buffer>* out); - - private: - flatbuf::MessageHeader header_type_; - 
flatbuffers::Offset<void> header_; - int64_t body_length_; - flatbuffers::FlatBufferBuilder fbb_; -}; - -Status WriteSchemaMessage( - const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) { - MessageBuilder message; - RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo)); - RETURN_NOT_OK(message.Finish()); - return message.GetBuffer(out); -} - -Status WriteRecordBatchMessage(int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) { - MessageBuilder builder; - RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers)); - RETURN_NOT_OK(builder.Finish()); - return builder.GetBuffer(out); -} - -Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) { - MessageBuilder builder; - RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers)); - RETURN_NOT_OK(builder.Finish()); - return builder.GetBuffer(out); -} - -Status MessageBuilder::Finish() { - auto message = - flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_); - fbb_.Finish(message); - return Status::OK(); -} - -Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) { - int32_t size = fbb_.GetSize(); - - auto result = std::make_shared<PoolBuffer>(); - RETURN_NOT_OK(result->Resize(size)); - - uint8_t* dst = result->mutable_data(); - memcpy(dst, fbb_.GetBufferPointer(), size); - - *out = result; - return Status::OK(); -} - -} // namespace ipc -} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h deleted file mode 100644 index 59afecb..0000000 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef ARROW_IPC_METADATA_INTERNAL_H -#define ARROW_IPC_METADATA_INTERNAL_H - -#include <cstdint> -#include <memory> -#include <vector> - -#include "flatbuffers/flatbuffers.h" - -#include "arrow/ipc/File_generated.h" -#include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/metadata.h" - -namespace arrow { - -namespace flatbuf = org::apache::arrow::flatbuf; - -class Buffer; -struct Field; -class Schema; -class Status; - -namespace ipc { - -using FBB = flatbuffers::FlatBufferBuilder; -using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>; -using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>; -using Offset = flatbuffers::Offset<void>; - -static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2; - -// Construct a field with type for a dictionary-encoded field. None of its -// children or children's descendents can be dictionary encoded -Status FieldFromFlatbufferDictionary( - const flatbuf::Field* field, std::shared_ptr<Field>* out); - -// Construct a field for a non-dictionary-encoded field. 
Its children may be -// dictionary encoded -Status FieldFromFlatbuffer(const flatbuf::Field* field, - const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out); - -Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo, - flatbuffers::Offset<flatbuf::Schema>* out); - -// Serialize arrow::Schema as a Flatbuffer -// -// \param[in] schema a Schema instance -// \param[inout] dictionary_memo class for tracking dictionaries and assigning -// dictionary ids -// \param[out] out the serialized arrow::Buffer -// \return Status outcome -Status WriteSchemaMessage( - const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out); - -Status WriteRecordBatchMessage(int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out); - -Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, - const std::vector<flatbuf::FieldNode>& nodes, - const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out); - -} // namespace ipc -} // namespace arrow - -#endif // ARROW_IPC_METADATA_INTERNAL_H http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 71bc5c9..a418d48 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -24,14 +24,14 @@ #include "flatbuffers/flatbuffers.h" +#include "arrow/array.h" +#include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/ipc/File_generated.h" #include "arrow/ipc/Message_generated.h" -#include "arrow/ipc/metadata-internal.h" - -#include "arrow/buffer.h" #include "arrow/schema.h" #include "arrow/status.h" +#include "arrow/type.h" namespace arrow { @@ -39,6 +39,643 @@ namespace flatbuf = org::apache::arrow::flatbuf; namespace ipc { 
+using FBB = flatbuffers::FlatBufferBuilder; +using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>; +using FieldOffset = flatbuffers::Offset<flatbuf::Field>; +using LargeRecordBatchOffset = flatbuffers::Offset<flatbuf::LargeRecordBatch>; +using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>; +using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>; +using Offset = flatbuffers::Offset<void>; + +static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2; + +static Status IntFromFlatbuffer( + const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) { + if (int_data->bitWidth() > 64) { + return Status::NotImplemented("Integers with more than 64 bits not implemented"); + } + if (int_data->bitWidth() < 8) { + return Status::NotImplemented("Integers with less than 8 bits not implemented"); + } + + switch (int_data->bitWidth()) { + case 8: + *out = int_data->is_signed() ? int8() : uint8(); + break; + case 16: + *out = int_data->is_signed() ? int16() : uint16(); + break; + case 32: + *out = int_data->is_signed() ? int32() : uint32(); + break; + case 64: + *out = int_data->is_signed() ? 
int64() : uint64(); + break; + default: + return Status::NotImplemented("Integers not in cstdint are not implemented"); + } + return Status::OK(); +} + +static Status FloatFromFlatuffer( + const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) { + if (float_data->precision() == flatbuf::Precision_HALF) { + *out = float16(); + } else if (float_data->precision() == flatbuf::Precision_SINGLE) { + *out = float32(); + } else { + *out = float64(); + } + return Status::OK(); +} + +// Forward declaration +static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, + DictionaryMemo* dictionary_memo, FieldOffset* offset); + +static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) { + return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union(); +} + +static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) { + return flatbuf::CreateFloatingPoint(fbb, precision).Union(); +} + +static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) { + FieldOffset field; + for (int i = 0; i < type->num_children(); ++i) { + RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field)); + out_children->push_back(field); + } + return Status::OK(); +} + +static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); + *offset = flatbuf::CreateList(fbb).Union(); + return Status::OK(); +} + +static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); + *offset = flatbuf::CreateStruct_(fbb).Union(); + return Status::OK(); +} + +// 
---------------------------------------------------------------------- +// Union implementation + +static Status UnionFromFlatbuffer(const flatbuf::Union* union_data, + const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { + UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE + : UnionMode::DENSE; + + std::vector<uint8_t> type_codes; + + const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds(); + if (fb_type_ids == nullptr) { + for (uint8_t i = 0; i < children.size(); ++i) { + type_codes.push_back(i); + } + } else { + for (int32_t id : (*fb_type_ids)) { + // TODO(wesm): can these values exceed 255? + type_codes.push_back(static_cast<uint8_t>(id)); + } + } + + *out = union_(children, type_codes, mode); + return Status::OK(); +} + +static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo, + Offset* offset) { + RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo)); + + const auto& union_type = static_cast<const UnionType&>(*type); + + flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE + ? 
flatbuf::UnionMode_Sparse + : flatbuf::UnionMode_Dense; + + std::vector<int32_t> type_ids; + type_ids.reserve(union_type.type_codes.size()); + for (uint8_t code : union_type.type_codes) { + type_ids.push_back(code); + } + + auto fb_type_ids = fbb.CreateVector(type_ids); + + *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union(); + return Status::OK(); +} + +#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \ + *out_type = flatbuf::Type_Int; \ + *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \ + break; + +static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) { + switch (unit) { + case TimeUnit::SECOND: + return flatbuf::TimeUnit_SECOND; + case TimeUnit::MILLI: + return flatbuf::TimeUnit_MILLISECOND; + case TimeUnit::MICRO: + return flatbuf::TimeUnit_MICROSECOND; + case TimeUnit::NANO: + return flatbuf::TimeUnit_NANOSECOND; + default: + break; + } + return flatbuf::TimeUnit_MIN; +} + +static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) { + switch (unit) { + case flatbuf::TimeUnit_SECOND: + return TimeUnit::SECOND; + case flatbuf::TimeUnit_MILLISECOND: + return TimeUnit::MILLI; + case flatbuf::TimeUnit_MICROSECOND: + return TimeUnit::MICRO; + case flatbuf::TimeUnit_NANOSECOND: + return TimeUnit::NANO; + default: + break; + } + // cannot reach + return TimeUnit::SECOND; +} + +static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data, + const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) { + switch (type) { + case flatbuf::Type_NONE: + return Status::Invalid("Type metadata cannot be none"); + case flatbuf::Type_Int: + return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out); + case flatbuf::Type_FloatingPoint: + return FloatFromFlatuffer( + static_cast<const flatbuf::FloatingPoint*>(type_data), out); + case flatbuf::Type_Binary: + *out = binary(); + return Status::OK(); + case flatbuf::Type_FixedWidthBinary: { + auto fw_binary = static_cast<const 
flatbuf::FixedWidthBinary*>(type_data); + *out = fixed_width_binary(fw_binary->byteWidth()); + return Status::OK(); + } + case flatbuf::Type_Utf8: + *out = utf8(); + return Status::OK(); + case flatbuf::Type_Bool: + *out = boolean(); + return Status::OK(); + case flatbuf::Type_Decimal: + return Status::NotImplemented("Decimal"); + case flatbuf::Type_Date: + *out = date(); + return Status::OK(); + case flatbuf::Type_Time: { + auto time_type = static_cast<const flatbuf::Time*>(type_data); + *out = time(FromFlatbufferUnit(time_type->unit())); + return Status::OK(); + } + case flatbuf::Type_Timestamp: { + auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data); + *out = timestamp(FromFlatbufferUnit(ts_type->unit())); + return Status::OK(); + } + case flatbuf::Type_Interval: + return Status::NotImplemented("Interval"); + case flatbuf::Type_List: + if (children.size() != 1) { + return Status::Invalid("List must have exactly 1 child field"); + } + *out = std::make_shared<ListType>(children[0]); + return Status::OK(); + case flatbuf::Type_Struct_: + *out = std::make_shared<StructType>(children); + return Status::OK(); + case flatbuf::Type_Union: + return UnionFromFlatbuffer( + static_cast<const flatbuf::Union*>(type_data), children, out); + default: + return Status::Invalid("Unrecognized type"); + } +} + +// TODO(wesm): Convert this to visitor pattern +static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type, + std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout, + flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) { + if (type->type == Type::DICTIONARY) { + // In this library, the dictionary "type" is a logical construct. 
Here we + // pass through to the value type, as we've already captured the index + // type in the DictionaryEncoding metadata in the parent field + const auto& dict_type = static_cast<const DictionaryType&>(*type); + return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout, + out_type, dictionary_memo, offset); + } + + std::vector<BufferDescr> buffer_layout = type->GetBufferLayout(); + for (const BufferDescr& descr : buffer_layout) { + flatbuf::VectorType vector_type; + switch (descr.type()) { + case BufferType::OFFSET: + vector_type = flatbuf::VectorType_OFFSET; + break; + case BufferType::DATA: + vector_type = flatbuf::VectorType_DATA; + break; + case BufferType::VALIDITY: + vector_type = flatbuf::VectorType_VALIDITY; + break; + case BufferType::TYPE: + vector_type = flatbuf::VectorType_TYPE; + break; + default: + vector_type = flatbuf::VectorType_DATA; + break; + } + auto offset = flatbuf::CreateVectorLayout( + fbb, static_cast<int16_t>(descr.bit_width()), vector_type); + layout->push_back(offset); + } + + switch (type->type) { + case Type::BOOL: + *out_type = flatbuf::Type_Bool; + *offset = flatbuf::CreateBool(fbb).Union(); + break; + case Type::UINT8: + INT_TO_FB_CASE(8, false); + case Type::INT8: + INT_TO_FB_CASE(8, true); + case Type::UINT16: + INT_TO_FB_CASE(16, false); + case Type::INT16: + INT_TO_FB_CASE(16, true); + case Type::UINT32: + INT_TO_FB_CASE(32, false); + case Type::INT32: + INT_TO_FB_CASE(32, true); + case Type::UINT64: + INT_TO_FB_CASE(64, false); + case Type::INT64: + INT_TO_FB_CASE(64, true); + case Type::FLOAT: + *out_type = flatbuf::Type_FloatingPoint; + *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE); + break; + case Type::DOUBLE: + *out_type = flatbuf::Type_FloatingPoint; + *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE); + break; + case Type::FIXED_WIDTH_BINARY: { + const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type); + *out_type = flatbuf::Type_FixedWidthBinary; + *offset 
= flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union(); + } break; + case Type::BINARY: + *out_type = flatbuf::Type_Binary; + *offset = flatbuf::CreateBinary(fbb).Union(); + break; + case Type::STRING: + *out_type = flatbuf::Type_Utf8; + *offset = flatbuf::CreateUtf8(fbb).Union(); + break; + case Type::DATE: + *out_type = flatbuf::Type_Date; + *offset = flatbuf::CreateDate(fbb).Union(); + break; + case Type::TIME: { + const auto& time_type = static_cast<const TimeType&>(*type); + *out_type = flatbuf::Type_Time; + *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union(); + } break; + case Type::TIMESTAMP: { + const auto& ts_type = static_cast<const TimestampType&>(*type); + *out_type = flatbuf::Type_Timestamp; + *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union(); + } break; + case Type::LIST: + *out_type = flatbuf::Type_List; + return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset); + case Type::STRUCT: + *out_type = flatbuf::Type_Struct_; + return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset); + case Type::UNION: + *out_type = flatbuf::Type_Union; + return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset); + default: + *out_type = flatbuf::Type_NONE; // Make clang-tidy happy + std::stringstream ss; + ss << "Unable to convert type: " << type->ToString() << std::endl; + return Status::NotImplemented(ss.str()); + } + return Status::OK(); +} + +static DictionaryOffset GetDictionaryEncoding( + FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) { + int64_t dictionary_id = memo->GetId(type.dictionary()); + + // We assume that the dictionary index type (as an integer) has already been + // validated elsewhere, and can safely assume we are dealing with signed + // integers + const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type()); + + auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true); + + // 
TODO(wesm): ordered dictionaries + return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset); +} + +static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field, + DictionaryMemo* dictionary_memo, FieldOffset* offset) { + auto fb_name = fbb.CreateString(field->name); + + flatbuf::Type type_enum; + Offset type_offset; + Offset type_layout; + std::vector<FieldOffset> children; + std::vector<VectorLayoutOffset> layout; + + RETURN_NOT_OK(TypeToFlatbuffer( + fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset)); + auto fb_children = fbb.CreateVector(children); + auto fb_layout = fbb.CreateVector(layout); + + DictionaryOffset dictionary = 0; + if (field->type->type == Type::DICTIONARY) { + dictionary = GetDictionaryEncoding( + fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo); + } + + // TODO: produce the list of VectorTypes + *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset, + dictionary, fb_children, fb_layout); + + return Status::OK(); +} + +static Status FieldFromFlatbuffer(const flatbuf::Field* field, + const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) { + std::shared_ptr<DataType> type; + + const flatbuf::DictionaryEncoding* encoding = field->dictionary(); + + if (encoding == nullptr) { + // The field is not dictionary encoded. We must potentially visit its + // children to fully reconstruct the data type + auto children = field->children(); + std::vector<std::shared_ptr<Field>> child_fields(children->size()); + for (int i = 0; i < static_cast<int>(children->size()); ++i) { + RETURN_NOT_OK( + FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i])); + } + RETURN_NOT_OK( + TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); + } else { + // The field is dictionary encoded. The type of the dictionary values has + // been determined elsewhere, and is stored in the DictionaryMemo. 
Here we + // construct the logical DictionaryType object + + std::shared_ptr<Array> dictionary; + RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary)); + + std::shared_ptr<DataType> index_type; + RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type)); + type = std::make_shared<DictionaryType>(index_type, dictionary); + } + *out = std::make_shared<Field>(field->name()->str(), type, field->nullable()); + return Status::OK(); +} + +static Status FieldFromFlatbufferDictionary( + const flatbuf::Field* field, std::shared_ptr<Field>* out) { + // Need an empty memo to pass down for constructing children + DictionaryMemo dummy_memo; + + // Any DictionaryEncoding set is ignored here + + std::shared_ptr<DataType> type; + auto children = field->children(); + std::vector<std::shared_ptr<Field>> child_fields(children->size()); + for (int i = 0; i < static_cast<int>(children->size()); ++i) { + RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i])); + } + + RETURN_NOT_OK( + TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type)); + + *out = std::make_shared<Field>(field->name()->str(), type, field->nullable()); + return Status::OK(); +} + +// will return the endianness of the system we are running on +// based the NUMPY_API function. See NOTICE.txt +flatbuf::Endianness endianness() { + union { + uint32_t i; + char c[4]; + } bint = {0x01020304}; + + return bint.c[0] == 1 ? 
flatbuf::Endianness_Big : flatbuf::Endianness_Little; +} + +static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, + DictionaryMemo* dictionary_memo, flatbuffers::Offset<flatbuf::Schema>* out) { + std::vector<FieldOffset> field_offsets; + for (int i = 0; i < schema.num_fields(); ++i) { + std::shared_ptr<Field> field = schema.field(i); + FieldOffset offset; + RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset)); + field_offsets.push_back(offset); + } + + *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets)); + return Status::OK(); +} + +static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) { + int32_t size = fbb.GetSize(); + + auto result = std::make_shared<PoolBuffer>(); + RETURN_NOT_OK(result->Resize(size)); + + uint8_t* dst = result->mutable_data(); + memcpy(dst, fbb.GetBufferPointer(), size); + *out = result; + return Status::OK(); +} + +static Status WriteMessage(FBB& fbb, flatbuf::MessageHeader header_type, + flatbuffers::Offset<void> header, int64_t body_length, std::shared_ptr<Buffer>* out) { + auto message = + flatbuf::CreateMessage(fbb, kMetadataVersion, header_type, header, body_length); + fbb.Finish(message); + return WriteFlatbufferBuilder(fbb, out); +} + +Status WriteSchemaMessage( + const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) { + FBB fbb; + flatbuffers::Offset<flatbuf::Schema> fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); + return WriteMessage(fbb, flatbuf::MessageHeader_Schema, fb_schema.Union(), 0, out); +} + +using FieldNodeVector = + flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>; +using LargeFieldNodeVector = + flatbuffers::Offset<flatbuffers::Vector<const flatbuf::LargeFieldNode*>>; +using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>; + +static Status WriteFieldNodes( + FBB& fbb, const std::vector<FieldMetadata>& nodes, 
FieldNodeVector* out) { + std::vector<flatbuf::FieldNode> fb_nodes; + fb_nodes.reserve(nodes.size()); + + for (size_t i = 0; i < nodes.size(); ++i) { + const FieldMetadata& node = nodes[i]; + if (node.offset != 0) { + return Status::Invalid("Field metadata for IPC must have offset 0"); + } + fb_nodes.emplace_back( + static_cast<int32_t>(node.length), static_cast<int32_t>(node.null_count)); + } + *out = fbb.CreateVectorOfStructs(fb_nodes); + return Status::OK(); +} + +static Status WriteLargeFieldNodes( + FBB& fbb, const std::vector<FieldMetadata>& nodes, LargeFieldNodeVector* out) { + std::vector<flatbuf::LargeFieldNode> fb_nodes; + fb_nodes.reserve(nodes.size()); + + for (size_t i = 0; i < nodes.size(); ++i) { + const FieldMetadata& node = nodes[i]; + if (node.offset != 0) { + return Status::Invalid("Field metadata for IPC must have offset 0"); + } + fb_nodes.emplace_back(node.length, node.null_count); + } + *out = fbb.CreateVectorOfStructs(fb_nodes); + return Status::OK(); +} + +static Status WriteBuffers( + FBB& fbb, const std::vector<BufferMetadata>& buffers, BufferVector* out) { + std::vector<flatbuf::Buffer> fb_buffers; + fb_buffers.reserve(buffers.size()); + + for (size_t i = 0; i < buffers.size(); ++i) { + const BufferMetadata& buffer = buffers[i]; + fb_buffers.emplace_back(buffer.page, buffer.offset, buffer.length); + } + *out = fbb.CreateVectorOfStructs(fb_buffers); + return Status::OK(); +} + +static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + RecordBatchOffset* offset) { + FieldNodeVector fb_nodes; + BufferVector fb_buffers; + + RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &fb_nodes)); + RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers)); + + *offset = flatbuf::CreateRecordBatch(fbb, length, fb_nodes, fb_buffers); + return Status::OK(); +} + +static Status MakeLargeRecordBatch(FBB& fbb, int64_t length, int64_t body_length, + const 
std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + LargeRecordBatchOffset* offset) { + LargeFieldNodeVector fb_nodes; + BufferVector fb_buffers; + + RETURN_NOT_OK(WriteLargeFieldNodes(fbb, nodes, &fb_nodes)); + RETURN_NOT_OK(WriteBuffers(fbb, buffers, &fb_buffers)); + + *offset = flatbuf::CreateLargeRecordBatch(fbb, length, fb_nodes, fb_buffers); + return Status::OK(); +} + +Status WriteRecordBatchMessage(int32_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { + FBB fbb; + RecordBatchOffset record_batch; + RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); + return WriteMessage( + fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out); +} + +Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { + FBB fbb; + LargeRecordBatchOffset large_batch; + RETURN_NOT_OK( + MakeLargeRecordBatch(fbb, length, body_length, nodes, buffers, &large_batch)); + return WriteMessage(fbb, flatbuf::MessageHeader_LargeRecordBatch, large_batch.Union(), + body_length, out); +} + +Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out) { + FBB fbb; + RecordBatchOffset record_batch; + RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch)); + auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union(); + return WriteMessage( + fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, body_length, out); +} + +static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> +FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { + 
std::vector<flatbuf::Block> fb_blocks; + + for (const FileBlock& block : blocks) { + fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length); + } + + return fbb.CreateVectorOfStructs(fb_blocks); +} + +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out) { + FBB fbb; + + flatbuffers::Offset<flatbuf::Schema> fb_schema; + RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); + + auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); + auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); + + auto footer = flatbuf::CreateFooter( + fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); + + fbb.Finish(footer); + + int32_t size = fbb.GetSize(); + + return out->Write(fbb.GetBufferPointer(), size); +} + // ---------------------------------------------------------------------- // Memoization data structure for handling shared dictionaries @@ -158,7 +795,18 @@ int64_t Message::body_length() const { // ---------------------------------------------------------------------- // SchemaMetadata -class SchemaMetadata::SchemaMetadataImpl { +class MessageHolder { + public: + void set_message(const std::shared_ptr<Message>& message) { message_ = message; } + void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; } + + protected: + // Possible parents, owns the flatbuffer data + std::shared_ptr<Message> message_; + std::shared_ptr<Buffer> buffer_; +}; + +class SchemaMetadata::SchemaMetadataImpl : public MessageHolder { public: explicit SchemaMetadataImpl(const void* schema) : schema_(static_cast<const flatbuf::Schema*>(schema)) {} @@ -196,15 +844,19 @@ class SchemaMetadata::SchemaMetadataImpl { const flatbuf::Schema* schema_; }; -SchemaMetadata::SchemaMetadata( - const std::shared_ptr<Message>& message, const void* flatbuf) { - message_ = 
message; - impl_.reset(new SchemaMetadataImpl(flatbuf)); +SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) + : SchemaMetadata(message->impl_->header()) { + impl_->set_message(message); } -SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) { - message_ = message; - impl_.reset(new SchemaMetadataImpl(message->impl_->header())); +SchemaMetadata::SchemaMetadata(const void* header) { + impl_.reset(new SchemaMetadataImpl(header)); +} + +SchemaMetadata::SchemaMetadata(const std::shared_ptr<Buffer>& buffer, int64_t offset) + : SchemaMetadata(buffer->data() + offset) { + // Preserve ownership + impl_->set_buffer(buffer); } SchemaMetadata::~SchemaMetadata() {} @@ -231,7 +883,7 @@ Status SchemaMetadata::GetSchema( // ---------------------------------------------------------------------- // RecordBatchMetadata -class RecordBatchMetadata::RecordBatchMetadataImpl { +class RecordBatchMetadata::RecordBatchMetadataImpl : public MessageHolder { public: explicit RecordBatchMetadataImpl(const void* batch) : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) { @@ -249,22 +901,14 @@ class RecordBatchMetadata::RecordBatchMetadataImpl { int num_fields() const { return batch_->nodes()->size(); } - void set_message(const std::shared_ptr<Message>& message) { message_ = message; } - - void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; } - private: const flatbuf::RecordBatch* batch_; const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_; const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_; - - // Possible parents, owns the flatbuffer data - std::shared_ptr<Message> message_; - std::shared_ptr<Buffer> buffer_; }; -RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) { - impl_.reset(new RecordBatchMetadataImpl(message->impl_->header())); +RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) + : RecordBatchMetadata(message->impl_->header()) { 
impl_->set_message(message); } @@ -358,8 +1002,8 @@ const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const { // ---------------------------------------------------------------------- // Conveniences -Status ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, std::shared_ptr<Message>* message) { +Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, + std::shared_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 4eb0186..41e6c5e 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -107,10 +107,9 @@ class Message; // Container for serialized Schema metadata contained in an IPC message class ARROW_EXPORT SchemaMetadata { public: + explicit SchemaMetadata(const void* header); explicit SchemaMetadata(const std::shared_ptr<Message>& message); - - // Accepts an opaque flatbuffer pointer - SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema); + SchemaMetadata(const std::shared_ptr<Buffer>& message, int64_t offset); ~SchemaMetadata(); @@ -127,9 +126,6 @@ class ARROW_EXPORT SchemaMetadata { const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const; private: - // Parent, owns the flatbuffer data - std::shared_ptr<Message> message_; - class SchemaMetadataImpl; std::unique_ptr<SchemaMetadataImpl> impl_; @@ -145,8 +141,6 @@ struct ARROW_EXPORT BufferMetadata { // Container for serialized record batch metadata contained in an IPC message class ARROW_EXPORT RecordBatchMetadata { public: - // Instantiate from opaque pointer. Memory ownership must be preserved - // elsewhere (e.g. 
in a dictionary batch) explicit RecordBatchMetadata(const void* header); explicit RecordBatchMetadata(const std::shared_ptr<Message>& message); RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset); @@ -218,8 +212,34 @@ class ARROW_EXPORT Message { /// \param[in] file the seekable file interface to read from /// \param[out] message the message read /// \return Status success or failure -Status ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, std::shared_ptr<Message>* message); +Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, + std::shared_ptr<Message>* message); + +// Serialize arrow::Schema as a Flatbuffer +// +// \param[in] schema a Schema instance +// \param[inout] dictionary_memo class for tracking dictionaries and assigning +// dictionary ids +// \param[out] out the serialized arrow::Buffer +// \return Status outcome +Status WriteSchemaMessage( + const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out); + +Status WriteRecordBatchMessage(int32_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); + +Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); + +Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length, + const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers, + std::shared_ptr<Buffer>* out); + +Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, + const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, + io::OutputStream* out); } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.cc 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 9575364..a2b20a9 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -26,17 +26,115 @@ #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" -#include "arrow/ipc/adapter.h" -#include "arrow/ipc/metadata-internal.h" +#include "arrow/ipc/File_generated.h" +#include "arrow/ipc/Message_generated.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" +#include "arrow/schema.h" #include "arrow/status.h" +#include "arrow/table.h" #include "arrow/util/logging.h" namespace arrow { + +namespace flatbuf = org::apache::arrow::flatbuf; + namespace ipc { // ---------------------------------------------------------------------- +// Record batch read path + +class IpcComponentSource : public ArrayComponentSource { + public: + IpcComponentSource(const RecordBatchMetadata& metadata, io::RandomAccessFile* file) + : metadata_(metadata), file_(file) {} + + Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override { + BufferMetadata buffer_meta = metadata_.buffer(buffer_index); + if (buffer_meta.length == 0) { + *out = nullptr; + return Status::OK(); + } else { + return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out); + } + } + + Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override { + // pop off a field + if (field_index >= metadata_.num_fields()) { + return Status::Invalid("Ran out of field metadata, likely malformed"); + } + *metadata = metadata_.field(field_index); + return Status::OK(); + } + + private: + const RecordBatchMetadata& metadata_; + io::RandomAccessFile* file_; +}; + +Status ReadRecordBatch(const RecordBatchMetadata& metadata, + const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, + std::shared_ptr<RecordBatch>* out) { + return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out); +} + 
+static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema, + int64_t num_rows, int max_recursion_depth, ArrayComponentSource* source, + std::shared_ptr<RecordBatch>* out) { + std::vector<std::shared_ptr<Array>> arrays(schema->num_fields()); + + ArrayLoaderContext context; + context.source = source; + context.field_index = 0; + context.buffer_index = 0; + context.max_recursion_depth = max_recursion_depth; + + for (int i = 0; i < schema->num_fields(); ++i) { + RETURN_NOT_OK(LoadArray(schema->field(i)->type, &context, &arrays[i])); + } + + *out = std::make_shared<RecordBatch>(schema, num_rows, arrays); + return Status::OK(); +} + +Status ReadRecordBatch(const RecordBatchMetadata& metadata, + const std::shared_ptr<Schema>& schema, int max_recursion_depth, + io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { + IpcComponentSource source(metadata, file); + return LoadRecordBatchFromSource( + schema, metadata.length(), max_recursion_depth, &source, out); +} + +Status ReadDictionary(const DictionaryBatchMetadata& metadata, + const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file, + std::shared_ptr<Array>* out) { + int64_t id = metadata.id(); + auto it = dictionary_types.find(id); + if (it == dictionary_types.end()) { + std::stringstream ss; + ss << "Do not have type metadata for dictionary with id: " << id; + return Status::KeyError(ss.str()); + } + + std::vector<std::shared_ptr<Field>> fields = {it->second}; + + // We need a schema for the record batch + auto dummy_schema = std::make_shared<Schema>(fields); + + // The dictionary is embedded in a record batch with a single column + std::shared_ptr<RecordBatch> batch; + RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch)); + + if (batch->num_columns() != 1) { + return Status::Invalid("Dictionary record batch must only contain one field"); + } + + *out = batch->column(0); + return Status::OK(); +} + +// 
---------------------------------------------------------------------- // StreamReader implementation static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { @@ -228,7 +326,7 @@ class FileReader::FileReaderImpl { // TODO(wesm): Verify the footer footer_ = flatbuf::GetFooter(footer_buffer_->data()); - schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema())); + schema_metadata_.reset(new SchemaMetadata(footer_->schema())); return Status::OK(); } @@ -307,8 +405,7 @@ class FileReader::FileReaderImpl { return schema_metadata_->GetSchema(*dictionary_memo_, &schema_); } - Status Open( - const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) { + Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) { file_ = file; footer_offset_ = footer_offset; RETURN_NOT_OK(ReadFooter()); @@ -371,5 +468,69 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { return impl_->GetRecordBatch(i, batch); } +// ---------------------------------------------------------------------- +// Read LargeRecordBatch + +class LargeRecordBatchSource : public ArrayComponentSource { + public: + LargeRecordBatchSource( + const flatbuf::LargeRecordBatch* metadata, io::RandomAccessFile* file) + : metadata_(metadata), file_(file) {} + + Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override { + if (buffer_index >= static_cast<int>(metadata_->buffers()->size())) { + return Status::Invalid("Ran out of buffer metadata, likely malformed"); + } + const flatbuf::Buffer* buffer = metadata_->buffers()->Get(buffer_index); + + if (buffer->length() == 0) { + *out = nullptr; + return Status::OK(); + } else { + return file_->ReadAt(buffer->offset(), buffer->length(), out); + } + } + + Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override { + // pop off a field + if (field_index >= static_cast<int>(metadata_->nodes()->size())) { + return Status::Invalid("Ran out of field 
metadata, likely malformed"); + } + const flatbuf::LargeFieldNode* node = metadata_->nodes()->Get(field_index); + + metadata->length = node->length(); + metadata->null_count = node->null_count(); + metadata->offset = 0; + return Status::OK(); + } + + private: + const flatbuf::LargeRecordBatch* metadata_; + io::RandomAccessFile* file_; +}; + +Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, + io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(file->Seek(offset)); + + RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer)); + int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data()); + + RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer)); + auto message = flatbuf::GetMessage(buffer->data()); + auto batch = reinterpret_cast<const flatbuf::LargeRecordBatch*>(message->header()); + + // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a + // RandomAccessFile according to that frame of reference + std::shared_ptr<Buffer> buffer_payload; + RETURN_NOT_OK(file->Read(message->bodyLength(), &buffer_payload)); + io::BufferReader buffer_reader(buffer_payload); + + LargeRecordBatchSource source(batch, &buffer_reader); + return LoadRecordBatchFromSource( + schema, batch->length(), kMaxNestingDepth, &source, out); +} + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index ca91765..1c1314a 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -43,6 +43,20 @@ class RandomAccessFile; namespace ipc { +// Generic read functionsh; does not copy data if the input supports zero copy reads + +Status ReadRecordBatch(const RecordBatchMetadata& metadata, + const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, + 
std::shared_ptr<RecordBatch>* out); + +Status ReadRecordBatch(const RecordBatchMetadata& metadata, + const std::shared_ptr<Schema>& schema, int max_recursion_depth, + io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); + +Status ReadDictionary(const DictionaryBatchMetadata& metadata, + const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file, + std::shared_ptr<Array>* out); + class ARROW_EXPORT StreamReader { public: ~StreamReader(); @@ -106,6 +120,14 @@ class ARROW_EXPORT FileReader { std::unique_ptr<FileReaderImpl> impl_; }; +// ---------------------------------------------------------------------- +// + +/// EXPERIMENTAL: Read length-prefixed LargeRecordBatch metadata (64-bit array +/// lengths) at offset and reconstruct RecordBatch +Status ARROW_EXPORT ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, + int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); + } // namespace ipc } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/test-common.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 66a5e09..ba203b0 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -103,7 +103,7 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out); Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) { - const int length = 1000; + const int length = 10; // Make the schema auto f0 = field("f0", int32()); http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 58402b5..82c119e 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -17,28 
+17,510 @@ #include "arrow/ipc/writer.h" +#include <algorithm> #include <cstdint> #include <cstring> +#include <limits> #include <sstream> #include <vector> +#include "arrow/array.h" #include "arrow/buffer.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" -#include "arrow/ipc/adapter.h" -#include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" +#include "arrow/loader.h" #include "arrow/memory_pool.h" #include "arrow/schema.h" #include "arrow/status.h" #include "arrow/table.h" +#include "arrow/type.h" +#include "arrow/util/bit-util.h" #include "arrow/util/logging.h" namespace arrow { namespace ipc { // ---------------------------------------------------------------------- +// Record batch write path + +class RecordBatchWriter : public ArrayVisitor { + public: + RecordBatchWriter( + MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth) + : pool_(pool), + max_recursion_depth_(max_recursion_depth), + buffer_start_offset_(buffer_start_offset) { + DCHECK_GT(max_recursion_depth, 0); + } + + virtual ~RecordBatchWriter() = default; + + virtual Status CheckArrayMetadata(const Array& arr) { + if (arr.length() > std::numeric_limits<int32_t>::max()) { + return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length"); + } + return Status::OK(); + } + + Status VisitArray(const Array& arr) { + if (max_recursion_depth_ <= 0) { + return Status::Invalid("Max recursion depth reached"); + } + + RETURN_NOT_OK(CheckArrayMetadata(arr)); + + // push back all common elements + field_nodes_.emplace_back(arr.length(), arr.null_count(), 0); + + if (arr.null_count() > 0) { + std::shared_ptr<Buffer> bitmap = arr.null_bitmap(); + + if (arr.offset() != 0) { + // With a sliced array / non-zero offset, we must copy the bitmap + RETURN_NOT_OK( + CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap)); + } + + buffers_.push_back(bitmap); + } else { + // Push a dummy zero-length buffer, not to be 
copied + buffers_.push_back(std::make_shared<Buffer>(nullptr, 0)); + } + return arr.Accept(this); + } + + Status Assemble(const RecordBatch& batch, int64_t* body_length) { + if (field_nodes_.size() > 0) { + field_nodes_.clear(); + buffer_meta_.clear(); + buffers_.clear(); + } + + // Perform depth-first traversal of the row-batch + for (int i = 0; i < batch.num_columns(); ++i) { + RETURN_NOT_OK(VisitArray(*batch.column(i))); + } + + // The position for the start of a buffer relative to the passed frame of + // reference. May be 0 or some other position in an address space + int64_t offset = buffer_start_offset_; + + buffer_meta_.reserve(buffers_.size()); + + const int32_t kNoPageId = -1; + + // Construct the buffer metadata for the record batch header + for (size_t i = 0; i < buffers_.size(); ++i) { + const Buffer* buffer = buffers_[i].get(); + int64_t size = 0; + int64_t padding = 0; + + // The buffer might be null if we are handling zero row lengths. + if (buffer) { + size = buffer->size(); + padding = BitUtil::RoundUpToMultipleOf64(size) - size; + } + + // TODO(wesm): We currently have no notion of shared memory page id's, + // but we've included it in the metadata IDL for when we have it in the + // future. Use page = -1 for now + // + // Note that page ids are a bespoke notion for Arrow and not a feature we + // are using from any OS-level shared memory. 
The thought is that systems + // may (in the future) associate integer page id's with physical memory + // pages (according to whatever is the desired shared memory mechanism) + buffer_meta_.push_back({kNoPageId, offset, size + padding}); + offset += size + padding; + } + + *body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf64(*body_length)); + + return Status::OK(); + } + + // Override this for writing dictionary metadata + virtual Status WriteMetadataMessage( + int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) { + return WriteRecordBatchMessage( + static_cast<int32_t>(num_rows), body_length, field_nodes_, buffer_meta_, out); + } + + Status WriteMetadata(int64_t num_rows, int64_t body_length, io::OutputStream* dst, + int32_t* metadata_length) { + // Now that we have computed the locations of all of the buffers in shared + // memory, the data header can be converted to a flatbuffer and written out + // + // Note: The memory written here is prefixed by the size of the flatbuffer + // itself as an int32_t. 
+ std::shared_ptr<Buffer> metadata_fb; + RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb)); + + // Need to write 4 bytes (metadata size), the metadata, plus padding to + // end on an 8-byte offset + int64_t start_offset; + RETURN_NOT_OK(dst->Tell(&start_offset)); + + int32_t padded_metadata_length = static_cast<int32_t>(metadata_fb->size()) + 4; + const int32_t remainder = + (padded_metadata_length + static_cast<int32_t>(start_offset)) % 8; + if (remainder != 0) { padded_metadata_length += 8 - remainder; } + + // The returned metadata size includes the length prefix, the flatbuffer, + // plus padding + *metadata_length = padded_metadata_length; + + // Write the flatbuffer size prefix including padding + int32_t flatbuffer_size = padded_metadata_length - 4; + RETURN_NOT_OK( + dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t))); + + // Write the flatbuffer + RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size())); + + // Write any padding + int32_t padding = + padded_metadata_length - static_cast<int32_t>(metadata_fb->size()) - 4; + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } + + return Status::OK(); + } + + Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length) { + RETURN_NOT_OK(Assemble(batch, body_length)); + +#ifndef NDEBUG + int64_t start_position, current_position; + RETURN_NOT_OK(dst->Tell(&start_position)); +#endif + + RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length)); + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + // Now write the buffers + for (size_t i = 0; i < buffers_.size(); ++i) { + const Buffer* buffer = buffers_[i].get(); + int64_t size = 0; + int64_t padding = 0; + + // The buffer might be null if we are handling zero row lengths. 
+ if (buffer) { + size = buffer->size(); + padding = BitUtil::RoundUpToMultipleOf64(size) - size; + } + + if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); } + + if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); } + } + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(&current_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + return Status::OK(); + } + + Status GetTotalSize(const RecordBatch& batch, int64_t* size) { + // emulates the behavior of Write without actually writing + int32_t metadata_length = 0; + int64_t body_length = 0; + MockOutputStream dst; + RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length)); + *size = dst.GetExtentBytesWritten(); + return Status::OK(); + } + + protected: + template <typename ArrayType> + Status VisitFixedWidth(const ArrayType& array) { + std::shared_ptr<Buffer> data_buffer = array.data(); + + if (array.offset() != 0) { + // Non-zero offset, slice the buffer + const auto& fw_type = static_cast<const FixedWidthType&>(*array.type()); + const int type_width = fw_type.bit_width() / 8; + const int64_t byte_offset = array.offset() * type_width; + + // Send padding if it's available + const int64_t buffer_length = + std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width), + data_buffer->size() - byte_offset); + data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length); + } + buffers_.push_back(data_buffer); + return Status::OK(); + } + + template <typename ArrayType> + Status GetZeroBasedValueOffsets( + const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) { + // Share slicing logic between ListArray and BinaryArray + + auto offsets = array.value_offsets(); + + if (array.offset() != 0) { + // If we have a non-zero offset, then the value offsets do not start at + // zero. 
We must a) create a new offsets array with shifted offsets and + // b) slice the values array accordingly + + std::shared_ptr<MutableBuffer> shifted_offsets; + RETURN_NOT_OK(AllocateBuffer( + pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets)); + + int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data()); + const int32_t start_offset = array.value_offset(0); + + for (int i = 0; i < array.length(); ++i) { + dest_offsets[i] = array.value_offset(i) - start_offset; + } + // Final offset + dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset; + offsets = shifted_offsets; + } + + *value_offsets = offsets; + return Status::OK(); + } + + Status VisitBinary(const BinaryArray& array) { + std::shared_ptr<Buffer> value_offsets; + RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets)); + auto data = array.data(); + + if (array.offset() != 0) { + // Slice the data buffer to include only the range we need now + data = SliceBuffer(data, array.value_offset(0), array.value_offset(array.length())); + } + + buffers_.push_back(value_offsets); + buffers_.push_back(data); + return Status::OK(); + } + + Status Visit(const FixedWidthBinaryArray& array) override { + auto data = array.data(); + int32_t width = array.byte_width(); + + if (array.offset() != 0) { + data = SliceBuffer(data, array.offset() * width, width * array.length()); + } + buffers_.push_back(data); + return Status::OK(); + } + + Status Visit(const BooleanArray& array) override { + buffers_.push_back(array.data()); + return Status::OK(); + } + +#define VISIT_FIXED_WIDTH(TYPE) \ + Status Visit(const TYPE& array) override { return VisitFixedWidth<TYPE>(array); } + + VISIT_FIXED_WIDTH(Int8Array); + VISIT_FIXED_WIDTH(Int16Array); + VISIT_FIXED_WIDTH(Int32Array); + VISIT_FIXED_WIDTH(Int64Array); + VISIT_FIXED_WIDTH(UInt8Array); + VISIT_FIXED_WIDTH(UInt16Array); + VISIT_FIXED_WIDTH(UInt32Array); + VISIT_FIXED_WIDTH(UInt64Array); + 
VISIT_FIXED_WIDTH(HalfFloatArray); + VISIT_FIXED_WIDTH(FloatArray); + VISIT_FIXED_WIDTH(DoubleArray); + VISIT_FIXED_WIDTH(DateArray); + VISIT_FIXED_WIDTH(Date32Array); + VISIT_FIXED_WIDTH(TimeArray); + VISIT_FIXED_WIDTH(TimestampArray); + +#undef VISIT_FIXED_WIDTH + + Status Visit(const StringArray& array) override { return VisitBinary(array); } + + Status Visit(const BinaryArray& array) override { return VisitBinary(array); } + + Status Visit(const ListArray& array) override { + std::shared_ptr<Buffer> value_offsets; + RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets)); + buffers_.push_back(value_offsets); + + --max_recursion_depth_; + std::shared_ptr<Array> values = array.values(); + + if (array.offset() != 0) { + // For non-zero offset, we slice the values array accordingly + const int32_t offset = array.value_offset(0); + const int32_t length = array.value_offset(array.length()) - offset; + values = values->Slice(offset, length); + } + RETURN_NOT_OK(VisitArray(*values)); + ++max_recursion_depth_; + return Status::OK(); + } + + Status Visit(const StructArray& array) override { + --max_recursion_depth_; + for (std::shared_ptr<Array> field : array.fields()) { + if (array.offset() != 0) { + // If offset is non-zero, slice the child array + field = field->Slice(array.offset(), array.length()); + } + RETURN_NOT_OK(VisitArray(*field)); + } + ++max_recursion_depth_; + return Status::OK(); + } + + Status Visit(const UnionArray& array) override { + auto type_ids = array.type_ids(); + if (array.offset() != 0) { + type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t), + array.length() * sizeof(UnionArray::type_id_t)); + } + + buffers_.push_back(type_ids); + + --max_recursion_depth_; + if (array.mode() == UnionMode::DENSE) { + const auto& type = static_cast<const UnionType&>(*array.type()); + auto value_offsets = array.value_offsets(); + + // The Union type codes are not necessary 0-indexed + uint8_t max_code = 0; + for 
(uint8_t code : type.type_codes) { + if (code > max_code) { max_code = code; } + } + + // Allocate an array of child offsets. Set all to -1 to indicate that we + // haven't observed a first occurrence of a particular child yet + std::vector<int32_t> child_offsets(max_code + 1); + std::vector<int32_t> child_lengths(max_code + 1, 0); + + if (array.offset() != 0) { + // This is an unpleasant case. Because the offsets are different for + // each child array, when we have a sliced array, we need to "rebase" + // the value_offsets for each array + + const int32_t* unshifted_offsets = array.raw_value_offsets(); + const uint8_t* type_ids = array.raw_type_ids(); + + // Allocate the shifted offsets + std::shared_ptr<MutableBuffer> shifted_offsets_buffer; + RETURN_NOT_OK(AllocateBuffer( + pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer)); + int32_t* shifted_offsets = + reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data()); + + for (int64_t i = 0; i < array.length(); ++i) { + const uint8_t code = type_ids[i]; + int32_t shift = child_offsets[code]; + if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; } + shifted_offsets[i] = unshifted_offsets[i] - shift; + + // Update the child length to account for observed value + ++child_lengths[code]; + } + + value_offsets = shifted_offsets_buffer; + } + buffers_.push_back(value_offsets); + + // Visit children and slice accordingly + for (int i = 0; i < type.num_children(); ++i) { + std::shared_ptr<Array> child = array.child(i); + if (array.offset() != 0) { + const uint8_t code = type.type_codes[i]; + child = child->Slice(child_offsets[code], child_lengths[code]); + } + RETURN_NOT_OK(VisitArray(*child)); + } + } else { + for (std::shared_ptr<Array> child : array.children()) { + // Sparse union, slicing is simpler + if (array.offset() != 0) { + // If offset is non-zero, slice the child array + child = child->Slice(array.offset(), array.length()); + } + RETURN_NOT_OK(VisitArray(*child)); + } + 
} + ++max_recursion_depth_; + return Status::OK(); + } + + Status Visit(const DictionaryArray& array) override { + // Dictionary written out separately. Slice offset contained in the indices + return array.indices()->Accept(this); + } + + // In some cases, intermediate buffers may need to be allocated (with sliced arrays) + MemoryPool* pool_; + + std::vector<FieldMetadata> field_nodes_; + std::vector<BufferMetadata> buffer_meta_; + std::vector<std::shared_ptr<Buffer>> buffers_; + + int64_t max_recursion_depth_; + int64_t buffer_start_offset_; +}; + +class DictionaryWriter : public RecordBatchWriter { + public: + using RecordBatchWriter::RecordBatchWriter; + + Status WriteMetadataMessage( + int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { + return WriteDictionaryMessage(dictionary_id_, static_cast<int32_t>(num_rows), + body_length, field_nodes_, buffer_meta_, out); + } + + Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { + dictionary_id_ = dictionary_id; + + // Make a dummy record batch. 
A bit tedious as we have to make a schema + std::vector<std::shared_ptr<Field>> fields = { + arrow::field("dictionary", dictionary->type())}; + auto schema = std::make_shared<Schema>(fields); + RecordBatch batch(schema, dictionary->length(), {dictionary}); + + return RecordBatchWriter::Write(batch, dst, metadata_length, body_length); + } + + private: + // TODO(wesm): Setting this in Write is a bit unclean, but it works + int64_t dictionary_id_; +}; + +Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool, int max_recursion_depth) { + RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth); + return writer.Write(batch, dst, metadata_length, body_length); +} + +Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary, + int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, + int64_t* body_length, MemoryPool* pool) { + DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth); + return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); +} + +Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { + RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth); + RETURN_NOT_OK(writer.GetTotalSize(batch, size)); + return Status::OK(); +} + +// ---------------------------------------------------------------------- // Stream writer implementation class StreamWriter::StreamWriterImpl { @@ -199,38 +681,6 @@ Status StreamWriter::Close() { // ---------------------------------------------------------------------- // File writer implementation -static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>> -FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) { - std::vector<flatbuf::Block> fb_blocks; - - for (const FileBlock& block : blocks) { - fb_blocks.emplace_back(block.offset, block.metadata_length, 
block.body_length); - } - - return fbb.CreateVectorOfStructs(fb_blocks); -} - -Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries, - const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo, - io::OutputStream* out) { - FBB fbb; - - flatbuffers::Offset<flatbuf::Schema> fb_schema; - RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema)); - - auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries); - auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches); - - auto footer = flatbuf::CreateFooter( - fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches); - - fbb.Finish(footer); - - int32_t size = fbb.GetSize(); - - return out->Write(fbb.GetBufferPointer(), size); -} - class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl { public: using BASE = StreamWriter::StreamWriterImpl; @@ -283,5 +733,31 @@ Status FileWriter::Close() { return impl_->Close(); } +// ---------------------------------------------------------------------- +// Write record batches with 64-bit size metadata + +class LargeRecordBatchWriter : public RecordBatchWriter { + public: + using RecordBatchWriter::RecordBatchWriter; + + Status CheckArrayMetadata(const Array& arr) override { + // No < INT32_MAX length check + return Status::OK(); + } + + Status WriteMetadataMessage( + int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override { + return WriteLargeRecordBatchMessage( + num_rows, body_length, field_nodes_, buffer_meta_, out); + } +}; + +Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, + io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, + MemoryPool* pool, int max_recursion_depth) { + LargeRecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth); + return writer.Write(batch, dst, metadata_length, body_length); +} + } // namespace ipc } // namespace arrow