This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 532d4ba ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader 532d4ba is described below commit 532d4ba05a87e64a23e3a7b44bfbf34fa0c9a90b Author: Hatem Helal <hhe...@mathworks.com> AuthorDate: Sun Jun 23 20:33:53 2019 -0500 ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader This patch adds support for configuring the record batch size when reading a parquet file by adding a `batch_size` to `ArrowReaderProperties`. Author: Hatem Helal <hhe...@mathworks.com> Closes #4304 from hatemhelal/arrow-1012 and squashes the following commits: 9ed935374 <Hatem Helal> update todo comment to be a bit more precise 9f93da7e0 <Hatem Helal> rework existing RecordBatchReader test to cover the case where batch size is smaller than the row group 0e2162849 <Hatem Helal> use deque instead of list 108a5d775 <Hatem Helal> Change default batch size to 64K and comment 159b03041 <Hatem Helal> fix appveyor windows failure: must cast size_t to int b45782e56 <Hatem Helal> Initial attempt at supporting a configurable batch size for parquet RecordBatchReader --- cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 18 +-- cpp/src/parquet/arrow/reader.cc | 151 ++++++++++------------ cpp/src/parquet/arrow/reader.h | 12 +- 3 files changed, 86 insertions(+), 95 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index f59db1f..5781ad5 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1940,23 +1940,23 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) { ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2, default_arrow_writer_properties(), &buffer)); + ArrowReaderProperties properties = default_arrow_reader_properties(); + properties.set_batch_size(100); + std::unique_ptr<FileReader> reader; 
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), - ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + ::arrow::default_memory_pool(), properties, &reader)); std::shared_ptr<::arrow::RecordBatchReader> rb_reader; ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader)); std::shared_ptr<::arrow::RecordBatch> batch; - ASSERT_OK(rb_reader->ReadNext(&batch)); - ASSERT_EQ(500, batch->num_rows()); - ASSERT_EQ(20, batch->num_columns()); - - ASSERT_OK(rb_reader->ReadNext(&batch)); - ASSERT_EQ(500, batch->num_rows()); - ASSERT_EQ(20, batch->num_columns()); + for (int i = 0; i < 10; ++i) { + ASSERT_OK(rb_reader->ReadNext(&batch)); + ASSERT_EQ(100, batch->num_rows()); + ASSERT_EQ(20, batch->num_columns()); + } ASSERT_OK(rb_reader->ReadNext(&batch)); ASSERT_EQ(nullptr, batch); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5665603..484719e 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -20,7 +20,9 @@ #include <algorithm> #include <climits> #include <cstring> +#include <deque> #include <future> +#include <numeric> #include <type_traits> #include <utility> #include <vector> @@ -110,14 +112,37 @@ ArrowReaderProperties default_arrow_reader_properties() { // so we can read only a single row group if we want class FileColumnIterator { public: - explicit FileColumnIterator(int column_index, ParquetFileReader* reader) + explicit FileColumnIterator(int column_index, ParquetFileReader* reader, + std::vector<int> row_groups) : column_index_(column_index), reader_(reader), - schema_(reader->metadata()->schema()) {} + schema_(reader->metadata()->schema()), + row_groups_(row_groups.begin(), row_groups.end()) {} virtual ~FileColumnIterator() {} - virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0; + std::unique_ptr<::parquet::PageReader> NextChunk() { + if (row_groups_.empty()) { + return nullptr; + } + + auto row_group_reader = 
reader_->RowGroup(row_groups_.front()); + row_groups_.pop_front(); + return row_group_reader->GetColumnPageReader(column_index_); + } + + static FileColumnIterator* MakeAllRowGroupsIterator(int column_index, + ParquetFileReader* reader) { + std::vector<int> row_groups(reader->metadata()->num_row_groups()); + std::iota(row_groups.begin(), row_groups.end(), 0); + return new FileColumnIterator(column_index, reader, row_groups); + } + + static FileColumnIterator* MakeSingleRowGroupIterator(int column_index, + ParquetFileReader* reader, + int row_group) { + return new FileColumnIterator(column_index, reader, {row_group}); + } const SchemaDescriptor* schema() const { return schema_; } @@ -131,50 +156,7 @@ class FileColumnIterator { int column_index_; ParquetFileReader* reader_; const SchemaDescriptor* schema_; -}; - -class AllRowGroupsIterator : public FileColumnIterator { - public: - explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader) - : FileColumnIterator(column_index, reader), next_row_group_(0) {} - - std::unique_ptr<::parquet::PageReader> NextChunk() override { - std::unique_ptr<::parquet::PageReader> result; - if (next_row_group_ < reader_->metadata()->num_row_groups()) { - result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_); - next_row_group_++; - } else { - result = nullptr; - } - return result; - } - - private: - int next_row_group_; -}; - -class SingleRowGroupIterator : public FileColumnIterator { - public: - explicit SingleRowGroupIterator(int column_index, int row_group_number, - ParquetFileReader* reader) - : FileColumnIterator(column_index, reader), - row_group_number_(row_group_number), - done_(false) {} - - std::unique_ptr<::parquet::PageReader> NextChunk() override { - if (done_) { - return nullptr; - } - - auto result = - reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_); - done_ = true; - return result; - } - - private: - int row_group_number_; - bool done_; + std::deque<int> 
row_groups_; }; class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader { @@ -182,52 +164,56 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader { explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema> schema, - FileReader* reader) - : row_group_indices_(row_group_indices), + FileReader* reader, int64_t batch_size) + : column_readers_(), + row_group_indices_(row_group_indices), column_indices_(column_indices), schema_(schema), file_reader_(reader), - next_row_group_(0) {} + batch_size_(batch_size) {} ~RowGroupRecordBatchReader() override {} std::shared_ptr<::arrow::Schema> schema() const override { return schema_; } Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override { - if (table_ != nullptr) { // one row group has been loaded - std::shared_ptr<::arrow::RecordBatch> tmp; - RETURN_NOT_OK(table_batch_reader_->ReadNext(&tmp)); - if (tmp != nullptr) { // some column chunks are left in table - *out = tmp; - return Status::OK(); - } else { // the entire table is consumed - table_batch_reader_.reset(); - table_.reset(); + if (column_readers_.empty()) { + // Initialize the column readers + column_readers_.reserve(column_indices_.size()); + + for (size_t i = 0; i < column_indices_.size(); ++i) { + ColumnReaderPtr tmp; + RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp)); + column_readers_.emplace_back(std::move(tmp)); } } - // all row groups has been consumed - if (next_row_group_ == row_group_indices_.size()) { - *out = nullptr; - return Status::OK(); - } + // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this + // does not currently honor the use_threads option. 
+ std::vector<std::shared_ptr<Column>> columns(column_indices_.size()); - RETURN_NOT_OK(file_reader_->ReadRowGroup(row_group_indices_[next_row_group_], - column_indices_, &table_)); + for (size_t i = 0; i < column_indices_.size(); ++i) { + std::shared_ptr<ChunkedArray> array; + RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &array)); + columns[i] = std::make_shared<Column>(schema_->field(static_cast<int>(i)), array); + } - next_row_group_++; - table_batch_reader_.reset(new ::arrow::TableBatchReader(*table_.get())); - return table_batch_reader_->ReadNext(out); + // Create an intermediate table and use TableBatchReader as an adaptor to a + // RecordBatch + std::shared_ptr<Table> table = Table::Make(schema_, columns); + RETURN_NOT_OK(table->Validate()); + ::arrow::TableBatchReader table_batch_reader(*table); + return table_batch_reader.ReadNext(out); } private: + using ColumnReaderPtr = std::unique_ptr<ColumnReader>; + std::vector<ColumnReaderPtr> column_readers_; std::vector<int> row_group_indices_; std::vector<int> column_indices_; std::shared_ptr<::arrow::Schema> schema_; FileReader* file_reader_; - size_t next_row_group_; - std::shared_ptr<::arrow::Table> table_; - std::unique_ptr<::arrow::TableBatchReader> table_batch_reader_; + int64_t batch_size_; }; // ---------------------------------------------------------------------- @@ -293,6 +279,8 @@ class FileReader::Impl { const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices, std::vector<std::shared_ptr<::arrow::Column>>& columns); + int64_t batch_size() const { return reader_properties_.batch_size(); } + private: MemoryPool* pool_; std::unique_ptr<ParquetFileReader> reader_; @@ -457,11 +445,8 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* o Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices, std::shared_ptr<ChunkedArray>* out) { - FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) { - return 
new AllRowGroupsIterator(i, reader); - }; + auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator; auto parquet_schema = reader_->metadata()->schema(); - auto node = parquet_schema->group_node()->field(i).get(); std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl; @@ -486,9 +471,7 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices, } Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) { - FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) { - return new AllRowGroupsIterator(i, reader); - }; + auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator; std::unique_ptr<ColumnReader> flat_column_reader; RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader)); @@ -531,7 +514,7 @@ Status FileReader::Impl::ReadColumnChunk(int column_index, FileColumnIteratorFactory iterator_factory = [row_group_index]( int i, ParquetFileReader* reader) { - return new SingleRowGroupIterator(i, row_group_index, reader); + return FileColumnIterator::MakeSingleRowGroupIterator(i, reader, row_group_index); }; RETURN_NOT_OK( GetReaderForNode(column_index, node, indices, 1, iterator_factory, &reader_impl)); @@ -752,9 +735,7 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file, } Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) { - FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) { - return new AllRowGroupsIterator(i, reader); - }; + auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator; return impl_->GetColumn(i, iterator_factory, out); } @@ -805,7 +786,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices, const std::vector<int>& column_indices, std::shared_ptr<RecordBatchReader>* out) { - // column indicies check + // column indices check 
std::shared_ptr<::arrow::Schema> schema; RETURN_NOT_OK(GetSchema(column_indices, &schema)); @@ -819,7 +800,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice } *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices, - schema, this); + schema, this, impl_->batch_size()); return Status::OK(); } diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index acdda71..48c9237 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -54,11 +54,16 @@ class RowGroupReader; static constexpr bool DEFAULT_USE_THREADS = false; +// Default number of rows to read when using ::arrow::RecordBatchReader +static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024; + /// EXPERIMENTAL: Properties for configuring FileReader behavior. class PARQUET_EXPORT ArrowReaderProperties { public: explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS) - : use_threads_(use_threads), read_dict_indices_() {} + : use_threads_(use_threads), + read_dict_indices_(), + batch_size_(DEFAULT_BATCH_SIZE) {} void set_use_threads(bool use_threads) { use_threads_ = use_threads; } @@ -79,9 +84,14 @@ class PARQUET_EXPORT ArrowReaderProperties { } } + void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; } + + int64_t batch_size() const { return batch_size_; } + private: bool use_threads_; std::unordered_set<int> read_dict_indices_; + int64_t batch_size_; }; /// EXPERIMENTAL: Constructs the default ArrowReaderProperties