Repository: arrow
Updated Branches:
  refs/heads/master 4c5f79c39 -> df2220f35
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 7aff71e..1271652 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -45,6 +45,40 @@ class OutputStream;
 
 namespace ipc {
 
+// Write the RecordBatch (collection of equal-length Arrow arrays) to the
+// output stream in a contiguous block. The record batch metadata is written as
+// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
+// prefixed by its size, followed by each of the memory buffers in the batch
+// written end to end (with appropriate alignment and padding):
+//
+// <int32: metadata size> <uint8*: metadata> <buffers>
+//
+// Finally, the absolute offsets (relative to the start of the output stream)
+// to the end of the body and end of the metadata / data header (suffixed by
+// the header size) are returned in out-variables
+//
+// @param(in) buffer_start_offset: the start offset to use in the buffer metadata,
+// default should be 0
+//
+// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+// including padding to a 64-byte boundary
+//
+// @param(out) body_length: the size of the contiguous buffer block plus
+// padding bytes
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
+
+// Write Array as a DictionaryBatch message
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, MemoryPool* pool);
+
+// Compute the precise number of bytes needed in a contiguous memory segment to
+// write the record batch. This involves generating the complete serialized
+// Flatbuffers metadata.
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+
 class ARROW_EXPORT StreamWriter {
  public:
   virtual ~StreamWriter() = default;
@@ -68,10 +102,6 @@ class ARROW_EXPORT StreamWriter {
   std::unique_ptr<StreamWriterImpl> impl_;
 };
 
-Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
-    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
-    io::OutputStream* out);
-
 class ARROW_EXPORT FileWriter : public StreamWriter {
  public:
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
@@ -86,6 +116,14 @@ class ARROW_EXPORT FileWriter : public StreamWriter {
   std::unique_ptr<FileWriterImpl> impl_;
 };
 
+// ----------------------------------------------------------------------
+
+/// EXPERIMENTAL: Write record batch using LargeRecordBatch IPC metadata. This
+/// data may not be readable by all Arrow implementations
+Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
+
 }  // namespace ipc
 }  // namespace arrow
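As a rough illustration of how the functions added above might be used together (the helper name SerializeOneBatch, the include path, and the stream/pool setup are assumptions for the sketch, not part of the patch):

#include "arrow/ipc/writer.h"  // assumed install path for the header above

// Serialize one record batch to an already-open output stream.
arrow::Status SerializeOneBatch(const arrow::RecordBatch& batch,
    arrow::io::OutputStream* dst, arrow::MemoryPool* pool) {
  // Precise number of bytes the serialized batch will occupy, computed by
  // generating the complete Flatbuffers metadata up front.
  int64_t expected_size = 0;
  arrow::Status st = arrow::ipc::GetRecordBatchSize(batch, &expected_size);
  if (!st.ok()) { return st; }
  (void)expected_size;  // e.g. useful for preallocating a fixed-size segment

  int32_t metadata_length = 0;  // length-prefixed flatbuffer, padded to 64 bytes
  int64_t body_length = 0;      // contiguous buffer block including padding
  // buffer_start_offset = 0: buffer offsets in the metadata are relative to
  // the start of the stream
  return arrow::ipc::WriteRecordBatch(
      batch, 0, dst, &metadata_length, &body_length, pool);
}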
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/loader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.h b/cpp/src/arrow/loader.h
index f116d64..9b650e2 100644
--- a/cpp/src/arrow/loader.h
+++ b/cpp/src/arrow/loader.h
@@ -41,11 +41,36 @@ struct DataType;
 
 constexpr int kMaxNestingDepth = 64;
 
 struct ARROW_EXPORT FieldMetadata {
+  FieldMetadata() {}
+  FieldMetadata(int64_t length, int64_t null_count, int64_t offset)
+      : length(length), null_count(null_count), offset(offset) {}
+
+  FieldMetadata(const FieldMetadata& other) {
+    this->length = other.length;
+    this->null_count = other.null_count;
+    this->offset = other.offset;
+  }
+
   int64_t length;
   int64_t null_count;
   int64_t offset;
 };
 
+struct ARROW_EXPORT BufferMetadata {
+  BufferMetadata() {}
+  BufferMetadata(int32_t page, int64_t offset, int64_t length)
+      : page(page), offset(offset), length(length) {}
+
+  /// The shared memory page id where to find this. Set to -1 if unused
+  int32_t page;
+
+  /// The relative offset into the memory page to the starting byte of the buffer
+  int64_t offset;
+
+  /// Absolute length in bytes of the buffer
+  int64_t length;
+};
+
 /// Implement this to create new types of Arrow data loaders
 class ARROW_EXPORT ArrayComponentSource {
  public:

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index a143d79..adc3161 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -222,6 +222,7 @@ struct ARROW_EXPORT Field {
   std::string ToString() const;
 };
 
+
 typedef std::shared_ptr<Field> FieldPtr;
 
 struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/util/bit-util.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc
index 3767ba9..ba0bfd7 100644
--- a/cpp/src/arrow/util/bit-util.cc
+++ b/cpp/src/arrow/util/bit-util.cc
@@ -112,7 +112,21 @@ Status CopyBitmap(MemoryPool* pool, const uint8_t* data, int64_t offset, int64_t
 
 bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right,
     int64_t right_offset, int64_t bit_length) {
-  // TODO(wesm): Make this faster using word-wise comparisons
+  if (left_offset % 8 == 0 && right_offset % 8 == 0) {
+    // byte aligned, can use memcmp
+    bool bytes_equal = std::memcmp(left + left_offset / 8, right + right_offset / 8,
+                           bit_length / 8) == 0;
+    if (!bytes_equal) { return false; }
+    for (int64_t i = (bit_length / 8) * 8; i < bit_length; ++i) {
+      if (BitUtil::GetBit(left, left_offset + i) !=
+          BitUtil::GetBit(right, right_offset + i)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Unaligned slow case
   for (int64_t i = 0; i < bit_length; ++i) {
     if (BitUtil::GetBit(left, left_offset + i) !=
         BitUtil::GetBit(right, right_offset + i)) {
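A small hedged sketch of the two paths in the new BitmapEquals (the include path and the sample values are illustrative): when both offsets are byte-aligned, whole bytes are compared with std::memcmp and only the trailing bit_length % 8 bits are checked one by one; any unaligned offset falls through to the existing bit-by-bit loop.

#include <cstdint>
#include "arrow/util/bit-util.h"  // assumed header declaring BitmapEquals

void BitmapEqualsExample() {
  // Arrow bitmaps are LSB-ordered: bit i of a byte b is (b >> i) & 1.
  uint8_t a[2] = {0xAB, 0x01};
  uint8_t b[2] = {0xAB, 0x03};
  // Byte-aligned offsets: the first 8 bits go through memcmp, the remaining
  // 2 bits are compared individually. Bit 9 differs (0 vs 1), so this is false.
  bool aligned = arrow::BitmapEquals(a, 0, b, 0, 10);
  // Offset 1 is not a multiple of 8, so the slow bit-by-bit path runs.
  bool unaligned = arrow::BitmapEquals(a, 1, b, 1, 9);
  (void)aligned;
  (void)unaligned;
}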
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/format/Message.fbs
----------------------------------------------------------------------
diff --git a/format/Message.fbs b/format/Message.fbs
index 8fdcc80..2af26d4 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -308,6 +308,22 @@ table RecordBatch {
 }
 
 /// ----------------------------------------------------------------------
+/// EXPERIMENTAL: A RecordBatch type that supports data with more than 2^31 - 1
+/// elements. Arrow implementations do not need to implement this type to be
+/// compliant
+
+struct LargeFieldNode {
+  length: long;
+  null_count: long;
+}
+
+table LargeRecordBatch {
+  length: long;
+  nodes: [LargeFieldNode];
+  buffers: [Buffer];
+}
+
+/// ----------------------------------------------------------------------
 /// For sending dictionary encoding information. Any Field can be
 /// dictionary-encoded, but in this case none of its children may be
 /// dictionary-encoded.
@@ -324,8 +340,12 @@ table DictionaryBatch {
 }
 
 /// This union enables us to easily send different message types without
 /// redundant storage, and in the future we can easily add new message types.
+///
+/// Arrow implementations do not need to implement all of the message types,
+/// which may include experimental metadata types. For maximum compatibility,
+/// it is best to send data using RecordBatch
 union MessageHeader {
-  Schema, DictionaryBatch, RecordBatch
+  Schema, DictionaryBatch, RecordBatch, LargeRecordBatch
 }
 
 table Message {
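Since LargeRecordBatch is experimental and not required for compliance, a writer would typically fall back to it only when the 32-bit RecordBatch metadata cannot describe the data. A hedged sketch of that decision (the helper name WriteBatchAuto, the num_rows() threshold, and the include paths are assumptions, not part of the patch):

#include <cstdint>
#include <limits>
#include "arrow/ipc/writer.h"  // assumed install path for the declarations above

arrow::Status WriteBatchAuto(const arrow::RecordBatch& batch,
    arrow::io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
    arrow::MemoryPool* pool) {
  if (batch.num_rows() > std::numeric_limits<int32_t>::max()) {
    // More than 2^31 - 1 elements: only the 64-bit LargeRecordBatch metadata
    // can represent the lengths, at the cost of reduced reader compatibility.
    return arrow::ipc::WriteLargeRecordBatch(
        batch, 0, dst, metadata_length, body_length, pool);
  }
  // Default path: maximum compatibility across Arrow implementations.
  return arrow::ipc::WriteRecordBatch(
      batch, 0, dst, metadata_length, body_length, pool);
}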