Repository: parquet-cpp Updated Branches: refs/heads/master 5b3e9c103 -> 41c1e6887
PARQUET-520: Add MemoryMapSource and add unit tests for both it and LocalFileSource. I also added the `file_descriptor` API so that we can verify that dtors elsewhere successfully close open files. Closes #56 Author: Wes McKinney <[email protected]> Closes #66 from wesm/PARQUET-520 and squashes the following commits: 9d638ba [Wes McKinney] Add memory-mapping option to ParquetFileReader::OpenFile. Add --no-memory-map flag to parquet_reader 6389683 [Wes McKinney] Add Read API tests dbf6a45 [Wes McKinney] Test some failure modes for LocalFileSource / MemoryMapSource 01a7d64 [Wes McKinney] Add a MemoryMapSource and use this by default for SerializedFileReader Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41c1e688 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41c1e688 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41c1e688 Branch: refs/heads/master Commit: 41c1e6887cc53d43d48c7d327ed04ee4dd1a57b4 Parents: 5b3e9c1 Author: Wes McKinney <[email protected]> Authored: Mon Feb 29 16:17:16 2016 -0800 Committer: Julien Le Dem <[email protected]> Committed: Mon Feb 29 16:17:16 2016 -0800 ---------------------------------------------------------------------- example/parquet_reader.cc | 10 ++- src/parquet/file/reader.cc | 10 ++- src/parquet/file/reader.h | 3 +- src/parquet/util/CMakeLists.txt | 2 +- src/parquet/util/input-output-test.cc | 125 +++++++++++++++++++++++++++++ src/parquet/util/input.cc | 96 ++++++++++++++++++++-- src/parquet/util/input.h | 39 ++++++++- src/parquet/util/output-test.cc | 46 ----------- 8 files changed, 270 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/example/parquet_reader.cc ---------------------------------------------------------------------- diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc index 
8e498c7..8acc4a7 100644 --- a/example/parquet_reader.cc +++ b/example/parquet_reader.cc @@ -24,26 +24,30 @@ using namespace parquet_cpp; int main(int argc, char** argv) { if (argc > 3) { - std::cerr << "Usage: parquet_reader [--only-stats] <file>" + std::cerr << "Usage: parquet_reader [--only-stats] [--no-memory-map] <file>" << std::endl; return -1; } std::string filename; bool print_values = true; + bool memory_map = true; // Read command-line options char *param, *value; for (int i = 1; i < argc; i++) { - if ( (param = std::strstr(argv[i], "--only-stats")) ) { + if ((param = std::strstr(argv[i], "--only-stats"))) { print_values = false; + } else if ((param = std::strstr(argv[i], "--no-memory-map"))) { + memory_map = false; } else { filename = argv[i]; } } try { - std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename); + std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename, + memory_map); reader->DebugPrint(std::cout, print_values); } catch (const std::exception& e) { std::cerr << "Parquet error: " http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc index 4901471..fcbe453 100644 --- a/src/parquet/file/reader.cc +++ b/src/parquet/file/reader.cc @@ -67,8 +67,14 @@ RowGroupStatistics RowGroupReader::GetColumnStats(int i) const { ParquetFileReader::ParquetFileReader() : schema_(nullptr) {} ParquetFileReader::~ParquetFileReader() {} -std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path) { - std::unique_ptr<LocalFileSource> file(new LocalFileSource()); +std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path, + bool memory_map) { + std::unique_ptr<LocalFileSource> file; + if (memory_map) { + file.reset(new MemoryMapSource()); + } else { + file.reset(new LocalFileSource()); + 
} file->Open(path); auto contents = SerializedFile::Open(std::move(file)); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/file/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h index fcc2c18..94f5931 100644 --- a/src/parquet/file/reader.h +++ b/src/parquet/file/reader.h @@ -89,7 +89,8 @@ class ParquetFileReader { ~ParquetFileReader(); // API Convenience to open a serialized Parquet file on disk - static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path); + static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path, + bool memory_map = true); void Open(std::unique_ptr<Contents> contents); void Close(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index a009129..c8d2c2f 100644 --- a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -63,6 +63,6 @@ endif() ADD_PARQUET_TEST(bit-util-test) ADD_PARQUET_TEST(buffer-test) +ADD_PARQUET_TEST(input-output-test) ADD_PARQUET_TEST(mem-pool-test) -ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input-output-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc new file mode 100644 index 0000000..424be3a --- /dev/null +++ b/src/parquet/util/input-output-test.cc @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <cstdint> +#include <cstdio> +#include <fstream> +#include <iostream> +#include <memory> +#include <string> +#include <vector> + +#include "parquet/exception.h" +#include "parquet/util/buffer.h" +#include "parquet/util/input.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +namespace parquet_cpp { + +TEST(TestInMemoryOutputStream, Basics) { + std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8)); + + std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; + + stream->Write(&data[0], 4); + ASSERT_EQ(4, stream->Tell()); + stream->Write(&data[4], data.size() - 4); + + std::shared_ptr<Buffer> buffer = stream->GetBuffer(); + + Buffer data_buf(data.data(), data.size()); + + ASSERT_TRUE(data_buf.Equals(*buffer)); +} + +static bool file_exists(const std::string& path) { + return std::ifstream(path.c_str()).good(); +} + +template <typename ReaderType> +class TestFileReaders : public ::testing::Test { + public: + void SetUp() { + test_path_ = "parquet-input-output-test.txt"; + if (file_exists(test_path_)) { + std::remove(test_path_.c_str()); + } + test_data_ = "testingdata"; + + std::ofstream stream; + stream.open(test_path_.c_str()); + stream << test_data_; + filesize_ = test_data_.size(); + } + + void TearDown() { + DeleteTestFile(); + } + + void DeleteTestFile() { + if 
(file_exists(test_path_)) { + std::remove(test_path_.c_str()); + } + } + + protected: + ReaderType source; + std::string test_path_; + std::string test_data_; + int filesize_; +}; + +typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes; + +TYPED_TEST_CASE(TestFileReaders, ReaderTypes); + +TYPED_TEST(TestFileReaders, NonExistentFile) { + ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException); +} + +TYPED_TEST(TestFileReaders, Read) { + this->source.Open(this->test_path_); + + ASSERT_EQ(this->filesize_, this->source.Size()); + + std::shared_ptr<Buffer> buffer = this->source.Read(4); + ASSERT_EQ(4, buffer->size()); + ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4)); + + // Read past EOF + buffer = this->source.Read(10); + ASSERT_EQ(7, buffer->size()); + ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7)); +} + +TYPED_TEST(TestFileReaders, FileDisappeared) { + this->source.Open(this->test_path_); + this->source.Seek(4); + this->DeleteTestFile(); + this->source.Close(); +} + +TYPED_TEST(TestFileReaders, BadSeek) { + this->source.Open(this->test_path_); + + ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc index a238ff6..897d81c 100644 --- a/src/parquet/util/input.cc +++ b/src/parquet/util/input.cc @@ -17,7 +17,9 @@ #include "parquet/util/input.h" +#include <sys/mman.h> #include <algorithm> +#include <sstream> #include <string> #include "parquet/exception.h" @@ -42,13 +44,32 @@ LocalFileSource::~LocalFileSource() { void LocalFileSource::Open(const std::string& path) { path_ = path; - file_ = fopen(path_.c_str(), "r"); + file_ = fopen(path_.c_str(), "rb"); + if (file_ == nullptr || ferror(file_)) { + std::stringstream ss; + ss 
<< "Unable to open file: " << path; + throw ParquetException(ss.str()); + } is_open_ = true; - fseek(file_, 0L, SEEK_END); - size_ = Tell(); + SeekFile(0, SEEK_END); + size_ = LocalFileSource::Tell(); Seek(0); } +void LocalFileSource::SeekFile(int64_t pos, int origin) { + if (origin == SEEK_SET && (pos < 0 || pos >= size_)) { + std::stringstream ss; + ss << "Position " << pos << " is not in range."; + throw ParquetException(ss.str()); + } + + if (0 != fseek(file_, pos, origin)) { + std::stringstream ss; + ss << "File seek to position " << pos << " failed."; + throw ParquetException(ss.str()); + } +} + void LocalFileSource::Close() { // Pure virtual CloseFile(); @@ -62,7 +83,7 @@ void LocalFileSource::CloseFile() { } void LocalFileSource::Seek(int64_t pos) { - fseek(file_, pos, SEEK_SET); + SeekFile(pos); } int64_t LocalFileSource::Size() const { @@ -70,7 +91,15 @@ int64_t LocalFileSource::Size() const { } int64_t LocalFileSource::Tell() const { - return ftell(file_); + int64_t position = ftell(file_); + if (position < 0) { + throw ParquetException("ftell failed, did the file disappear?"); + } + return position; +} + +int LocalFileSource::file_descriptor() const { + return fileno(file_); } int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) { @@ -87,6 +116,63 @@ std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) { } return result; } +// ---------------------------------------------------------------------- +// MemoryMapSource methods + +MemoryMapSource::~MemoryMapSource() { + CloseFile(); +} + +void MemoryMapSource::Open(const std::string& path) { + LocalFileSource::Open(path); + data_ = reinterpret_cast<uint8_t*>(mmap(nullptr, size_, PROT_READ, + MAP_SHARED, fileno(file_), 0)); + if (data_ == nullptr) { + throw ParquetException("Memory mapping file failed"); + } + pos_ = 0; +} + +void MemoryMapSource::Close() { + // Pure virtual + CloseFile(); +} + +void MemoryMapSource::CloseFile() { + if (data_ != nullptr) { + munmap(data_, size_); + } + + 
LocalFileSource::CloseFile(); +} + +void MemoryMapSource::Seek(int64_t pos) { + if (pos < 0 || pos >= size_) { + std::stringstream ss; + ss << "Position " << pos << " is not in range."; + throw ParquetException(ss.str()); + } + + pos_ = pos; +} + +int64_t MemoryMapSource::Tell() const { + return pos_; +} + +int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) { + int64_t bytes_available = std::min(nbytes, size_ - pos_); + memcpy(buffer, data_ + pos_, bytes_available); + pos_ += bytes_available; + return bytes_available; +} + +std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) { + int64_t bytes_available = std::min(nbytes, size_ - pos_); + auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available); + pos_ += bytes_available; + return result; +} // ---------------------------------------------------------------------- // BufferReader http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/input.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h index 5f2bde3..80fb730 100644 --- a/src/parquet/util/input.h +++ b/src/parquet/util/input.h @@ -18,8 +18,8 @@ #ifndef PARQUET_UTIL_INPUT_H #define PARQUET_UTIL_INPUT_H -#include <stdio.h> #include <cstdint> +#include <cstdio> #include <memory> #include <string> #include <vector> @@ -58,7 +58,7 @@ class LocalFileSource : public RandomAccessSource { LocalFileSource() : file_(nullptr), is_open_(false) {} virtual ~LocalFileSource(); - void Open(const std::string& path); + virtual void Open(const std::string& path); virtual void Close(); virtual int64_t Size() const; @@ -73,14 +73,47 @@ class LocalFileSource : public RandomAccessSource { bool is_open() const { return is_open_;} const std::string& path() const { return path_;} - private: + // Return the integer file descriptor + int file_descriptor() const; + + protected: void CloseFile(); + void SeekFile(int64_t pos, int origin = 
SEEK_SET); std::string path_; FILE* file_; bool is_open_; }; +class MemoryMapSource : public LocalFileSource { + public: + MemoryMapSource() : + LocalFileSource(), + data_(nullptr), + pos_(0) {} + + virtual ~MemoryMapSource(); + + virtual void Close(); + virtual void Open(const std::string& path); + + virtual int64_t Tell() const; + virtual void Seek(int64_t pos); + + // Copy data from memory map into out (must be already allocated memory) + // @returns: actual number of bytes read + virtual int64_t Read(int64_t nbytes, uint8_t* out); + + // Return a buffer referencing memory-map (no copy) + virtual std::shared_ptr<Buffer> Read(int64_t nbytes); + + private: + void CloseFile(); + + uint8_t* data_; + int64_t pos_; +}; + // ---------------------------------------------------------------------- // A file-like object that reads from virtual address space http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41c1e688/src/parquet/util/output-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc deleted file mode 100644 index bae184a..0000000 --- a/src/parquet/util/output-test.cc +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <gtest/gtest.h> - -#include <cstdint> -#include <memory> -#include <vector> - -#include "parquet/util/buffer.h" -#include "parquet/util/output.h" -#include "parquet/util/test-common.h" - -namespace parquet_cpp { - -TEST(TestInMemoryOutputStream, Basics) { - std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8)); - - std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; - - stream->Write(&data[0], 4); - ASSERT_EQ(4, stream->Tell()); - stream->Write(&data[4], data.size() - 4); - - std::shared_ptr<Buffer> buffer = stream->GetBuffer(); - - Buffer data_buf(data.data(), data.size()); - - ASSERT_TRUE(data_buf.Equals(*buffer)); -} - -} // namespace parquet_cpp
