ARROW-1214: [Python/C++] Add C++ functionality to more easily handle encapsulated IPC messages, Python bindings
This patch does a bunch of things:

* Decouples the `RecordBatchStreamReader` from the actual message iteration (which is handled by a new `arrow::ipc::MessageReader` interface)
* Enables `arrow::ipc::Message` to hold all of the memory for a complete unit of data: metadata plus body
* Renames some IPC methods for better consistency (GetNextRecordBatch -> ReadNextRecordBatch)
* Adds a function to serialize a complete encapsulated message to an `arrow::io::OutputStream*`
* Adds Python bindings for all of the above, introducing `pyarrow.Message` and `pyarrow.MessageReader`. Adds `read_message` and `Message.serialize` functions for efficient memory round trips
* Adds `pyarrow.read_record_batch` for reading a single record batch given a message and a known schema

Later we will want to add `pyarrow.read_schema`, but it seemed like a bit of work to make it work for dictionaries.

This implements the C++ analogue to ARROW-1047, which was for Java. Not sure why I didn't create a JIRA about this. cc @icexelloss

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #839 from wesm/ARROW-1214 and squashes the following commits:

07f1820a [Wes McKinney] Refactor to introduce MessageReader abstract type, use unique_ptr for messages instead of shared_ptr. First cut at Message, MessageReader Python API. Add read_message, C++/Python machinery for message roundtrips to Buffer, comparison. Add function to read RecordBatch from encapsulated message given schema.

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/bb0a7588
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/bb0a7588
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/bb0a7588
Branch: refs/heads/master
Commit: bb0a75885f2655ac54be47bd238811b74782532e
Parents: 099f61c
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Sat Jul 15 16:51:51 2017 -0400
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Sat Jul 15 16:51:51 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/reader.cpp                 |   4 +-
 cpp/src/arrow/buffer.cc                      |   2 -
 cpp/src/arrow/buffer.h                       |  15 +-
 cpp/src/arrow/builder.cc                     |  12 +-
 cpp/src/arrow/ipc/file-to-stream.cc          |   2 +-
 cpp/src/arrow/ipc/ipc-json-test.cc           |   6 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc     |  70 ++--
 cpp/src/arrow/ipc/json-integration-test.cc   |   8 +-
 cpp/src/arrow/ipc/json.cc                    |   6 +-
 cpp/src/arrow/ipc/json.h                     |   2 +-
 cpp/src/arrow/ipc/metadata.cc                | 150 +++++--
 cpp/src/arrow/ipc/metadata.h                 | 100 +++--
 cpp/src/arrow/ipc/reader.cc                  | 206 ++++------
 cpp/src/arrow/ipc/reader.h                   |  35 +-
 cpp/src/arrow/ipc/stream-to-file.cc          |   2 +-
 cpp/src/arrow/ipc/writer.cc                  |   4 +-
 cpp/src/arrow/python/builtin_convert.cc      |   5 +-
 python/doc/source/api.rst                    |  12 +-
 python/pyarrow/__init__.py                   |   9 +-
 python/pyarrow/feather.pxi                   | 109 +++++
 python/pyarrow/includes/libarrow.pxd         |  66 +--
 python/pyarrow/io.pxi                        | 353 ----------------
 python/pyarrow/ipc.pxi                       | 480 ++++++++++++++++++++++
 python/pyarrow/ipc.py                        |  13 +-
 python/pyarrow/lib.pyx                       |   8 +-
 python/pyarrow/pandas_compat.py              |   2 +-
 python/pyarrow/table.pxi                     |  16 +-
 python/pyarrow/tests/conftest.py             |   2 +-
 python/pyarrow/tests/test_array.py           |   2 +-
 python/pyarrow/tests/test_convert_builtin.py |   3 +
 python/pyarrow/tests/test_feather.py         |   3 +-
 python/pyarrow/tests/test_ipc.py             |  59 ++-
 python/pyarrow/tests/test_parquet.py         |   7 +-
 python/pyarrow/tests/test_table.py           |   4 +
 python/pyarrow/tests/test_tensor.py          |   1 +
 35 files changed, 1135 insertions(+), 643 deletions(-)
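For illustration (not part of the patch), a minimal sketch of the Python round trip the new bindings enable. `pa.read_message`, `pa.read_record_batch`, and `Message.serialize` come from this patch; the writer/reader construction (`RecordBatchStreamWriter(sink, schema)`, `BufferOutputStream.get_result()`) and the `read_record_batch(message, schema)` argument order are assumptions based on the pyarrow API of this era:

```python
import pyarrow as pa

# Write one record batch in the streaming IPC format
batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['f0'])
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

# A stream is a sequence of encapsulated messages: the schema message
# first, then one message per record batch
source = pa.BufferReader(sink.get_result())
schema_message = pa.read_message(source)
batch_message = pa.read_message(source)

# Round-trip the batch message through a Buffer, then rebuild the
# record batch from the message plus the known schema
buf = batch_message.serialize()
message = pa.read_message(pa.BufferReader(buf))
result = pa.read_record_batch(message, batch.schema)
assert result.equals(batch)
```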
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/c_glib/arrow-glib/reader.cpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp index 3ff6ba1..523bdee 100644 --- a/c_glib/arrow-glib/reader.cpp +++ b/c_glib/arrow-glib/reader.cpp @@ -173,7 +173,7 @@ garrow_record_batch_reader_get_next_record_batch(GArrowRecordBatchReader *reader { auto arrow_reader = garrow_record_batch_reader_get_raw(reader); std::shared_ptr<arrow::RecordBatch> arrow_record_batch; - auto status = arrow_reader->GetNextRecordBatch(&arrow_record_batch); + auto status = arrow_reader->ReadNextRecordBatch(&arrow_record_batch); if (garrow_error_check(error, status, @@ -410,7 +410,7 @@ garrow_record_batch_file_reader_get_record_batch(GArrowRecordBatchFileReader *re { auto arrow_reader = garrow_record_batch_file_reader_get_raw(reader); std::shared_ptr<arrow::RecordBatch> arrow_record_batch; - auto status = arrow_reader->GetRecordBatch(i, &arrow_record_batch); + auto status = arrow_reader->ReadRecordBatch(i, &arrow_record_batch); if (garrow_error_check(error, status, http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index fb63798..a1d119e 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -27,8 +27,6 @@ namespace arrow { -Buffer::~Buffer() {} - Status Buffer::Copy( int64_t start, int64_t nbytes, MemoryPool* pool, std::shared_ptr<Buffer>* out) const { // Sanity checks http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/buffer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index bfbea77..b117b24 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -22,6 +22,7 @@ #include <cstdint> #include <cstring> #include <memory> +#include <string> #include "arrow/status.h" #include "arrow/util/macros.h" @@ -47,7 +48,8 @@ class ARROW_EXPORT Buffer { public: Buffer(const uint8_t* data, int64_t size) : is_mutable_(false), data_(data), size_(size), capacity_(size) {} - virtual ~Buffer(); + + virtual ~Buffer() = default; /// An offset into data that is owned by another buffer, but we want to be /// able to retain a valid pointer to it even after other shared_ptr's to the @@ -97,6 +99,17 @@ class ARROW_EXPORT Buffer { DISALLOW_COPY_AND_ASSIGN(Buffer); }; +/// \brief Create Buffer referencing std::string memory +/// +/// Warning: string instance must stay alive +/// +/// \param str std::string instance +/// \return std::shared_ptr<Buffer> +static inline std::shared_ptr<Buffer> GetBufferFromString(const std::string& str) { + return std::make_shared<Buffer>( + reinterpret_cast<const uint8_t*>(str.c_str()), static_cast<int64_t>(str.size())); +} + /// Construct a view on passed buffer at the indicated offset and length. 
This /// function cannot fail and does not error checking (except in debug builds) static inline std::shared_ptr<Buffer> SliceBuffer( http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/builder.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 155d81a..e466838 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -342,8 +342,8 @@ Status AdaptiveIntBuilder::Append( sizeof(int64_t) * length); } else { #ifdef _MSC_VER -# pragma warning(push) -# pragma warning(disable:4996) +#pragma warning(push) +#pragma warning(disable : 4996) #endif // int_size_ may have changed, so we need to recheck switch (int_size_) { @@ -366,7 +366,7 @@ Status AdaptiveIntBuilder::Append( DCHECK(false); } #ifdef _MSC_VER -# pragma warning(pop) +#pragma warning(pop) #endif } @@ -497,8 +497,8 @@ Status AdaptiveUIntBuilder::Append( sizeof(uint64_t) * length); } else { #ifdef _MSC_VER -# pragma warning(push) -# pragma warning(disable:4996) +#pragma warning(push) +#pragma warning(disable : 4996) #endif // int_size_ may have changed, so we need to recheck switch (int_size_) { @@ -521,7 +521,7 @@ Status AdaptiveUIntBuilder::Append( DCHECK(false); } #ifdef _MSC_VER -# pragma warning(pop) +#pragma warning(pop) #endif } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/file-to-stream.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/file-to-stream.cc b/cpp/src/arrow/ipc/file-to-stream.cc index 39c720c..a1feedc 100644 --- a/cpp/src/arrow/ipc/file-to-stream.cc +++ b/cpp/src/arrow/ipc/file-to-stream.cc @@ -39,7 +39,7 @@ Status ConvertToStream(const char* path) { RETURN_NOT_OK(RecordBatchStreamWriter::Open(&sink, reader->schema(), &writer)); for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> chunk; - RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); + RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk)); RETURN_NOT_OK(writer->WriteRecordBatch(*chunk)); } return writer->Close(); http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-json-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc index 9297146..318e318 100644 --- a/cpp/src/arrow/ipc/ipc-json-test.cc +++ b/cpp/src/arrow/ipc/ipc-json-test.cc @@ -276,7 +276,7 @@ TEST(TestJsonFileReadWrite, BasicRoundTrip) { for (int i = 0; i < nbatches; ++i) { std::shared_ptr<RecordBatch> batch; - ASSERT_OK(reader->GetRecordBatch(i, &batch)); + ASSERT_OK(reader->ReadRecordBatch(i, &batch)); ASSERT_TRUE(batch->Equals(*batches[i])); } } @@ -344,7 +344,7 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) { ASSERT_EQ(1, reader->num_record_batches()); std::shared_ptr<RecordBatch> batch; - ASSERT_OK(reader->GetRecordBatch(0, &batch)); + ASSERT_OK(reader->ReadRecordBatch(0, &batch)); std::vector<bool> foo_valid = {true, false, true, true, true}; std::vector<int32_t> foo_values = {1, 2, 3, 4, 5}; @@ -388,7 +388,7 @@ void CheckRoundtrip(const RecordBatch& batch) { ASSERT_OK(JsonReader::Open(buffer, &reader)); std::shared_ptr<RecordBatch> result_batch; - ASSERT_OK(reader->GetRecordBatch(0, &result_batch)); + ASSERT_OK(reader->ReadRecordBatch(0, &result_batch)); CompareBatch(batch, *result_batch); } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/ipc-read-write-test.cc 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index c71d046..42f14b0 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -51,8 +51,8 @@ class TestSchemaMetadata : public ::testing::Test { std::shared_ptr<Buffer> buffer; ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer)); - std::shared_ptr<Message> message; - ASSERT_OK(Message::Open(buffer, 0, &message)); + std::unique_ptr<Message> message; + ASSERT_OK(Message::Open(buffer, nullptr, &message)); ASSERT_EQ(Message::SCHEMA, message->type()); @@ -65,6 +65,32 @@ class TestSchemaMetadata : public ::testing::Test { } }; +TEST(TestMessage, Equals) { + std::string metadata = "foo"; + std::string body = "bar"; + + auto b1 = GetBufferFromString(metadata); + auto b2 = GetBufferFromString(metadata); + auto b3 = GetBufferFromString(body); + auto b4 = GetBufferFromString(body); + + Message msg1(b1, b3); + Message msg2(b2, b4); + Message msg3(b1, nullptr); + Message msg4(b2, nullptr); + + ASSERT_TRUE(msg1.Equals(msg2)); + ASSERT_TRUE(msg3.Equals(msg4)); + + ASSERT_FALSE(msg1.Equals(msg3)); + ASSERT_FALSE(msg3.Equals(msg1)); + + // same metadata as msg1, different body + Message msg5(b2, b1); + ASSERT_FALSE(msg1.Equals(msg5)); + ASSERT_FALSE(msg5.Equals(msg1)); +} + const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>(); TEST_F(TestSchemaMetadata, PrimitiveFields) { @@ -123,16 +149,12 @@ class IpcTestFixture : public io::MemoryMapFixture { RETURN_NOT_OK(WriteRecordBatch( batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - // The buffer offsets start at 0, so we must construct a - // RandomAccessFile according to that frame of reference - std::shared_ptr<Buffer> buffer_payload; - RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload)); - io::BufferReader buffer_reader(buffer_payload); - - return ReadRecordBatch(*message, batch.schema(), &buffer_reader, batch_result); + io::BufferReader buffer_reader(message->body()); + return ReadRecordBatch( + *message->metadata(), batch.schema(), &buffer_reader, batch_result); } Status DoLargeRoundTrip( @@ -151,7 +173,7 @@ class IpcTestFixture : public io::MemoryMapFixture { std::shared_ptr<RecordBatchFileReader> file_reader; RETURN_NOT_OK(RecordBatchFileReader::Open(mmap_, offset, &file_reader)); - return file_reader->GetRecordBatch(0, result); + return file_reader->ReadRecordBatch(0, result); } void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) { @@ -225,7 +247,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) { ASSERT_OK(WriteRecordBatch( *batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_)); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); ASSERT_EQ(MetadataVersion::V3, message->metadata_version()); @@ -434,16 +456,13 @@ TEST_F(RecursionLimits, ReadLimit) { ASSERT_OK(WriteToMmap( recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - std::shared_ptr<Buffer> payload; - ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); - - io::BufferReader 
reader(payload); + io::BufferReader reader(message->body()); std::shared_ptr<RecordBatch> result; - ASSERT_RAISES(Invalid, ReadRecordBatch(*message, schema, &reader, &result)); + ASSERT_RAISES(Invalid, ReadRecordBatch(*message->metadata(), schema, &reader, &result)); } TEST_F(RecursionLimits, StressLimit) { @@ -455,16 +474,13 @@ TEST_F(RecursionLimits, StressLimit) { ASSERT_OK(WriteToMmap( recursion_depth, true, &metadata_length, &body_length, &batch, &schema)); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - std::shared_ptr<Buffer> payload; - ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload)); - - io::BufferReader reader(payload); - + io::BufferReader reader(message->body()); std::shared_ptr<RecordBatch> result; - ASSERT_OK(ReadRecordBatch(*message, schema, recursion_depth + 1, &reader, &result)); + ASSERT_OK(ReadRecordBatch( + *message->metadata(), schema, recursion_depth + 1, &reader, &result)); *it_works = result->Equals(*batch); }; @@ -511,7 +527,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> { EXPECT_EQ(num_batches, reader->num_record_batches()); for (int i = 0; i < num_batches; ++i) { std::shared_ptr<RecordBatch> chunk; - RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk)); + RETURN_NOT_OK(reader->ReadRecordBatch(i, &chunk)); out_batches->emplace_back(chunk); } @@ -571,7 +587,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> { std::shared_ptr<RecordBatch> chunk; while (true) { - RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk)); + RETURN_NOT_OK(reader->ReadNextRecordBatch(&chunk)); if (chunk == nullptr) { break; } out_batches->emplace_back(chunk); } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json-integration-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc index 424755a..18f5dfa 100644 --- a/cpp/src/arrow/ipc/json-integration-test.cc +++ b/cpp/src/arrow/ipc/json-integration-test.cc @@ -82,7 +82,7 @@ static Status ConvertJsonToArrow( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> batch; - RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); + RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } return writer->Close(); @@ -109,7 +109,7 @@ static Status ConvertArrowToJson( for (int i = 0; i < reader->num_record_batches(); ++i) { std::shared_ptr<RecordBatch> batch; - RETURN_NOT_OK(reader->GetRecordBatch(i, &batch)); + RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch)); RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } @@ -168,8 +168,8 @@ static Status ValidateArrowVsJson( std::shared_ptr<RecordBatch> arrow_batch; std::shared_ptr<RecordBatch> json_batch; for (int i = 0; i < json_nbatches; ++i) { - RETURN_NOT_OK(json_reader->GetRecordBatch(i, &json_batch)); - RETURN_NOT_OK(arrow_reader->GetRecordBatch(i, &arrow_batch)); + RETURN_NOT_OK(json_reader->ReadRecordBatch(i, &json_batch)); + RETURN_NOT_OK(arrow_reader->ReadRecordBatch(i, &arrow_batch)); if (!json_batch->ApproxEquals(*arrow_batch)) { std::stringstream ss; http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.cc b/cpp/src/arrow/ipc/json.cc index f8c0b62..36e343e 100644 --- 
a/cpp/src/arrow/ipc/json.cc +++ b/cpp/src/arrow/ipc/json.cc @@ -115,7 +115,7 @@ class JsonReader::JsonReaderImpl { return Status::OK(); } - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const { + Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const { DCHECK_GE(i, 0) << "i out of bounds"; DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size())) << "i out of bounds"; @@ -164,8 +164,8 @@ int JsonReader::num_record_batches() const { return impl_->num_record_batches(); } -Status JsonReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const { - return impl_->GetRecordBatch(i, batch); +Status JsonReader::ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const { + return impl_->ReadRecordBatch(i, batch); } } // namespace ipc http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/json.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/json.h b/cpp/src/arrow/ipc/json.h index ad94def..2ba27c7 100644 --- a/cpp/src/arrow/ipc/json.h +++ b/cpp/src/arrow/ipc/json.h @@ -72,7 +72,7 @@ class ARROW_EXPORT JsonReader { int num_record_batches() const; // Read a record batch from the file - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const; + Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) const; private: JsonReader(MemoryPool* pool, const std::shared_ptr<Buffer>& data); http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc index 54f0547..5b2ca3b 100644 --- a/cpp/src/arrow/ipc/metadata.cc +++ b/cpp/src/arrow/ipc/metadata.cc @@ -17,6 +17,7 @@ #include "arrow/ipc/metadata.h" +#include <algorithm> #include <cstdint> #include <memory> #include <sstream> @@ -834,11 +835,12 @@ Status DictionaryMemo::AddDictionary( class Message::MessageImpl { public: - explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset) - : buffer_(buffer), offset_(offset), message_(nullptr) {} + explicit MessageImpl( + const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) + : metadata_(metadata), message_(nullptr), body_(body) {} Status Open() { - message_ = flatbuf::GetMessage(buffer_->data() + offset_); + message_ = flatbuf::GetMessage(metadata_->data()); // Check that the metadata version is supported if (message_->version() < kMinMetadataVersion) { @@ -872,7 +874,7 @@ class Message::MessageImpl { // Arrow 0.2 return MetadataVersion::V2; case flatbuf::MetadataVersion_V3: - // Arrow 0.3 + // Arrow >= 0.3 return MetadataVersion::V3; // Add cases as other versions become available default: @@ -882,28 +884,38 @@ class Message::MessageImpl { const void* header() const { return message_->header(); } - int64_t body_length() const { return message_->bodyLength(); } + std::shared_ptr<Buffer> body() const { return body_; } - private: - // Retain reference to memory - std::shared_ptr<Buffer> buffer_; - int64_t offset_; + std::shared_ptr<Buffer> metadata() const { return metadata_; } + private: + // The Flatbuffer metadata + std::shared_ptr<Buffer> metadata_; const flatbuf::Message* message_; + + // The message body, if any + std::shared_ptr<Buffer> body_; }; -Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) { - impl_.reset(new MessageImpl(buffer, offset)); +Message::Message( + const std::shared_ptr<Buffer>& metadata, const 
std::shared_ptr<Buffer>& body) { + impl_.reset(new MessageImpl(metadata, body)); +} + +Status Message::Open(const std::shared_ptr<Buffer>& metadata, + const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) { + out->reset(new Message(metadata, body)); + return (*out)->impl_->Open(); } Message::~Message() {} -Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset, - std::shared_ptr<Message>* out) { - // ctor is private +std::shared_ptr<Buffer> Message::body() const { + return impl_->body(); +} - *out = std::shared_ptr<Message>(new Message(buffer, offset)); - return (*out)->impl_->Open(); +std::shared_ptr<Buffer> Message::metadata() const { + return impl_->metadata(); } Message::Type Message::type() const { @@ -914,14 +926,64 @@ MetadataVersion Message::metadata_version() const { return impl_->version(); } -int64_t Message::body_length() const { - return impl_->body_length(); -} - const void* Message::header() const { return impl_->header(); } +bool Message::Equals(const Message& other) const { + int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size()); + + if (!metadata()->Equals(*other.metadata(), metadata_bytes)) { + return false; + } + + // Compare bodies, if they have them + auto this_body = body(); + auto other_body = other.body(); + + const bool this_has_body = (this_body != nullptr) && (this_body->size() > 0); + const bool other_has_body = (other_body != nullptr) && (other_body->size() > 0); + + if (this_has_body && other_has_body) { + return this_body->Equals(*other_body); + } else if (this_has_body ^ other_has_body) { + // One has a body but not the other + return false; + } else { + // Neither has a body + return true; + } +} + +Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const { + int32_t metadata_length = 0; + RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length)); + + *output_length = metadata_length; + + auto body_buffer = body(); + if (body_buffer) { + RETURN_NOT_OK(file->Write(body_buffer->data(), body_buffer->size())); + *output_length += body_buffer->size(); + } + + return Status::OK(); +} + +std::string FormatMessageType(Message::Type type) { + switch (type) { + case Message::SCHEMA: + return "schema"; + case Message::RECORD_BATCH: + return "record batch"; + case Message::DICTIONARY_BATCH: + return "dictionary"; + default: + break; + } + return "unknown"; +} + // ---------------------------------------------------------------------- static Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) { @@ -975,10 +1037,11 @@ Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_mem return Status::OK(); } -Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* type, +Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, std::vector<int64_t>* shape, std::vector<int64_t>* strides, std::vector<std::string>* dim_names) { - auto tensor = static_cast<const flatbuf::Tensor*>(opaque_tensor); + auto message = flatbuf::GetMessage(metadata.data()); + auto tensor = reinterpret_cast<const flatbuf::Tensor*>(message->header()); int ndim = static_cast<int>(tensor->shape()->size()); @@ -1006,8 +1069,27 @@ Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* t // ---------------------------------------------------------------------- // Read and write messages +static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata, + io::InputStream* stream, 
std::unique_ptr<Message>* message) { + auto fb_message = flatbuf::GetMessage(metadata->data()); + + int64_t body_length = fb_message->bodyLength(); + + std::shared_ptr<Buffer> body; + RETURN_NOT_OK(stream->Read(body_length, &body)); + + if (body->size() < body_length) { + std::stringstream ss; + ss << "Expected to be able to read " << body_length << " bytes for message body, got " + << body->size(); + return Status::IOError(ss.str()); + } + + return Message::Open(metadata, body, message); +} + Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file, - std::shared_ptr<Message>* message) { + std::unique_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer)); @@ -1019,13 +1101,15 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile << ", metadata length: " << metadata_length; return Status::Invalid(ss.str()); } - return Message::Open(buffer, 4, message); + + auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4); + return ReadFullMessage(metadata, file, message); } -Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) { +Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; - RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer)); + RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer)); if (buffer->size() != sizeof(int32_t)) { *message = nullptr; return Status::OK(); @@ -1044,9 +1128,21 @@ Status ReadMessage(io::InputStream* file, std::shared_ptr<Message>* message) { return Status::IOError("Unexpected end of stream trying to read message"); } - return Message::Open(buffer, 0, message); + return ReadFullMessage(buffer, file, message); +} + +// ---------------------------------------------------------------------- +// Implement InputStream message reader + +Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr<Message>* message) { + return ReadMessage(stream_.get(), message); } +InputStreamMessageReader::~InputStreamMessageReader() {} + +// ---------------------------------------------------------------------- +// Implement message writing + Status WriteMessage( const Buffer& message, io::OutputStream* file, int32_t* message_length) { // Need to write 4 bytes (message size), the message, plus padding to http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/metadata.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h index 257bbd8..64b2571 100644 --- a/cpp/src/arrow/ipc/metadata.h +++ b/cpp/src/arrow/ipc/metadata.h @@ -59,26 +59,12 @@ static constexpr const char* kArrowMagicBytes = "ARROW1"; constexpr int kMaxNestingDepth = 64; struct ARROW_EXPORT FieldMetadata { - FieldMetadata() {} - FieldMetadata(int64_t length, int64_t null_count, int64_t offset) - : length(length), null_count(null_count), offset(offset) {} - - FieldMetadata(const FieldMetadata& other) { - this->length = other.length; - this->null_count = other.null_count; - this->offset = other.offset; - } - int64_t length; int64_t null_count; int64_t offset; }; struct ARROW_EXPORT BufferMetadata { - BufferMetadata() {} - BufferMetadata(int32_t page, int64_t offset, int64_t length) - : page(page), offset(offset), length(length) {} - /// The shared memory page id where to find this. 
Set to -1 if unused int32_t page; @@ -90,10 +76,6 @@ struct ARROW_EXPORT BufferMetadata { }; struct FileBlock { - FileBlock() {} - FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length) - : offset(offset), metadata_length(metadata_length), body_length(body_length) {} - int64_t offset; int32_t metadata_length; int64_t body_length; @@ -153,20 +135,46 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi Status ARROW_EXPORT GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out); -Status ARROW_EXPORT GetTensorMetadata(const void* opaque_tensor, +Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type, std::vector<int64_t>* shape, std::vector<int64_t>* strides, std::vector<std::string>* dim_names); +/// \brief An IPC message including metadata and body class ARROW_EXPORT Message { public: enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH, TENSOR }; - ~Message(); + /// \brief Construct message, but do not validate + /// + /// Use at your own risk; Message::Open has more metadata validation + Message(const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body); - static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset, - std::shared_ptr<Message>* out); + ~Message(); - int64_t body_length() const; + /// \brief Create and validate a Message instance from two buffers + /// + /// \param[in] metadata a buffer containing the Flatbuffer metadata + /// \param[in] body a buffer containing the message body, which may be nullptr + /// \param[out] out the created message + static Status Open(const std::shared_ptr<Buffer>& metadata, + const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out); + + /// \brief Write length-prefixed metadata and body to output stream + /// + /// \param[in] file output stream to write to + /// \param[out] output_length the number of bytes written + /// \return Status + bool Equals(const Message& other) const; + + /// \brief the Message metadata + /// + /// \return buffer + std::shared_ptr<Buffer> metadata() const; + + /// \brief the Message body, if any + /// + /// \return buffer is nullptr if no body + std::shared_ptr<Buffer> body() const; Type type() const; @@ -174,9 +182,14 @@ class ARROW_EXPORT Message { const void* header() const; - private: - Message(const std::shared_ptr<Buffer>& buffer, int64_t offset); + /// \brief Write length-prefixed metadata and body to output stream + /// + /// \param[in] file output stream to write to + /// \param[out] output_length the number of bytes written + /// \return Status + Status SerializeTo(io::OutputStream* file, int64_t* output_length) const; + private: // Hide serialization details from user API class MessageImpl; std::unique_ptr<MessageImpl> impl_; @@ -184,8 +197,34 @@ class ARROW_EXPORT Message { DISALLOW_COPY_AND_ASSIGN(Message); }; +ARROW_EXPORT std::string FormatMessageType(Message::Type type); + +/// \brief Abstract interface for a sequence of messages +class ARROW_EXPORT MessageReader { + public: + virtual ~MessageReader() = default; + + virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0; +}; + +class ARROW_EXPORT InputStreamMessageReader : public MessageReader { + public: + explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& stream) + : stream_(stream) {} + + ~InputStreamMessageReader(); + + Status ReadNextMessage(std::unique_ptr<Message>* message) override; + + private: + 
std::shared_ptr<io::InputStream> stream_; +}; + +/// \brief Read encapulated RPC message from position in file +/// /// Read a length-prefixed message flatbuffer starting at the indicated file -/// offset +/// offset. If the message has a body with non-zero length, it will also be +/// read /// /// The metadata_length includes at least the length prefix and the flatbuffer /// @@ -196,15 +235,18 @@ class ARROW_EXPORT Message { /// \param[out] message the message read /// \return Status success or failure Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length, - io::RandomAccessFile* file, std::shared_ptr<Message>* message); + io::RandomAccessFile* file, std::unique_ptr<Message>* message); +/// \brief Read encapulated RPC message (metadata and body) from InputStream +/// /// Read length-prefixed message with as-yet unknown length. Returns nullptr if /// there are not enough bytes available or the message length is 0 (e.g. EOS /// in a stream) Status ARROW_EXPORT ReadMessage( - io::InputStream* stream, std::shared_ptr<Message>* message); + io::InputStream* stream, std::unique_ptr<Message>* message); -/// Write a serialized message with a length-prefix and padding to an 8-byte offset +/// Write a serialized message metadata with a length-prefix and padding to an +/// 8-byte offset /// /// <message_size: int32><message: const void*><padding> Status ARROW_EXPORT WriteMessage( http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 8ca4d82..88ab330 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -257,11 +257,18 @@ static Status LoadArray(const std::shared_ptr<DataType>& type, return loader.Load(); } -Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema, +Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out); } +Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema, + std::shared_ptr<RecordBatch>* out) { + io::BufferReader reader(message.body()); + DCHECK_EQ(message.type(), Message::RECORD_BATCH); + return ReadRecordBatch(*message.metadata(), schema, kMaxNestingDepth, &reader, out); +} + // ---------------------------------------------------------------------- // Array loading @@ -294,18 +301,22 @@ static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata, schema, metadata->length(), max_recursion_depth, &source, out); } -Status ReadRecordBatch(const Message& metadata, const std::shared_ptr<Schema>& schema, +Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { - DCHECK_EQ(metadata.type(), Message::RECORD_BATCH); - auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(metadata.header()); + auto message = flatbuf::GetMessage(metadata.data()); + if (message->header_type() != flatbuf::MessageHeader_RecordBatch) { + DCHECK_EQ(message->header_type(), flatbuf::MessageHeader_RecordBatch); + } + auto batch = reinterpret_cast<const flatbuf::RecordBatch*>(message->header()); return ReadRecordBatch(batch, schema, max_recursion_depth, file, out); } -Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& 
dictionary_types, +Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) { + auto message = flatbuf::GetMessage(metadata.data()); auto dictionary_batch = - reinterpret_cast<const flatbuf::DictionaryBatch*>(metadata.header()); + reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header()); int64_t id = *dictionary_id = dictionary_batch->id(); auto it = dictionary_types.find(id); @@ -335,25 +346,33 @@ Status ReadDictionary(const Message& metadata, const DictionaryTypeMap& dictiona return Status::OK(); } +static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expected_type, + bool allow_null, std::unique_ptr<Message>* message) { + RETURN_NOT_OK(reader->ReadNextMessage(message)); + + if (!(*message) && !allow_null) { + std::stringstream ss; + ss << "Expected " << FormatMessageType(expected_type) + << " message in stream, was null or length 0"; + return Status::Invalid(ss.str()); + } + + if ((*message) == nullptr) { return Status::OK(); } + + if ((*message)->type() != expected_type) { + std::stringstream ss; + ss << "Message not expected type: " << FormatMessageType(expected_type) + << ", was: " << (*message)->type(); + return Status::IOError(ss.str()); + } + return Status::OK(); +} + // ---------------------------------------------------------------------- // RecordBatchStreamReader implementation static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) { - return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength()); -} - -static inline std::string FormatMessageType(Message::Type type) { - switch (type) { - case Message::SCHEMA: - return "schema"; - case Message::RECORD_BATCH: - return "record batch"; - case Message::DICTIONARY_BATCH: - return "dictionary"; - default: - break; - } - return "unknown"; + return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()}; } RecordBatchReader::~RecordBatchReader() {} @@ -363,59 +382,29 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl { RecordBatchStreamReaderImpl() {} ~RecordBatchStreamReaderImpl() {} - Status Open(const std::shared_ptr<io::InputStream>& stream) { - stream_ = stream; + Status Open(std::unique_ptr<MessageReader> message_reader) { + message_reader_ = std::move(message_reader); return ReadSchema(); } - Status ReadNextMessage( - Message::Type expected_type, bool allow_null, std::shared_ptr<Message>* message) { - RETURN_NOT_OK(ReadMessage(stream_.get(), message)); - - if (!(*message) && !allow_null) { - std::stringstream ss; - ss << "Expected " << FormatMessageType(expected_type) - << " message in stream, was null or length 0"; - return Status::Invalid(ss.str()); - } - - if ((*message) == nullptr) { return Status::OK(); } - - if ((*message)->type() != expected_type) { - std::stringstream ss; - ss << "Message not expected type: " << FormatMessageType(expected_type) - << ", was: " << (*message)->type(); - return Status::IOError(ss.str()); - } - return Status::OK(); - } - - Status ReadExact(int64_t size, std::shared_ptr<Buffer>* buffer) { - RETURN_NOT_OK(stream_->Read(size, buffer)); - - if ((*buffer)->size() < size) { - return Status::IOError("Unexpected EOS when reading buffer"); - } - return Status::OK(); - } - Status ReadNextDictionary() { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::DICTIONARY_BATCH, false, &message)); + std::unique_ptr<Message> message; + RETURN_NOT_OK(ReadMessageAndValidate( + 
message_reader_.get(), Message::DICTIONARY_BATCH, false, &message)); - std::shared_ptr<Buffer> batch_body; - RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); - io::BufferReader reader(batch_body); + io::BufferReader reader(message->body()); std::shared_ptr<Array> dictionary; int64_t id; - RETURN_NOT_OK(ReadDictionary(*message, dictionary_types_, &reader, &id, &dictionary)); + RETURN_NOT_OK(ReadDictionary( + *message->metadata(), dictionary_types_, &reader, &id, &dictionary)); return dictionary_memo_.AddDictionary(id, dictionary); } Status ReadSchema() { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::SCHEMA, false, &message)); + std::unique_ptr<Message> message; + RETURN_NOT_OK( + ReadMessageAndValidate(message_reader_.get(), Message::SCHEMA, false, &message)); RETURN_NOT_OK(GetDictionaryTypes(message->header(), &dictionary_types_)); @@ -429,9 +418,10 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl { return GetSchema(message->header(), dictionary_memo_, &schema_); } - Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { - std::shared_ptr<Message> message; - RETURN_NOT_OK(ReadNextMessage(Message::RECORD_BATCH, true, &message)); + Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + std::unique_ptr<Message> message; + RETURN_NOT_OK(ReadMessageAndValidate( + message_reader_.get(), Message::RECORD_BATCH, true, &message)); if (message == nullptr) { // End of stream @@ -439,21 +429,18 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl { return Status::OK(); } - std::shared_ptr<Buffer> batch_body; - RETURN_NOT_OK(ReadExact(message->body_length(), &batch_body)); - io::BufferReader reader(batch_body); - return ReadRecordBatch(*message, schema_, &reader, batch); + io::BufferReader reader(message->body()); + return ReadRecordBatch(*message->metadata(), schema_, &reader, batch); } std::shared_ptr<Schema> schema() const { return schema_; } private: + std::unique_ptr<MessageReader> message_reader_; + // dictionary_id -> type DictionaryTypeMap dictionary_types_; - DictionaryMemo dictionary_memo_; - - std::shared_ptr<io::InputStream> stream_; std::shared_ptr<Schema> schema_; }; @@ -463,19 +450,25 @@ RecordBatchStreamReader::RecordBatchStreamReader() { RecordBatchStreamReader::~RecordBatchStreamReader() {} -Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream, +Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader, std::shared_ptr<RecordBatchStreamReader>* reader) { // Private ctor *reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader()); - return (*reader)->impl_->Open(stream); + return (*reader)->impl_->Open(std::move(message_reader)); +} + +Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream, + std::shared_ptr<RecordBatchStreamReader>* out) { + std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream)); + return Open(std::move(message_reader), out); } std::shared_ptr<Schema> RecordBatchStreamReader::schema() const { return impl_->schema(); } -Status RecordBatchStreamReader::GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { - return impl_->GetNextRecordBatch(batch); +Status RecordBatchStreamReader::ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) { + return impl_->ReadNextRecordBatch(batch); } // ---------------------------------------------------------------------- @@ -547,22 +540,17 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { return 
FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i)); } - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { + Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { DCHECK_GE(i, 0); DCHECK_LT(i, num_record_batches()); FileBlock block = record_batch(i); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; RETURN_NOT_OK( ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); - // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see - // ARROW-384). - std::shared_ptr<Buffer> buffer_block; - RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); - io::BufferReader reader(buffer_block); - - return ReadRecordBatch(*message, schema_, &reader, batch); + io::BufferReader reader(message->body()); + return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch); } Status ReadSchema() { @@ -571,23 +559,16 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { // Read all the dictionaries for (int i = 0; i < num_dictionaries(); ++i) { FileBlock block = dictionary(i); - std::shared_ptr<Message> message; + std::unique_ptr<Message> message; RETURN_NOT_OK( ReadMessage(block.offset, block.metadata_length, file_.get(), &message)); - // TODO(wesm): ARROW-577: This code is a bit duplicated, can be fixed - // with a more invasive refactor - - // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see - // ARROW-384). - std::shared_ptr<Buffer> buffer_block; - RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block)); - io::BufferReader reader(buffer_block); + io::BufferReader reader(message->body()); std::shared_ptr<Array> dictionary; int64_t dictionary_id; - RETURN_NOT_OK(ReadDictionary( - *message, dictionary_fields_, &reader, &dictionary_id, &dictionary)); + RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_fields_, &reader, + &dictionary_id, &dictionary)); RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary)); } @@ -653,12 +634,13 @@ MetadataVersion RecordBatchFileReader::version() const { return impl_->version(); } -Status RecordBatchFileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) { - return impl_->GetRecordBatch(i, batch); +Status RecordBatchFileReader::ReadRecordBatch( + int i, std::shared_ptr<RecordBatch>* batch) { + return impl_->ReadRecordBatch(i, batch); } -static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file, - std::shared_ptr<Message>* message, std::shared_ptr<Buffer>* payload) { +static Status ReadContiguousPayload( + int64_t offset, io::RandomAccessFile* file, std::unique_ptr<Message>* message) { std::shared_ptr<Buffer> buffer; RETURN_NOT_OK(file->Seek(offset)); RETURN_NOT_OK(ReadMessage(file, message)); @@ -666,38 +648,32 @@ static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file, if (*message == nullptr) { return Status::Invalid("Unable to read metadata at offset"); } - - // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a - // RandomAccessFile according to that frame of reference - RETURN_NOT_OK(file->Read((*message)->body_length(), payload)); return Status::OK(); } Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) { - std::shared_ptr<Buffer> payload; - std::shared_ptr<Message> message; - - RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &payload)); - io::BufferReader buffer_reader(payload); - return ReadRecordBatch(*message, schema, 
kMaxNestingDepth, &buffer_reader, out); + std::unique_ptr<Message> message; + RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message)); + io::BufferReader buffer_reader(message->body()); + return ReadRecordBatch( + *message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out); } Status ReadTensor( int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out) { // Respect alignment of Tensor messages (see WriteTensor) offset = PaddedLength(offset); - std::shared_ptr<Message> message; - std::shared_ptr<Buffer> data; - RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message, &data)); + std::unique_ptr<Message> message; + RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message)); std::shared_ptr<DataType> type; std::vector<int64_t> shape; std::vector<int64_t> strides; std::vector<std::string> dim_names; RETURN_NOT_OK( - GetTensorMetadata(message->header(), &type, &shape, &strides, &dim_names)); - *out = std::make_shared<Tensor>(type, data, shape, strides, dim_names); + GetTensorMetadata(*message->metadata(), &type, &shape, &strides, &dim_names)); + *out = std::make_shared<Tensor>(type, message->body(), shape, strides, dim_names); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/reader.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index dd29a36..d6c2614 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -57,7 +57,7 @@ class ARROW_EXPORT RecordBatchReader { /// /// \param(out) batch the next loaded batch, nullptr at end of stream /// \return Status - virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0; + virtual Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0; }; /// \class RecordBatchStreamReader @@ -66,16 +66,24 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader { public: virtual ~RecordBatchStreamReader(); - /// Create batch reader from InputStream + /// Create batch reader from generic MessageReader + /// + /// \param(in) message_reader a MessageReader implementation + /// \param(out) out the created RecordBatchStreamReader object + /// \return Status + static Status Open(std::unique_ptr<MessageReader> message_reader, + std::shared_ptr<RecordBatchStreamReader>* out); + + /// \Create Record batch stream reader from InputStream /// /// \param(in) stream an input stream instance - /// \param(out) reader the created reader object + /// \param(out) out the created RecordBatchStreamReader object /// \return Status static Status Open(const std::shared_ptr<io::InputStream>& stream, - std::shared_ptr<RecordBatchStreamReader>* reader); + std::shared_ptr<RecordBatchStreamReader>* out); std::shared_ptr<Schema> schema() const override; - Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override; + Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override; private: RecordBatchStreamReader(); @@ -122,7 +130,7 @@ class ARROW_EXPORT RecordBatchFileReader { /// \param(in) i the index of the record batch to return /// \param(out) batch the read batch /// \return Status - Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); + Status ReadRecordBatch(int i, std::shared_ptr<RecordBatch>* batch); private: RecordBatchFileReader(); @@ -133,16 +141,25 @@ class ARROW_EXPORT RecordBatchFileReader { // Generic read functions; does not copy data if the input supports zero copy reads -/// Read record batch from file 
given metadata and schema +/// \brief Read record batch from file given metadata and schema /// /// \param(in) metadata a Message containing the record batch metadata /// \param(in) schema the record batch schema /// \param(in) file a random access file /// \param(out) out the read record batch -Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, +Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); +/// \brief Read record batch from fully encapulated Message +/// +/// \param[in] message a message instance containing metadata and body +/// \param[in] schema +/// \param[out] out the resulting RecordBatch +/// \return Status +Status ARROW_EXPORT ReadRecordBatch(const Message& message, + const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out); + /// Read record batch from file given metadata and schema /// /// \param(in) metadata a Message containing the record batch metadata @@ -150,7 +167,7 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, /// \param(in) file a random access file /// \param(in) max_recursion_depth the maximum permitted nesting depth /// \param(out) out the read record batch -Status ARROW_EXPORT ReadRecordBatch(const Message& metadata, +Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema, int max_recursion_depth, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out); http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/stream-to-file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc index b942054..de65883 100644 --- a/cpp/src/arrow/ipc/stream-to-file.cc +++ b/cpp/src/arrow/ipc/stream-to-file.cc @@ -40,7 +40,7 @@ Status ConvertToFile() { std::shared_ptr<RecordBatch> batch; while (true) { - RETURN_NOT_OK(reader->GetNextRecordBatch(&batch)); + RETURN_NOT_OK(reader->ReadNextRecordBatch(&batch)); if (batch == nullptr) break; RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/ipc/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 7563343..14708a1 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -110,7 +110,7 @@ class RecordBatchSerializer : public ArrayVisitor { } // push back all common elements - field_nodes_.emplace_back(arr.length(), arr.null_count(), 0); + field_nodes_.push_back({arr.length(), arr.null_count(), 0}); if (arr.null_count() > 0) { std::shared_ptr<Buffer> bitmap; @@ -680,7 +680,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl { Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) { // Push an empty FileBlock. 
Can be written in the footer later - record_batches_.emplace_back(0, 0, 0); + record_batches_.push_back({0, 0, 0}); return WriteRecordBatch( batch, allow_64bit, &record_batches_[record_batches_.size() - 1]); } http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/cpp/src/arrow/python/builtin_convert.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc index f10dac7..816f95a 100644 --- a/cpp/src/arrow/python/builtin_convert.cc +++ b/cpp/src/arrow/python/builtin_convert.cc @@ -477,9 +477,8 @@ class FixedWidthBytesConverter inline Status AppendItem(const OwnedRef& item) { PyObject* bytes_obj; OwnedRef tmp; - Py_ssize_t expected_length = - std::dynamic_pointer_cast<FixedSizeBinaryType>(typed_builder_->type()) - ->byte_width(); + Py_ssize_t expected_length = std::dynamic_pointer_cast<FixedSizeBinaryType>( + typed_builder_->type())->byte_width(); if (item.obj() == Py_None) { RETURN_NOT_OK(typed_builder_->AppendNull()); return Status::OK(); http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/doc/source/api.rst ---------------------------------------------------------------------- diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst index 4810a31..400614d 100644 --- a/python/doc/source/api.rst +++ b/python/doc/source/api.rst @@ -137,7 +137,6 @@ Tables and Record Batches Column RecordBatch Table - get_record_batch_size .. _api.tensor: @@ -148,9 +147,6 @@ Tensor type and Functions :toctree: generated/ Tensor - write_tensor - get_tensor_size - read_tensor .. _api.io: @@ -177,12 +173,20 @@ Interprocess Communication and Messaging .. autosummary:: :toctree: generated/ + Message + MessageReader RecordBatchFileReader RecordBatchFileWriter RecordBatchStreamReader RecordBatchStreamWriter open_file open_stream + read_message + read_record_batch + get_record_batch_size + read_tensor + write_tensor + get_tensor_size .. 
_api.memory_pool: http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 434722c..f7cddd0 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -69,9 +69,8 @@ from pyarrow.lib import (null, bool_, from pyarrow.lib import (HdfsFile, NativeFile, PythonFile, Buffer, BufferReader, BufferOutputStream, OSFile, MemoryMappedFile, memory_map, - frombuffer, read_tensor, write_tensor, + frombuffer, memory_map, create_memory_map, - get_record_batch_size, get_tensor_size, have_libhdfs, have_libhdfs3, MockOutputStream) from pyarrow.lib import (MemoryPool, total_allocated_bytes, @@ -89,8 +88,12 @@ from pyarrow.lib import (ArrowException, from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem -from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter, +from pyarrow.ipc import (Message, MessageReader, + RecordBatchFileReader, RecordBatchFileWriter, RecordBatchStreamReader, RecordBatchStreamWriter, + read_message, read_record_batch, read_tensor, + write_tensor, + get_record_batch_size, get_tensor_size, open_stream, open_file, serialize_pandas, deserialize_pandas) http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/feather.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/feather.pxi b/python/pyarrow/feather.pxi new file mode 100644 index 0000000..2e7cf6c --- /dev/null +++ b/python/pyarrow/feather.pxi @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +#---------------------------------------------------------------------- +# Implement legacy Feather file format + + +class FeatherError(Exception): + pass + + +cdef class FeatherWriter: + cdef: + unique_ptr[CFeatherWriter] writer + + cdef public: + int64_t num_rows + + def __cinit__(self): + self.num_rows = -1 + + def open(self, object dest): + cdef shared_ptr[OutputStream] sink + get_writer(dest, &sink) + + with nogil: + check_status(CFeatherWriter.Open(sink, &self.writer)) + + def close(self): + if self.num_rows < 0: + self.num_rows = 0 + self.writer.get().SetNumRows(self.num_rows) + check_status(self.writer.get().Finalize()) + + def write_array(self, object name, object col, object mask=None): + cdef Array arr + + if self.num_rows >= 0: + if len(col) != self.num_rows: + raise ValueError('prior column had a different number of rows') + else: + self.num_rows = len(col) + + if isinstance(col, Array): + arr = col + else: + arr = Array.from_pandas(col, mask=mask) + + cdef c_string c_name = tobytes(name) + + with nogil: + check_status( + self.writer.get().Append(c_name, deref(arr.sp_array))) + + +cdef class FeatherReader: + cdef: + unique_ptr[CFeatherReader] reader + + def __cinit__(self): + pass + + def open(self, source): + cdef shared_ptr[RandomAccessFile] reader + get_reader(source, &reader) + + with nogil: + check_status(CFeatherReader.Open(reader, &self.reader)) + + property num_rows: + + def __get__(self): + return self.reader.get().num_rows() + + property num_columns: + + def __get__(self): + return self.reader.get().num_columns() + + def get_column_name(self, int i): + cdef c_string name = self.reader.get().GetColumnName(i) + return frombytes(name) + + def get_column(self, int i): + if i < 0 or i >= self.num_columns: + raise IndexError(i) + + cdef shared_ptr[CColumn] sp_column + with nogil: + check_status(self.reader.get() + .GetColumn(i, &sp_column)) + + cdef Column col = Column() + col.init(sp_column) + return col http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 9fad824..dd791cd 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -546,41 +546,41 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: int64_t GetExtentBytesWritten() -cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: - cdef cppclass SchemaMessage: - int num_fields() - CStatus GetField(int i, shared_ptr[CField]* out) - CStatus GetSchema(shared_ptr[CSchema]* out) - - cdef cppclass FieldMetadata: - pass - - cdef cppclass BufferMetadata: - pass - - cdef cppclass RecordBatchMessage: - pass - - cdef cppclass DictionaryBatchMessage: - pass - +cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: enum MessageType" arrow::ipc::Message::Type": MessageType_SCHEMA" arrow::ipc::Message::SCHEMA" MessageType_RECORD_BATCH" arrow::ipc::Message::RECORD_BATCH" MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH" - cdef cppclass Message: - CStatus Open(const shared_ptr[CBuffer]& buf, - shared_ptr[Message]* out) - int64_t body_length() + enum MetadataVersion" arrow::ipc::MetadataVersion": + MessageType_V1" arrow::ipc::MetadataVersion::V1" + MessageType_V2" arrow::ipc::MetadataVersion::V2" + MessageType_V3" arrow::ipc::MetadataVersion::V3" + + cdef cppclass CMessage" arrow::ipc::Message": + CStatus Open(const 
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 9fad824..dd791cd 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -546,41 +546,41 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
         int64_t GetExtentBytesWritten()
 
-cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
-    cdef cppclass SchemaMessage:
-        int num_fields()
-        CStatus GetField(int i, shared_ptr[CField]* out)
-        CStatus GetSchema(shared_ptr[CSchema]* out)
-
-    cdef cppclass FieldMetadata:
-        pass
-
-    cdef cppclass BufferMetadata:
-        pass
-
-    cdef cppclass RecordBatchMessage:
-        pass
-
-    cdef cppclass DictionaryBatchMessage:
-        pass
-
+cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     enum MessageType" arrow::ipc::Message::Type":
         MessageType_SCHEMA" arrow::ipc::Message::SCHEMA"
         MessageType_RECORD_BATCH" arrow::ipc::Message::RECORD_BATCH"
         MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH"
 
-    cdef cppclass Message:
-        CStatus Open(const shared_ptr[CBuffer]& buf,
-                     shared_ptr[Message]* out)
-        int64_t body_length()
+    enum MetadataVersion" arrow::ipc::MetadataVersion":
+        MessageType_V1" arrow::ipc::MetadataVersion::V1"
+        MessageType_V2" arrow::ipc::MetadataVersion::V2"
+        MessageType_V3" arrow::ipc::MetadataVersion::V3"
+
+    cdef cppclass CMessage" arrow::ipc::Message":
+        CStatus Open(const shared_ptr[CBuffer]& metadata,
+                     const shared_ptr[CBuffer]& body,
+                     unique_ptr[CMessage]* out)
+
+        shared_ptr[CBuffer] body()
+
+        c_bool Equals(const CMessage& other)
+
+        shared_ptr[CBuffer] metadata()
+        MetadataVersion metadata_version()
         MessageType type()
-        shared_ptr[SchemaMessage] GetSchema()
-        shared_ptr[RecordBatchMessage] GetRecordBatch()
-        shared_ptr[DictionaryBatchMessage] GetDictionaryBatch()
+
+        CStatus SerializeTo(OutputStream* stream, int64_t* output_length)
+
+    c_string FormatMessageType(MessageType type)
 
-cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
+    cdef cppclass CMessageReader \
+            " arrow::ipc::MessageReader":
+        CStatus ReadNextMessage(unique_ptr[CMessage]* out)
+
+    cdef cppclass CInputStreamMessageReader \
+            " arrow::ipc::InputStreamMessageReader":
+        CInputStreamMessageReader(const shared_ptr[InputStream]& stream)
 
     cdef cppclass CRecordBatchWriter \
             " arrow::ipc::RecordBatchWriter":
@@ -590,7 +590,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     cdef cppclass CRecordBatchReader \
             " arrow::ipc::RecordBatchReader":
         shared_ptr[CSchema] schema()
-        CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+        CStatus ReadNextRecordBatch(shared_ptr[CRecordBatch]* batch)
 
     cdef cppclass CRecordBatchStreamReader \
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
@@ -598,6 +598,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         CStatus Open(const shared_ptr[InputStream]& stream,
                      shared_ptr[CRecordBatchStreamReader]* out)
 
+        @staticmethod
+        CStatus Open2" Open"(unique_ptr[CMessageReader] message_reader,
+                             shared_ptr[CRecordBatchStreamReader]* out)
+
     cdef cppclass CRecordBatchStreamWriter \
             " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
         @staticmethod
@@ -625,7 +629,9 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
         int num_record_batches()
 
-        CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+        CStatus ReadRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+    CStatus ReadMessage(InputStream* stream, unique_ptr[CMessage]* message)
 
     CStatus GetRecordBatchSize(const CRecordBatch& batch, int64_t* size)
     CStatus GetTensorSize(const CTensor& tensor, int64_t* size)
@@ -637,6 +643,10 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
                        shared_ptr[CTensor]* out)
 
+    CStatus ReadRecordBatch(const CMessage& message,
+                            const shared_ptr[CSchema]& schema,
+                            shared_ptr[CRecordBatch]* out)
+
 
 cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
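These declarations (ReadMessage, CMessage::SerializeTo and Equals, and the
schema-aware ReadRecordBatch overload) are what back the Python-level memory
round trip. Continuing the earlier sketch, with `buf` again holding the stream
bytes, and assuming Message.serialize and Message.equals wrap SerializeTo and
Equals as declared here:

    # Pull the first encapsulated message (the schema) off the stream.
    msg = pa.read_message(pa.BufferReader(buf))

    # serialize() writes metadata plus body back out as one encapsulated
    # unit, so a message survives a trip through a Buffer intact.
    restored = pa.read_message(pa.BufferReader(msg.serialize()))
    assert restored.equals(msg)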
http://git-wip-us.apache.org/repos/asf/arrow/blob/bb0a7588/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 3221185..8b213a3 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -956,356 +956,3 @@ cdef class HdfsFile(NativeFile):
 
     def __dealloc__(self):
         self.parent = None
-
-# ----------------------------------------------------------------------
-# File and stream readers and writers
-
-cdef class _RecordBatchWriter:
-    cdef:
-        shared_ptr[CRecordBatchWriter] writer
-        shared_ptr[OutputStream] sink
-        bint closed
-
-    def __cinit__(self):
-        self.closed = True
-
-    def __dealloc__(self):
-        if not self.closed:
-            self.close()
-
-    def _open(self, sink, Schema schema):
-        cdef:
-            shared_ptr[CRecordBatchStreamWriter] writer
-
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(
-                CRecordBatchStreamWriter.Open(self.sink.get(),
-                                              schema.sp_schema,
-                                              &writer))
-
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
-        self.closed = False
-
-    def write_batch(self, RecordBatch batch):
-        with nogil:
-            check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
-
-    def close(self):
-        with nogil:
-            check_status(self.writer.get().Close())
-        self.closed = True
-
-
-cdef class _RecordBatchReader:
-    cdef:
-        shared_ptr[CRecordBatchReader] reader
-
-    cdef readonly:
-        Schema schema
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source):
-        cdef:
-            shared_ptr[RandomAccessFile] file_handle
-            shared_ptr[InputStream] in_stream
-            shared_ptr[CRecordBatchStreamReader] reader
-
-        get_reader(source, &file_handle)
-        in_stream = <shared_ptr[InputStream]> file_handle
-
-        with nogil:
-            check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
-
-        self.reader = <shared_ptr[CRecordBatchReader]> reader
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
-    def get_next_batch(self):
-        """
-        Read next RecordBatch from the stream. Raises StopIteration at end of
-        stream
-        """
-        cdef shared_ptr[CRecordBatch] batch
-
-        with nogil:
-            check_status(self.reader.get().GetNextRecordBatch(&batch))
-
-        if batch.get() == NULL:
-            raise StopIteration
-
-        return pyarrow_wrap_batch(batch)
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CRecordBatch] batch
-            shared_ptr[CTable] table
-
-        with nogil:
-            while True:
-                check_status(self.reader.get().GetNextRecordBatch(&batch))
-                if batch.get() == NULL:
-                    break
-                batches.push_back(batch)
-
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return pyarrow_wrap_table(table)
-
-
-cdef class _RecordBatchFileWriter(_RecordBatchWriter):
-
-    def _open(self, sink, Schema schema):
-        cdef shared_ptr[CRecordBatchFileWriter] writer
-        get_writer(sink, &self.sink)
-
-        with nogil:
-            check_status(
-                CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                            &writer))
-
-        # Cast to base class, because has same interface
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
-        self.closed = False
-
-
-cdef class _RecordBatchFileReader:
-    cdef:
-        shared_ptr[CRecordBatchFileReader] reader
-
-    cdef readonly:
-        Schema schema
-
-    def __cinit__(self):
-        pass
-
-    def _open(self, source, footer_offset=None):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        cdef int64_t offset = 0
-        if footer_offset is not None:
-            offset = footer_offset
-
-        with nogil:
-            if offset != 0:
-                check_status(CRecordBatchFileReader.Open2(
-                    reader, offset, &self.reader))
-            else:
-                check_status(CRecordBatchFileReader.Open(reader, &self.reader))
-
-        self.schema = pyarrow_wrap_schema(self.reader.get().schema())
-
-    property num_record_batches:
-
-        def __get__(self):
-            return self.reader.get().num_record_batches()
-
-    def get_batch(self, int i):
-        cdef shared_ptr[CRecordBatch] batch
-
-        if i < 0 or i >= self.num_record_batches:
-            raise ValueError('Batch number {0} out of range'.format(i))
-
-        with nogil:
-            check_status(self.reader.get().GetRecordBatch(i, &batch))
-
-        return pyarrow_wrap_batch(batch)
-
-    # TODO(wesm): ARROW-503: Function was renamed. Remove after a period of
-    # time has passed
-    get_record_batch = get_batch
-
-    def read_all(self):
-        """
-        Read all record batches as a pyarrow.Table
-        """
-        cdef:
-            vector[shared_ptr[CRecordBatch]] batches
-            shared_ptr[CTable] table
-            int i, nbatches
-
-        nbatches = self.num_record_batches
-
-        batches.resize(nbatches)
-        with nogil:
-            for i in range(nbatches):
-                check_status(self.reader.get().GetRecordBatch(i, &batches[i]))
-            check_status(CTable.FromRecordBatches(batches, &table))
-
-        return pyarrow_wrap_table(table)
-
-
-#----------------------------------------------------------------------
-# Implement legacy Feather file format
-
-
-class FeatherError(Exception):
-    pass
-
-
-cdef class FeatherWriter:
-    cdef:
-        unique_ptr[CFeatherWriter] writer
-
-    cdef public:
-        int64_t num_rows
-
-    def __cinit__(self):
-        self.num_rows = -1
-
-    def open(self, object dest):
-        cdef shared_ptr[OutputStream] sink
-        get_writer(dest, &sink)
-
-        with nogil:
-            check_status(CFeatherWriter.Open(sink, &self.writer))
-
-    def close(self):
-        if self.num_rows < 0:
-            self.num_rows = 0
-        self.writer.get().SetNumRows(self.num_rows)
-        check_status(self.writer.get().Finalize())
-
-    def write_array(self, object name, object col, object mask=None):
-        cdef Array arr
-
-        if self.num_rows >= 0:
-            if len(col) != self.num_rows:
-                raise ValueError('prior column had a different number of rows')
-        else:
-            self.num_rows = len(col)
-
-        if isinstance(col, Array):
-            arr = col
-        else:
-            arr = Array.from_pandas(col, mask=mask)
-
-        cdef c_string c_name = tobytes(name)
-
-        with nogil:
-            check_status(
-                self.writer.get().Append(c_name, deref(arr.sp_array)))
-
-
-cdef class FeatherReader:
-    cdef:
-        unique_ptr[CFeatherReader] reader
-
-    def __cinit__(self):
-        pass
-
-    def open(self, source):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
-
-        with nogil:
-            check_status(CFeatherReader.Open(reader, &self.reader))
-
-    property num_rows:
-
-        def __get__(self):
-            return self.reader.get().num_rows()
-
-    property num_columns:
-
-        def __get__(self):
-            return self.reader.get().num_columns()
-
-    def get_column_name(self, int i):
-        cdef c_string name = self.reader.get().GetColumnName(i)
-        return frombytes(name)
-
-    def get_column(self, int i):
-        if i < 0 or i >= self.num_columns:
-            raise IndexError(i)
-
-        cdef shared_ptr[CColumn] sp_column
-        with nogil:
-            check_status(self.reader.get()
-                         .GetColumn(i, &sp_column))
-
-        cdef Column col = Column()
-        col.init(sp_column)
-        return col
-
-
-def get_tensor_size(Tensor tensor):
-    """
-    Return total size of serialized Tensor including metadata and padding
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetTensorSize(deref(tensor.tp), &size))
-    return size
-
-
-def get_record_batch_size(RecordBatch batch):
-    """
-    Return total size of serialized RecordBatch including metadata and padding
-    """
-    cdef int64_t size
-    with nogil:
-        check_status(GetRecordBatchSize(deref(batch.batch), &size))
-    return size
-
-
-def write_tensor(Tensor tensor, NativeFile dest):
-    """
-    Write pyarrow.Tensor to pyarrow.NativeFile object its current position
-
-    Parameters
-    ----------
-    tensor : pyarrow.Tensor
-    dest : pyarrow.NativeFile
-
-    Returns
-    -------
-    bytes_written : int
-        Total number of bytes written to the file
-    """
-    cdef:
-        int32_t metadata_length
-        int64_t body_length
-
-    dest._assert_writeable()
-
-    with nogil:
-        check_status(
-            WriteTensor(deref(tensor.tp), dest.wr_file.get(),
-                        &metadata_length, &body_length))
-
-    return metadata_length + body_length
-
-def read_tensor(NativeFile source):
-    """
-    Read pyarrow.Tensor from pyarrow.NativeFile object from current
-    position. If the file source supports zero copy (e.g. a memory map), then
-    this operation does not allocate any memory
-
-    Parameters
-    ----------
-    source : pyarrow.NativeFile
-
-    Returns
-    -------
-    tensor : Tensor
-    """
-    cdef:
-        shared_ptr[CTensor] sp_tensor
-
-    source._assert_readable()
-
-    cdef int64_t offset = source.tell()
-    with nogil:
-        check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
-
-    return pyarrow_wrap_tensor(sp_tensor)
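The tensor and size helpers removed from io.pxi above move to ipc.pxi and are
re-exported through pyarrow.ipc, as the __init__.py hunk at the top of this
patch shows. A sketch of the round trip they provide; the file name is
arbitrary and Tensor.from_numpy is assumed from the existing tensor API:

    import numpy as np
    import pyarrow as pa

    tensor = pa.Tensor.from_numpy(np.random.randn(10, 4))

    # Size the destination up front, then write at the current position;
    # write_tensor returns the number of bytes written.
    size = pa.get_tensor_size(tensor)
    mmap = pa.create_memory_map('tensor.arrow', size)
    pa.write_tensor(tensor, mmap)

    # read_tensor reads from the current position; over a memory map the
    # read is zero-copy, per the docstring above.
    mmap.seek(0)
    result = pa.read_tensor(mmap)
    assert result.equals(tensor)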