Repository: arrow
Updated Branches:
  refs/heads/master dc3cb30b9 -> 5ad498833

ARROW-708: [C++] Simplify metadata APIs to all use the Message class, perf analysis

This doesn't produce a meaningful perf improvement, but it does remove a fair
amount of code, which is nice. Here is an interactive FlameGraph SVG:
https://www.dropbox.com/s/kp8i5r3j7i0em02/ipc-perf-20170324.svg?dl=0

It appears that, of the few hundred nanoseconds spent constructing each Array
object, most of the time goes to object constructors. One thing that shows up
is the RecordBatch constructor, which spends a significant amount of time
copying the `vector<shared_ptr<Array>>` it is passed, so I added a move
constructor (a minimal usage sketch appears after the diff).

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #437 from wesm/record-batch-read-perf and squashes the following commits:

95fdbc7 [Wes McKinney] Add RecordBatch constructor with rvalue-reference for the columns
793b3be [Wes McKinney] Inline SliceBuffer
212f17f [Wes McKinney] Benchmark in nanoseconds
a295aae [Wes McKinney] Remove record batch / dictionary PIMPL interfaces, handle flatbuffer details internally


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5ad49883
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5ad49883
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5ad49883

Branch: refs/heads/master
Commit: 5ad498833fe6cd5519b8d652d4bf620add5a7eed
Parents: dc3cb30
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Fri Mar 24 14:10:42 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Fri Mar 24 14:10:42 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/buffer.cc                       |   7 --
 cpp/src/arrow/buffer.h                        |   6 +-
 cpp/src/arrow/ipc/ipc-read-write-benchmark.cc |   2 -
 cpp/src/arrow/ipc/ipc-read-write-test.cc      |   9 +-
 cpp/src/arrow/ipc/metadata.cc                 | 123 +--------------------
 cpp/src/arrow/ipc/metadata.h                  |  41 +------
 cpp/src/arrow/ipc/reader.cc                   |  85 ++++++++------
 cpp/src/arrow/ipc/reader.h                    |  16 ++-
 cpp/src/arrow/table.cc                        |   4 +
 cpp/src/arrow/table.h                         |   3 +
 10 files changed, 76 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index a0b78ac..28edf5e 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -68,13 +68,6 @@ bool Buffer::Equals(const Buffer& other) const {
                          static_cast<size_t>(size_))));
 }
 
-std::shared_ptr<Buffer> SliceBuffer(
-    const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length) {
-  DCHECK_LE(offset, buffer->size());
-  DCHECK_LE(length, buffer->size() - offset);
-  return std::make_shared<Buffer>(buffer, offset, length);
-}
-
 std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
   return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 70c16a2..449bb53 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -96,8 +96,10 @@ class ARROW_EXPORT Buffer : public std::enable_shared_from_this<Buffer> {
 
 /// Construct a view on passed buffer at the indicated offset and length. This
 /// function cannot fail and does no error checking (except in debug builds)
-std::shared_ptr<Buffer> ARROW_EXPORT SliceBuffer(
-    const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length);
+static inline std::shared_ptr<Buffer> SliceBuffer(
+    const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length) {
+  return std::make_shared<Buffer>(buffer, offset, length);
+}
 
 /// A Buffer whose contents can be mutated. May or may not own its data.
 class ARROW_EXPORT MutableBuffer : public Buffer {

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
index e27e513..1aecdbc 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
@@ -121,14 +121,12 @@ BENCHMARK(BM_WriteRecordBatch)
     ->RangeMultiplier(4)
     ->Range(1, 1 << 13)
     ->MinTime(1.0)
-    ->Unit(benchmark::kMicrosecond)
     ->UseRealTime();
 
 BENCHMARK(BM_ReadRecordBatch)
     ->RangeMultiplier(4)
     ->Range(1, 1 << 13)
     ->MinTime(1.0)
-    ->Unit(benchmark::kMicrosecond)
     ->UseRealTime();
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index 6919aeb..086cc68 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -140,7 +140,6 @@ class IpcTestFixture : public io::MemoryMapFixture {
 
     std::shared_ptr<Message> message;
     RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-    auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
     // The buffer offsets start at 0, so we must construct a
     // RandomAccessFile according to that frame of reference
@@ -148,7 +147,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
     RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
 
     io::BufferReader buffer_reader(buffer_payload);
-    return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
+    return ReadRecordBatch(*message, batch.schema(), &buffer_reader, batch_result);
   }
 
   Status DoLargeRoundTrip(
@@ -370,7 +369,6 @@ TEST_F(RecursionLimits, ReadLimit) {
 
   std::shared_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-  auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
   std::shared_ptr<Buffer> payload;
   ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
@@ -378,7 +376,7 @@ TEST_F(RecursionLimits, ReadLimit) {
   io::BufferReader reader(payload);
 
   std::shared_ptr<RecordBatch> result;
-  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result));
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*message, schema, &reader, &result));
 }
 
 TEST_F(RecursionLimits, StressLimit) {
@@ -392,7 +390,6 @@ TEST_F(RecursionLimits, StressLimit) {
 
   std::shared_ptr<Message> message;
   ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-  auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
   std::shared_ptr<Buffer> payload;
   ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
@@ -400,7 +397,7 @@ TEST_F(RecursionLimits, StressLimit) {
   io::BufferReader reader(payload);
 
   std::shared_ptr<RecordBatch> result;
-  ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, &result));
+  ASSERT_OK(ReadRecordBatch(*message, schema, recursion_depth + 1, &reader, &result));
   *it_works = result->Equals(*batch);
 };

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index b10ccec..14cb627 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -770,6 +770,10 @@ int64_t Message::body_length() const {
   return impl_->body_length();
 }
 
+const void* Message::header() const {
+  return impl_->header();
+}
+
 // ----------------------------------------------------------------------
 // SchemaMetadata
 
@@ -859,125 +863,6 @@ Status SchemaMetadata::GetSchema(
 }
 
 // ----------------------------------------------------------------------
-// RecordBatchMetadata
-
-class RecordBatchMetadata::RecordBatchMetadataImpl : public MessageHolder {
- public:
-  explicit RecordBatchMetadataImpl(const void* batch)
-      : batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
-    nodes_ = batch_->nodes();
-    buffers_ = batch_->buffers();
-  }
-
-  const flatbuf::FieldNode* field(int i) const { return nodes_->Get(i); }
-
-  const flatbuf::Buffer* buffer(int i) const { return buffers_->Get(i); }
-
-  int64_t length() const { return batch_->length(); }
-
-  int num_buffers() const { return batch_->buffers()->size(); }
-
-  int num_fields() const { return batch_->nodes()->size(); }
-
- private:
-  const flatbuf::RecordBatch* batch_;
-  const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_;
-  const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
-};
-
-RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message)
-    : RecordBatchMetadata(message->impl_->header()) {
-  impl_->set_message(message);
-}
-
-RecordBatchMetadata::RecordBatchMetadata(const void* header) {
-  impl_.reset(new RecordBatchMetadataImpl(header));
-}
-
-RecordBatchMetadata::RecordBatchMetadata(
-    const std::shared_ptr<Buffer>& buffer, int64_t offset)
-    : RecordBatchMetadata(buffer->data() + offset) {
-  // Preserve ownership
-  impl_->set_buffer(buffer);
-}
-
-RecordBatchMetadata::~RecordBatchMetadata() {}
-
-// TODO(wesm): Copying the flatbuffer data isn't great, but this will do for
-// now
-FieldMetadata RecordBatchMetadata::field(int i) const {
-  const flatbuf::FieldNode* node = impl_->field(i);
-
-  FieldMetadata result;
-  result.length = node->length();
-  result.null_count = node->null_count();
-  result.offset = 0;
-  return result;
-}
-
-BufferMetadata RecordBatchMetadata::buffer(int i) const {
-  const flatbuf::Buffer* buffer = impl_->buffer(i);
-
-  BufferMetadata result;
-  result.page = buffer->page();
-  result.offset = buffer->offset();
-  result.length = buffer->length();
-  return result;
-}
-
-int64_t RecordBatchMetadata::length() const {
-  return impl_->length();
-}
-
-int RecordBatchMetadata::num_buffers() const {
-  return impl_->num_buffers();
-}
-
-int RecordBatchMetadata::num_fields() const {
-  return impl_->num_fields();
-}
-
-// ----------------------------------------------------------------------
-// DictionaryBatchMetadata
-
-class DictionaryBatchMetadata::DictionaryBatchMetadataImpl {
- public:
-  explicit DictionaryBatchMetadataImpl(const void* dictionary)
-      : metadata_(static_cast<const flatbuf::DictionaryBatch*>(dictionary)) {
-    record_batch_.reset(new RecordBatchMetadata(metadata_->data()));
-  }
-
-  int64_t id() const { return metadata_->id(); }
-  const RecordBatchMetadata& record_batch() const { return *record_batch_; }
-
-  void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
-
- private:
-  const flatbuf::DictionaryBatch* metadata_;
-
-  std::unique_ptr<RecordBatchMetadata> record_batch_;
-
-  // Parent, owns the flatbuffer data
-  std::shared_ptr<Message> message_;
-};
-
-DictionaryBatchMetadata::DictionaryBatchMetadata(
-    const std::shared_ptr<Message>& message) {
-  impl_.reset(new DictionaryBatchMetadataImpl(message->impl_->header()));
-  impl_->set_message(message);
-}
-
-DictionaryBatchMetadata::~DictionaryBatchMetadata() {}
-
-int64_t DictionaryBatchMetadata::id() const {
-  return impl_->id();
-}
-
-const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const {
-  return impl_->record_batch();
-}
-
-// ----------------------------------------------------------------------
 // Conveniences
 
 Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index dc07c7a..6e903c0 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -138,44 +138,6 @@ struct ARROW_EXPORT BufferMetadata {
   int64_t length;
 };
 
-// Container for serialized record batch metadata contained in an IPC message
-class ARROW_EXPORT RecordBatchMetadata {
- public:
-  explicit RecordBatchMetadata(const void* header);
-  explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
-  RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
-
-  ~RecordBatchMetadata();
-
-  FieldMetadata field(int i) const;
-  BufferMetadata buffer(int i) const;
-
-  int64_t length() const;
-  int num_buffers() const;
-  int num_fields() const;
-
- private:
-  class RecordBatchMetadataImpl;
-  std::unique_ptr<RecordBatchMetadataImpl> impl_;
-
-  DISALLOW_COPY_AND_ASSIGN(RecordBatchMetadata);
-};
-
-class ARROW_EXPORT DictionaryBatchMetadata {
- public:
-  explicit DictionaryBatchMetadata(const std::shared_ptr<Message>& message);
-  ~DictionaryBatchMetadata();
-
-  int64_t id() const;
-  const RecordBatchMetadata& record_batch() const;
-
- private:
-  class DictionaryBatchMetadataImpl;
-  std::unique_ptr<DictionaryBatchMetadataImpl> impl_;
-
-  DISALLOW_COPY_AND_ASSIGN(DictionaryBatchMetadata);
-};
-
 class ARROW_EXPORT Message {
  public:
   enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };
@@ -187,11 +149,12 @@ class ARROW_EXPORT Message {
 
   Type type() const;
 
+  const void* header() const;
+
  private:
   Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
 
   friend class DictionaryBatchMetadata;
-  friend class RecordBatchMetadata;
   friend class SchemaMetadata;
 
   // Hide serialization details from user API

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 71ba951..83e03aa 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -46,36 +46,41 @@ namespace ipc {
 
 class IpcComponentSource : public ArrayComponentSource {
  public:
-  IpcComponentSource(const RecordBatchMetadata& metadata, io::RandomAccessFile* file)
+  IpcComponentSource(const flatbuf::RecordBatch* metadata, io::RandomAccessFile* file)
       : metadata_(metadata), file_(file) {}
 
   Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
-    BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
-    if (buffer_meta.length == 0) {
+    const flatbuf::Buffer* buffer = metadata_->buffers()->Get(buffer_index);
+
+    if (buffer->length() == 0) {
       *out = nullptr;
       return Status::OK();
     } else {
-      return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
+      return file_->ReadAt(buffer->offset(), buffer->length(), out);
     }
   }
 
-  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+  Status GetFieldMetadata(int field_index, FieldMetadata* field) override {
+    auto nodes = metadata_->nodes();
     // pop off a field
-    if (field_index >= metadata_.num_fields()) {
+    if (field_index >= static_cast<int>(nodes->size())) {
      return Status::Invalid("Ran out of field metadata, likely malformed");
     }
-    *metadata = metadata_.field(field_index);
+    const flatbuf::FieldNode* node = nodes->Get(field_index);
+
+    field->length = node->length();
+    field->null_count = node->null_count();
+    field->offset = 0;
     return Status::OK();
   }
 
  private:
-  const RecordBatchMetadata& metadata_;
+  const flatbuf::RecordBatch* metadata_;
   io::RandomAccessFile* file_;
 };
 
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
-    std::shared_ptr<RecordBatch>* out) {
+Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
 }
 
@@ -94,22 +99,32 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
     RETURN_NOT_OK(LoadArray(schema->field(i)->type, &context, &arrays[i]));
   }
 
-  *out = std::make_shared<RecordBatch>(schema, num_rows, arrays);
+  *out = std::make_shared<RecordBatch>(schema, num_rows, std::move(arrays));
   return Status::OK();
 }
 
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
     const std::shared_ptr<Schema>& schema, int max_recursion_depth,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
   IpcComponentSource source(metadata, file);
   return LoadRecordBatchFromSource(
-      schema, metadata.length(), max_recursion_depth, &source, out);
+      schema, metadata->length(), max_recursion_depth, &source, out);
 }
 
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
-    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
-    std::shared_ptr<Array>* out) {
-  int64_t id = metadata.id();
+Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+    int max_recursion_depth, io::RandomAccessFile* file,
+    std::shared_ptr<RecordBatch>* out) {
+  DCHECK_EQ(metadata.type(), Message::RECORD_BATCH);
+  auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(metadata.header());
+  return ReadRecordBatch(batch, schema, max_recursion_depth, file, out);
+}
+
+Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictionary_types,
+    io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) {
+  auto dictionary_batch =
+      reinterpret_cast<const flatbuf::DictionaryBatch*>(metadata.header());
+
+  int64_t id = *dictionary_id = dictionary_batch->id();
   auto it = dictionary_types.find(id);
   if (it == dictionary_types.end()) {
     std::stringstream ss;
@@ -124,7 +139,10 @@ Status ReadDictionary(const DictionaryBatchMetadata& metadata,
 
   // The dictionary is embedded in a record batch with a single column
   std::shared_ptr<RecordBatch> batch;
-  RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
+  auto batch_meta =
+      reinterpret_cast<const flatbuf::RecordBatch*>(dictionary_batch->data());
+  RETURN_NOT_OK(
+      ReadRecordBatch(batch_meta, dummy_schema, kMaxNestingDepth, file, &batch));
 
   if (batch->num_columns() != 1) {
     return Status::Invalid("Dictionary record batch must only contain one field");
@@ -211,15 +229,14 @@ class StreamReader::StreamReaderImpl {
     std::shared_ptr<Message> message;
     RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, &message));
 
-    DictionaryBatchMetadata metadata(message);
-
     std::shared_ptr<Buffer> batch_body;
     RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body))
     io::BufferReader reader(batch_body);
 
     std::shared_ptr<Array> dictionary;
-    RETURN_NOT_OK(ReadDictionary(metadata, dictionary_types_, &reader, &dictionary));
-    return dictionary_memo_.AddDictionary(metadata.id(), dictionary);
+    int64_t id;
+    RETURN_NOT_OK(ReadDictionary(*message, dictionary_types_, &reader, &id, &dictionary));
+    return dictionary_memo_.AddDictionary(id, dictionary);
   }
 
   Status ReadSchema() {
@@ -249,12 +266,10 @@ class StreamReader::StreamReaderImpl {
       return Status::OK();
     }
 
-    RecordBatchMetadata batch_metadata(message);
-
     std::shared_ptr<Buffer> batch_body;
     RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body));
     io::BufferReader reader(batch_body);
-    return ReadRecordBatch(batch_metadata, schema_, &reader, batch);
+    return ReadRecordBatch(*message, schema_, &reader, batch);
   }
 
   std::shared_ptr<Schema> schema() const { return schema_; }
@@ -365,7 +380,6 @@ class FileReader::FileReaderImpl {
     std::shared_ptr<Message> message;
     RETURN_NOT_OK(
        ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
-    auto metadata = std::make_shared<RecordBatchMetadata>(message);
 
     // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
     // ARROW-384).
@@ -373,7 +387,7 @@ class FileReader::FileReaderImpl {
     RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
     io::BufferReader reader(buffer_block);
 
-    return ReadRecordBatch(*metadata, schema_, &reader, batch);
+    return ReadRecordBatch(*message, schema_, &reader, batch);
   }
 
   Status ReadSchema() {
@@ -386,9 +400,8 @@ class FileReader::FileReaderImpl {
     RETURN_NOT_OK(
        ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
 
-    // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more
-    // invasive refactor
-    DictionaryBatchMetadata metadata(message);
+    // TODO(wesm): ARROW-577: This code is a bit duplicated, can be fixed
+    // with a more invasive refactor
 
     // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
     // ARROW-384).
@@ -397,8 +410,10 @@ class FileReader::FileReaderImpl {
     io::BufferReader reader(buffer_block);
 
     std::shared_ptr<Array> dictionary;
-    RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary));
-    RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary));
+    int64_t dictionary_id;
+    RETURN_NOT_OK(ReadDictionary(
+        *message, dictionary_fields_, &reader, &dictionary_id, &dictionary));
+    RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary));
   }
 
   // Get the schema
@@ -480,15 +495,13 @@ Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
   RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer));
   RETURN_NOT_OK(Message::Open(buffer, 0, &message));
 
-  RecordBatchMetadata metadata(message);
-
   // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
   // RandomAccessFile according to that frame of reference
   std::shared_ptr<Buffer> buffer_payload;
   RETURN_NOT_OK(file->Read(message->body_length(), &buffer_payload));
 
   io::BufferReader buffer_reader(buffer_payload);
-  return ReadRecordBatch(metadata, schema, kMaxNestingDepth, &buffer_reader, out);
+  return ReadRecordBatch(*message, schema, kMaxNestingDepth, &buffer_reader, out);
 }
 
 }  // namespace ipc

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index ffd0a11..6d9e6ca 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -45,17 +45,15 @@ namespace ipc {
 
 // Generic read functions; does not copy data if the input supports zero copy reads
 
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
-    std::shared_ptr<RecordBatch>* out);
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
     io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
 
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
-    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
-    std::shared_ptr<Array>* out);
+Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema,
+    int max_recursion_depth, io::RandomAccessFile* file,
+    std::shared_ptr<RecordBatch>* out);
+
+Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictionary_types,
+    io::RandomAccessFile* file, std::shared_ptr<Array>* out);
 
 class ARROW_EXPORT StreamReader {
  public:

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 6b957c0..3f254aa 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -33,6 +33,10 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows
     const std::vector<std::shared_ptr<Array>>& columns)
     : schema_(schema), num_rows_(num_rows), columns_(columns) {}
 
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+    std::vector<std::shared_ptr<Array>>&& columns)
+    : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {}
+
 const std::string& RecordBatch::column_name(int i) const {
   return schema_->field(i)->name;
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/5ad49883/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 68f664b..bf0d99c 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -43,6 +43,9 @@ class ARROW_EXPORT RecordBatch {
   RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
       const std::vector<std::shared_ptr<Array>>& columns);
 
+  RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
+      std::vector<std::shared_ptr<Array>>&& columns);
+
   bool Equals(const RecordBatch& other) const;
 
   bool ApproxEquals(const RecordBatch& other) const;
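
For illustration, here is a minimal sketch (not part of the diff above) of how a
caller can take advantage of the new rvalue-reference RecordBatch constructor;
MakeBatch is a hypothetical helper, and the include assumes the Arrow C++
headers as of this commit:

// Illustrative only -- MakeBatch is a hypothetical helper, not part of this
// commit; assumes the Arrow C++ headers as of this change.
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/table.h"  // arrow::RecordBatch and the new && constructor

std::shared_ptr<arrow::RecordBatch> MakeBatch(
    const std::shared_ptr<arrow::Schema>& schema, int64_t num_rows,
    std::vector<std::shared_ptr<arrow::Array>> columns) {
  // Without std::move, `columns` is an lvalue, so the const-ref overload would
  // copy every shared_ptr in the vector -- the cost visible in the FlameGraph.
  // std::move selects the new rvalue-reference overload and moves the storage.
  return std::make_shared<arrow::RecordBatch>(schema, num_rows, std::move(columns));
}

The same pattern is used in reader.cc above, where LoadRecordBatchFromSource now
passes std::move(arrays) to the RecordBatch constructor instead of copying the
column vector per batch.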