Repository: parquet-cpp
Updated Branches:
  refs/heads/master 70f5088f7 -> 64862b0ed
PARQUET-555: Dictionary page metadata handling inconsistencies

Includes tests

Author: Deepak Majeti <[email protected]>

Closes #73 from majetideepak/PARQUET-555 and squashes the following commits:

2c53f53 [Deepak Majeti] minor fixes
c7f1b24 [Deepak Majeti] addressed comment and added more tests
5b66a1f [Deepak Majeti] PARQUET-555: Dictionary page metadata handling inconsistencies

Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/64862b0e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/64862b0e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/64862b0e

Branch: refs/heads/master
Commit: 64862b0edc9ab38611b3f96c4f700ef7e0031f8a
Parents: 70f5088
Author: Deepak Majeti <[email protected]>
Authored: Fri Mar 4 10:17:45 2016 -0800
Committer: Wes McKinney <[email protected]>
Committed: Fri Mar 4 10:17:45 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/column-reader-test.cc | 65 +++++++++++++++++++++++++++
 src/parquet/column/reader.cc             | 38 ++++++++++------
 src/parquet/column/scanner-test.cc       |  6 +++
 src/parquet/column/test-util.h           |  3 +-
 4 files changed, 98 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/64862b0e/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 855669a..6cd8925 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -155,5 +155,70 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   ExecuteDict(num_pages, levels_per_page, &descr);
 }
 
+TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("a",
Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  shared_ptr<OwnedMutableBuffer> dummy = std::make_shared<OwnedMutableBuffer>();
+
+  shared_ptr<DictionaryPage> dict_page = std::make_shared<DictionaryPage>(dummy,
+      0, Encoding::PLAIN);
+  shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
+      Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(dict_page);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests Dict : PLAIN, Data : RLE_DICTIONARY
+  ASSERT_NO_THROW(reader_->HasNext());
+  pages_.clear();
+
+  dict_page = std::make_shared<DictionaryPage>(dummy,
+      0, Encoding::PLAIN_DICTIONARY);
+  data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
+      Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(dict_page);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
+  ASSERT_NO_THROW(reader_->HasNext());
+  pages_.clear();
+
+  data_page = MakeDataPage<Int32Type>(&descr, {}, 0,
+      Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests dictionary page must occur before data page
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  dict_page = std::make_shared<DictionaryPage>(dummy,
+      0, Encoding::DELTA_BYTE_ARRAY);
+  pages_.push_back(dict_page);
+  InitReader(&descr);
+  // Tests only RLE_DICTIONARY is supported
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  shared_ptr<DictionaryPage> dict_page1 = std::make_shared<DictionaryPage>(dummy,
+      0, Encoding::PLAIN_DICTIONARY);
+  shared_ptr<DictionaryPage> dict_page2 = std::make_shared<DictionaryPage>(dummy,
+      0, Encoding::PLAIN);
+  pages_.push_back(dict_page1);
+  pages_.push_back(dict_page2);
+  InitReader(&descr);
+  // Column cannot have more than one dictionary
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  data_page =
MakeDataPage<Int32Type>(&descr, {}, 0,
+      Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // unsupported encoding
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+}
+
 } // namespace test
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/64862b0e/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 2885ebe..bf76d4c 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -37,26 +37,35 @@ ColumnReader::ColumnReader(const ColumnDescriptor* descr,
 
 template <int TYPE>
 void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
-  int encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
+  int encoding = static_cast<int>(page->encoding());
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
+  }
 
   auto it = decoders_.find(encoding);
   if (it != decoders_.end()) {
     throw ParquetException("Column cannot have more than one dictionary.");
   }
 
-  PlainDecoder<TYPE> dictionary(descr_);
-  dictionary.SetData(page->num_values(), page->data(), page->size());
-
-  // The dictionary is fully decoded during DictionaryDecoder::Init, so the
-  // DictionaryPage buffer is no longer required after this step
-  //
-  // TODO(wesm): investigate whether this all-or-nothing decoding of the
-  // dictionary makes sense and whether performance can be improved
-
-  auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
-  decoder->SetDict(&dictionary);
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    PlainDecoder<TYPE> dictionary(descr_);
+    dictionary.SetData(page->num_values(), page->data(), page->size());
+
+    // The dictionary is fully decoded during
DictionaryDecoder::Init, so the
+    // DictionaryPage buffer is no longer required after this step
+    //
+    // TODO(wesm): investigate whether this all-or-nothing decoding of the
+    // dictionary makes sense and whether performance can be improved
+
+    auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
+    decoder->SetDict(&dictionary);
+    decoders_[encoding] = decoder;
+  } else {
+    ParquetException::NYI("only plain dictionary encoding has been implemented");
+  }
 
-  decoders_[encoding] = decoder;
   current_decoder_ = decoders_[encoding].get();
 }
 
@@ -130,6 +139,9 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
 
       auto it = decoders_.find(static_cast<int>(encoding));
       if (it != decoders_.end()) {
+        if (encoding == Encoding::RLE_DICTIONARY) {
+          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+        }
         current_decoder_ = it->second.get();
       } else {
         switch (encoding) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/64862b0e/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index b52a993..1d7579e 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -217,6 +217,12 @@ TEST_F(TestFLBAFlatScanner, TestDictScanner) {
       Encoding::RLE_DICTIONARY);
 }
 
+TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
+      Encoding::PLAIN_DICTIONARY);
+}
+
+
 //PARQUET 502
 TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
   NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/64862b0e/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index c9b08c2..4d10a42 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -444,7 +444,8 @@ static int MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_pa
     InitValues<typename Type::c_type>(num_values, values, buffer);
     PaginatePlain<Type>(d, values, def_levels, max_def_level,
         rep_levels, max_rep_level, levels_per_page, values_per_page, pages);
-  } else if (encoding == Encoding::RLE_DICTIONARY) {
+  } else if (encoding == Encoding::RLE_DICTIONARY
+      || encoding == Encoding::PLAIN_DICTIONARY) {
     // Calls InitValues and repeats the data
     InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
     PaginateDict<Type>(d, values, def_levels, max_def_level,
