Repository: arrow
Updated Branches:
  refs/heads/master d0cd03d78 -> d560e3077
ARROW-656: [C++] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #486 from wesm/ARROW-656 and squashes the following commits:

be0d4bc [Wes McKinney] Fix glib after renaming class
042f533 [Wes McKinney] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d560e307
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d560e307
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d560e307

Branch: refs/heads/master
Commit: d560e307749a2397810962db1a5af4fb65675f17
Parents: d0cd03d
Author: Wes McKinney <wes.mckin...@twosigma.com>
Authored: Tue Apr 4 08:40:40 2017 +0200
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Tue Apr 4 08:40:40 2017 +0200

----------------------------------------------------------------------
 c_glib/arrow-glib/io-memory-mapped-file.cpp | 2 +-
 c_glib/arrow-glib/io-writeable-file.cpp | 2 +-
 c_glib/arrow-glib/io-writeable-file.h | 2 +-
 c_glib/arrow-glib/io-writeable-file.hpp | 8 ++---
 cpp/src/arrow/io/interfaces.h | 6 ++--
 cpp/src/arrow/io/io-memory-test.cc | 27 ++++++++++++++
 cpp/src/arrow/io/memory.cc | 45 ++++++++++++++++++++++++
 cpp/src/arrow/io/memory.h | 23 ++++++++++++
 python/pyarrow/includes/libarrow.pxd | 4 +--
 9 files changed, 107 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-memory-mapped-file.cpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/io-memory-mapped-file.cpp b/c_glib/arrow-glib/io-memory-mapped-file.cpp index 12c9a6c..e2e255c 100644 --- a/c_glib/arrow-glib/io-memory-mapped-file.cpp +++ b/c_glib/arrow-glib/io-memory-mapped-file.cpp 
