Repository: arrow Updated Branches: refs/heads/master 82575ca3c -> 4b72329fe
ARROW-406: [C++] Set explicit 64K HDFS buffer size, test large reads. Previously, reads larger than the default HDFS buffer size (typically 64K) were not supported. Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #226 from wesm/ARROW-406 and squashes the following commits: d09b645 [Wes McKinney] cpplint 0028e90 [Wes McKinney] Set explicit 64K HDFS buffer size, test large reads using buffered chunks Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4b72329f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4b72329f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4b72329f Branch: refs/heads/master Commit: 4b72329fe2d731f445e44925783b9489f4e0d0d5 Parents: 82575ca Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Tue Dec 6 11:41:08 2016 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Tue Dec 6 11:41:08 2016 -0500 ---------------------------------------------------------------------- cpp/src/arrow/io/hdfs.cc | 33 ++++++++++++++++++++++++++------- cpp/src/arrow/io/hdfs.h | 3 +++ cpp/src/arrow/io/io-hdfs-test.cc | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index 13491e7..8c6d49f 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -17,6 +17,7 @@ #include <hdfs.h> +#include <algorithm> #include <cstdint> #include <sstream> #include <string> @@ -51,6 +52,8 @@ static Status CheckReadResult(int ret) { return Status::OK(); } +static constexpr int kDefaultHdfsBufferSize = 1 << 16; + // ---------------------------------------------------------------------- // File reading @@ -124,9 +127,16 @@ class 
HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { } Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes); - RETURN_NOT_OK(CheckReadResult(ret)); - *bytes_read = ret; + int64_t total_bytes = 0; + while (total_bytes < nbytes) { + tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes), + std::min<int64_t>(buffer_size_, nbytes - total_bytes)); + RETURN_NOT_OK(CheckReadResult(ret)); + total_bytes += ret; + if (ret == 0) { break; } + } + + *bytes_read = total_bytes; return Status::OK(); } @@ -136,7 +146,6 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } *out = buffer; @@ -154,8 +163,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { void set_memory_pool(MemoryPool* pool) { pool_ = pool; } + void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; } + private: MemoryPool* pool_; + int32_t buffer_size_; }; HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { @@ -384,8 +396,9 @@ class HdfsClient::HdfsClientImpl { return Status::OK(); } - Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) { - hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0); + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr<HdfsReadableFile>* file) { + hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0); if (handle == nullptr) { // TODO(wesm): determine cause of failure @@ -397,6 +410,7 @@ class HdfsClient::HdfsClientImpl { // std::make_shared does not work with private ctors *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile()); (*file)->impl_->set_members(path, fs_, handle); + 
(*file)->impl_->set_buffer_size(buffer_size); return Status::OK(); } @@ -490,9 +504,14 @@ Status HdfsClient::ListDirectory( return impl_->ListDirectory(path, listing); } +Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr<HdfsReadableFile>* file) { + return impl_->OpenReadable(path, buffer_size, file); +} + Status HdfsClient::OpenReadable( const std::string& path, std::shared_ptr<HdfsReadableFile>* file) { - return impl_->OpenReadable(path, file); + return OpenReadable(path, kDefaultHdfsBufferSize, file); } Status HdfsClient::OpenWriteable(const std::string& path, bool append, http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 48699c9..1c76f15 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -128,6 +128,9 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient { // status if the file is not found. 
// // @param path complete file path + Status OpenReadable(const std::string& path, int32_t buffer_size, + std::shared_ptr<HdfsReadableFile>* file); + Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file); // FileMode::WRITE options http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/io-hdfs-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc index 7901932..8338de6 100644 --- a/cpp/src/arrow/io/io-hdfs-test.cc +++ b/cpp/src/arrow/io/io-hdfs-test.cc @@ -293,6 +293,39 @@ TEST_F(TestHdfsClient, ReadableMethods) { ASSERT_EQ(60, position); } +TEST_F(TestHdfsClient, LargeFile) { + SKIP_IF_NO_LIBHDFS(); + + ASSERT_OK(MakeScratchDir()); + + auto path = ScratchPath("test-large-file"); + const int size = 1000000; + + std::vector<uint8_t> data = RandomData(size); + ASSERT_OK(WriteDummyFile(path, data.data(), size)); + + std::shared_ptr<HdfsReadableFile> file; + ASSERT_OK(client_->OpenReadable(path, &file)); + + auto buffer = std::make_shared<PoolBuffer>(); + ASSERT_OK(buffer->Resize(size)); + int64_t bytes_read = 0; + + ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data())); + ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size)); + ASSERT_EQ(size, bytes_read); + + // explicit buffer size + std::shared_ptr<HdfsReadableFile> file2; + ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2)); + + auto buffer2 = std::make_shared<PoolBuffer>(); + ASSERT_OK(buffer2->Resize(size)); + ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data())); + ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size)); + ASSERT_EQ(size, bytes_read); +} + TEST_F(TestHdfsClient, RenameFile) { SKIP_IF_NO_LIBHDFS();