@@ -127,7 +127,7 @@ garrow_io_writeable_interface_init(GArrowIOWriteableInterface *iface) iface->get_raw = garrow_io_memory_mapped_file_get_raw_writeable_interface; } -static std::shared_ptr<arrow::io::WriteableFileInterface> +static std::shared_ptr<arrow::io::WriteableFile> garrow_io_memory_mapped_file_get_raw_writeable_file_interface(GArrowIOWriteableFile *file) { auto memory_mapped_file = GARROW_IO_MEMORY_MAPPED_FILE(file); http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.cpp ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/io-writeable-file.cpp b/c_glib/arrow-glib/io-writeable-file.cpp index 3de42dd..41b682a 100644 --- a/c_glib/arrow-glib/io-writeable-file.cpp +++ b/c_glib/arrow-glib/io-writeable-file.cpp @@ -76,7 +76,7 @@ garrow_io_writeable_file_write_at(GArrowIOWriteableFile *writeable_file, G_END_DECLS -std::shared_ptr<arrow::io::WriteableFileInterface> +std::shared_ptr<arrow::io::WriteableFile> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file) { auto *iface = GARROW_IO_WRITEABLE_FILE_GET_IFACE(writeable_file); http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.h ---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/io-writeable-file.h b/c_glib/arrow-glib/io-writeable-file.h index 4a4dee5..d1ebdbe 100644 --- a/c_glib/arrow-glib/io-writeable-file.h +++ b/c_glib/arrow-glib/io-writeable-file.h @@ -28,7 +28,7 @@ G_BEGIN_DECLS #define GARROW_IO_WRITEABLE_FILE(obj) \ (G_TYPE_CHECK_INSTANCE_CAST((obj), \ GARROW_IO_TYPE_WRITEABLE_FILE, \ - GArrowIOWriteableFileInterface)) + GArrowIOWriteableFile)) #define GARROW_IO_IS_WRITEABLE_FILE(obj) \ (G_TYPE_CHECK_INSTANCE_TYPE((obj), \ GARROW_IO_TYPE_WRITEABLE_FILE)) http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/c_glib/arrow-glib/io-writeable-file.hpp 
---------------------------------------------------------------------- diff --git a/c_glib/arrow-glib/io-writeable-file.hpp b/c_glib/arrow-glib/io-writeable-file.hpp index 2043007..aba95b2 100644 --- a/c_glib/arrow-glib/io-writeable-file.hpp +++ b/c_glib/arrow-glib/io-writeable-file.hpp @@ -24,15 +24,15 @@ #include <arrow-glib/io-writeable-file.h> /** - * GArrowIOWriteableFileInterface: + * GArrowIOWriteableFile: * - * It wraps `arrow::io::WriteableFileInterface`. + * It wraps `arrow::io::WriteableFile`. */ struct _GArrowIOWriteableFileInterface { GTypeInterface parent_iface; - std::shared_ptr<arrow::io::WriteableFileInterface> (*get_raw)(GArrowIOWriteableFile *file); + std::shared_ptr<arrow::io::WriteableFile> (*get_raw)(GArrowIOWriteableFile *file); }; -std::shared_ptr<arrow::io::WriteableFileInterface> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file); +std::shared_ptr<arrow::io::WriteableFile> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file); http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/interfaces.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 258a315..b5a0bd8 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -121,16 +121,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { RandomAccessFile(); }; -class ARROW_EXPORT WriteableFileInterface : public OutputStream, public Seekable { +class ARROW_EXPORT WriteableFile : public OutputStream, public Seekable { public: virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0; protected: - WriteableFileInterface() { set_mode(FileMode::READ); } + WriteableFile() { set_mode(FileMode::READ); } }; class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile, - public WriteableFileInterface { + public WriteableFile { protected: ReadWriteFileInterface() { 
RandomAccessFile::set_mode(FileMode::READWRITE); } }; http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/io-memory-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc index 442cd0c..4704fe8 100644 --- a/cpp/src/arrow/io/io-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -66,6 +66,33 @@ TEST_F(TestBufferOutputStream, CloseResizes) { ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size()); } +TEST(TestFixedSizeBufferWriter, Basics) { + std::shared_ptr<MutableBuffer> buffer; + ASSERT_OK(AllocateBuffer(default_memory_pool(), 1024, &buffer)); + + FixedSizeBufferWriter writer(buffer); + + int64_t position; + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(0, position); + + std::string data = "data123456"; + auto nbytes = static_cast<int64_t>(data.size()); + ASSERT_OK(writer.Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes)); + + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(nbytes, position); + + ASSERT_OK(writer.Seek(4)); + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(4, position); + + ASSERT_RAISES(IOError, writer.Seek(-1)); + ASSERT_RAISES(IOError, writer.Seek(1024)); + + ASSERT_OK(writer.Close()); +} + TEST(TestBufferReader, RetainParentReference) { // ARROW-387 std::string data = "data123456"; http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 5b5c864..2e701e1 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -99,6 +99,51 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { } // ---------------------------------------------------------------------- +// In-memory buffer writer + +/// Input buffer must be mutable, will abort if not +FixedSizeBufferWriter::FixedSizeBufferWriter(const 
std::shared_ptr<Buffer>& buffer) { + buffer_ = buffer; + DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; + mutable_data_ = buffer->mutable_data(); + size_ = buffer->size(); + position_ = 0; +} + +FixedSizeBufferWriter::~FixedSizeBufferWriter() {} + +Status FixedSizeBufferWriter::Close() { + // No-op + return Status::OK(); +} + +Status FixedSizeBufferWriter::Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::IOError("position out of bounds"); + } + position_ = position; + return Status::OK(); +} + +Status FixedSizeBufferWriter::Tell(int64_t* position) { + *position = position_; + return Status::OK(); +} + +Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) { + std::memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return Status::OK(); +} + +Status FixedSizeBufferWriter::WriteAt( + int64_t position, const uint8_t* data, int64_t nbytes) { + std::lock_guard<std::mutex> guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Write(data, nbytes); +} + +// ---------------------------------------------------------------------- // In-memory buffer reader BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer) http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/cpp/src/arrow/io/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index eb2a509..fbb186b 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -22,6 +22,7 @@ #include <cstdint> #include <memory> +#include <mutex> #include <string> #include "arrow/io/interfaces.h" @@ -66,6 +67,28 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream { uint8_t* mutable_data_; }; +/// \brief Enables random writes into a fixed-size mutable buffer +/// +class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile { + public: + /// Input buffer must be mutable, will abort if not + explicit 
FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer); + ~FixedSizeBufferWriter(); + + Status Close() override; + Status Seek(int64_t position) override; + Status Tell(int64_t* position) override; + Status Write(const uint8_t* data, int64_t nbytes) override; + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + + private: + std::mutex lock_; + std::shared_ptr<Buffer> buffer_; + uint8_t* mutable_data_; + int64_t size_; + int64_t position_; +}; + class ARROW_EXPORT BufferReader : public RandomAccessFile { public: explicit BufferReader(const std::shared_ptr<Buffer>& buffer); http://git-wip-us.apache.org/repos/asf/arrow/blob/d560e307/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 67d6af9..2a0488f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -342,12 +342,12 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: CStatus ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, shared_ptr[CBuffer]* out) - cdef cppclass WriteableFileInterface(OutputStream, Seekable): + cdef cppclass WriteableFile(OutputStream, Seekable): CStatus WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) cdef cppclass ReadWriteFileInterface(RandomAccessFile, - WriteableFileInterface): + WriteableFile): pass