Repository: parquet-cpp Updated Branches: refs/heads/master 5a624c069 -> 70f5088f7
PARQUET-549: Add column reader tests for dictionary pages Author: Deepak Majeti <[email protected]> Closes #71 from majetideepak/PARQUET-549 and squashes the following commits: 134cba3 [Deepak Majeti] fixed clang compilation error 18fd2c9 [Deepak Majeti] addressed comments ee47cef [Deepak Majeti] split indices onto multiple pages 06bc0af [Deepak Majeti] resolve clang error 09674f6 [Deepak Majeti] comment edits 10ed327 [Deepak Majeti] re-structured MakePages e00eafe [Deepak Majeti] added scanner tests 832706f [Deepak Majeti] column Reader Test passes 4280ae7 [Deepak Majeti] rebased ae65369 [Deepak Majeti] Test for Column Reader Plain Dictionary Pages 6f9f451 [Deepak Majeti] re-structured code for better reuse Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/70f5088f Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/70f5088f Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/70f5088f Branch: refs/heads/master Commit: 70f5088f75a2aa18d2d57114e7a7f6da342a99d1 Parents: 5a624c0 Author: Deepak Majeti <[email protected]> Authored: Wed Mar 2 21:44:42 2016 -0800 Committer: Wes McKinney <[email protected]> Committed: Wed Mar 2 21:44:42 2016 -0800 ---------------------------------------------------------------------- src/parquet/column/column-reader-test.cc | 65 +++--- src/parquet/column/scanner-test.cc | 165 +++++++-------- src/parquet/column/test-util.h | 294 ++++++++++++++++++++++---- 3 files changed, 361 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index a5b918f..855669a 100644 --- a/src/parquet/column/column-reader-test.cc +++ 
b/src/parquet/column/column-reader-test.cc @@ -45,42 +45,6 @@ namespace test { class TestPrimitiveReader : public ::testing::Test { public: - void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) { - num_levels_ = levels_per_page * num_pages; - num_values_ = 0; - uint32_t seed = 0; - int16_t zero = 0; - vector<int> values_per_page(num_pages, levels_per_page); - // Create definition levels - if (max_def_level_ > 0) { - def_levels_.resize(num_levels_); - random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data()); - for (int p = 0; p < num_pages; p++) { - int num_values_per_page = 0; - for (int i = 0; i < levels_per_page; i++) { - if (def_levels_[i + p * levels_per_page] == max_def_level_) { - num_values_per_page++; - num_values_++; - } - } - values_per_page[p] = num_values_per_page; - } - } else { - num_values_ = num_levels_; - } - // Create repitition levels - if (max_rep_level_ > 0) { - rep_levels_.resize(num_levels_); - random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data()); - } - // Create values - values_.resize(num_values_); - random_numbers(num_values_, seed, std::numeric_limits<int32_t>::min(), - std::numeric_limits<int32_t>::max(), values_.data()); - Paginate<Type::INT32, int32_t>(d, values_, def_levels_, max_def_level_, - rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_); - } - void InitReader(const ColumnDescriptor* d) { std::unique_ptr<PageReader> pager_; pager_.reset(new test::MockPageReader(pages_)); @@ -124,8 +88,23 @@ class TestPrimitiveReader : public ::testing::Test { ASSERT_EQ(0, values_read); } - void execute(int num_pages, int levels_page, const ColumnDescriptor *d) { - MakePages(d, num_pages, levels_page); + void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor *d) { + num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, + rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN); + num_levels_ = num_pages * 
levels_per_page; + InitReader(d); + CheckResults(); + values_.clear(); + def_levels_.clear(); + rep_levels_.clear(); + pages_.clear(); + reader_.reset(); + } + + void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor *d) { + num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_, + rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY); + num_levels_ = num_pages * levels_per_page; InitReader(d); CheckResults(); } @@ -140,6 +119,7 @@ class TestPrimitiveReader : public ::testing::Test { vector<int32_t> values_; vector<int16_t> def_levels_; vector<int16_t> rep_levels_; + vector<uint8_t> data_buffer_; // For BA and FLBA }; TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { @@ -149,7 +129,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { max_rep_level_ = 0; NodePtr type = schema::Int32("a", Repetition::REQUIRED); const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); - execute(num_pages, levels_per_page, &descr); + ExecutePlain(num_pages, levels_per_page, &descr); + ExecuteDict(num_pages, levels_per_page, &descr); } TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { @@ -159,7 +140,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { max_rep_level_ = 0; NodePtr type = schema::Int32("b", Repetition::OPTIONAL); const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); - execute(num_pages, levels_per_page, &descr); + ExecutePlain(num_pages, levels_per_page, &descr); + ExecuteDict(num_pages, levels_per_page, &descr); } TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { @@ -169,7 +151,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { max_rep_level_ = 2; NodePtr type = schema::Int32("c", Repetition::REPEATED); const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); - execute(num_pages, levels_per_page, &descr); + ExecutePlain(num_pages, levels_per_page, &descr); + ExecuteDict(num_pages, levels_per_page, &descr); } } // namespace test 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index 32c1ea5..b52a993 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -41,59 +41,58 @@ namespace parquet_cpp { using schema::NodePtr; static int FLBA_LENGTH = 12; + bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); } namespace test { +template<> +void InitValues<bool>(int num_values, vector<bool>& values, + vector<uint8_t>& buffer) { + values = flip_coins(num_values, 0); +} + +template<> +void InitValues<Int96>(int num_values, vector<Int96>& values, + vector<uint8_t>& buffer) { + random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(), + std::numeric_limits<int32_t>::max(), values.data()); +} + +template<> +void InitValues<ByteArray>(int num_values, vector<ByteArray>& values, + vector<uint8_t>& buffer) { + int max_byte_array_len = 12; + int num_bytes = max_byte_array_len + sizeof(uint32_t); + size_t nbytes = num_values * num_bytes; + buffer.resize(nbytes); + random_byte_array(num_values, 0, buffer.data(), values.data(), + max_byte_array_len); +} + +template<> +void InitValues<FLBA>(int num_values, vector<FLBA>& values, + vector<uint8_t>& buffer) { + size_t nbytes = num_values * FLBA_LENGTH; + buffer.resize(nbytes); + random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, + values.data()); +} + +template<> +void InitDictValues<bool>(int num_values, int dict_per_page, + vector<bool>& values, vector<uint8_t>& buffer) { + // No op for bool +} + + template <typename Type> class TestFlatScanner : public ::testing::Test { public: typedef typename Type::c_type T; - void InitValues() { - random_numbers(num_values_, 0, std::numeric_limits<T>::min(), - std::numeric_limits<T>::max(), 
values_.data()); - } - - void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) { - num_levels_ = levels_per_page * num_pages; - num_values_ = 0; - uint32_t seed = 0; - int16_t zero = 0; - int16_t max_def_level = d->max_definition_level(); - int16_t max_rep_level = d->max_repetition_level(); - vector<int> values_per_page(num_pages, levels_per_page); - // Create definition levels - if (max_def_level > 0) { - def_levels_.resize(num_levels_); - random_numbers(num_levels_, seed, zero, max_def_level, def_levels_.data()); - for (int p = 0; p < num_pages; p++) { - int num_values_per_page = 0; - for (int i = 0; i < levels_per_page; i++) { - if (def_levels_[i + p * levels_per_page] == max_def_level) { - num_values_per_page++; - num_values_++; - } - } - values_per_page[p] = num_values_per_page; - } - } else { - num_values_ = num_levels_; - } - // Create repitition levels - if (max_rep_level > 0) { - rep_levels_.resize(num_levels_); - random_numbers(num_levels_, seed, zero, max_rep_level, rep_levels_.data()); - } - // Create values - values_.resize(num_values_); - InitValues(); - Paginate<Type::type_num>(d, values_, def_levels_, max_def_level, - rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_); - } - void InitScanner(const ColumnDescriptor *d) { std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_)); scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager))); @@ -111,7 +110,8 @@ class TestFlatScanner : public ::testing::Test { for (int i = 0; i < num_levels_; i++) { ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j; if (!is_null) { - ASSERT_EQ(values_[j++], val) << i <<"V"<< j; + ASSERT_EQ(values_[j], val) << i <<"V"<< j; + j++; } if (d->max_definition_level() > 0) { ASSERT_EQ(def_levels_[i], def_level) << i <<"D"<< j; @@ -131,9 +131,11 @@ class TestFlatScanner : public ::testing::Test { rep_levels_.clear(); } - void Execute(int num_pages, int levels_page, int batch_size, - const 
ColumnDescriptor *d) { - MakePages(d, num_pages, levels_page); + void Execute(int num_pages, int levels_per_page, int batch_size, + const ColumnDescriptor *d, Encoding::type encoding) { + num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_, + values_, data_buffer_, pages_, encoding); + num_levels_ = num_pages * levels_per_page; InitScanner(d); CheckResults(batch_size, d); Clear(); @@ -154,17 +156,18 @@ class TestFlatScanner : public ::testing::Test { d3.reset(new ColumnDescriptor(type, 4, 2)); } - void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length) { + void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length, + Encoding::type encoding = Encoding::PLAIN) { std::shared_ptr<ColumnDescriptor> d1; std::shared_ptr<ColumnDescriptor> d2; std::shared_ptr<ColumnDescriptor> d3; InitDescriptors(d1, d2, d3, type_length); // evaluate REQUIRED pages - Execute(num_pages, num_levels, batch_size, d1.get()); + Execute(num_pages, num_levels, batch_size, d1.get(), encoding); // evaluate OPTIONAL pages - Execute(num_pages, num_levels, batch_size, d2.get()); + Execute(num_pages, num_levels, batch_size, d2.get(), encoding); // evaluate REPEATED pages - Execute(num_pages, num_levels, batch_size, d3.get()); + Execute(num_pages, num_levels, batch_size, d3.get(), encoding); } protected: @@ -178,62 +181,50 @@ class TestFlatScanner : public ::testing::Test { vector<uint8_t> data_buffer_; // For BA and FLBA }; -template<> -void TestFlatScanner<BooleanType>::InitValues() { - values_ = flip_coins(num_values_, 0); -} - -template<> -void TestFlatScanner<Int96Type>::InitValues() { - random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(), - std::numeric_limits<int32_t>::max(), values_.data()); -} - -template<> -void TestFlatScanner<ByteArrayType>::InitValues() { - int max_byte_array_len = 12; - int num_bytes = max_byte_array_len + sizeof(uint32_t); - int nbytes = num_values_ * num_bytes; - 
data_buffer_.resize(nbytes); - random_byte_array(num_values_, 0, data_buffer_.data(), values_.data(), - max_byte_array_len); -} - -template<> -void TestFlatScanner<FLBAType>::InitValues() { - int nbytes = num_values_ * FLBA_LENGTH; - data_buffer_.resize(nbytes); - random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH, - values_.data()); -} - typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner; static int num_levels_per_page = 100; static int num_pages = 20; static int batch_size = 32; -typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, +typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType, ByteArrayType> TestTypes; +typedef TestFlatScanner<BooleanType> TestBooleanFlatScanner; typedef TestFlatScanner<FLBAType> TestFLBAFlatScanner; TYPED_TEST_CASE(TestFlatScanner, TestTypes); -TYPED_TEST(TestFlatScanner, TestScanner) { +TYPED_TEST(TestFlatScanner, TestPlainScanner) { + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN); +} + +TYPED_TEST(TestFlatScanner, TestDictScanner) { + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, + Encoding::RLE_DICTIONARY); +} + +TEST_F(TestBooleanFlatScanner, TestPlainScanner) { this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0); } -TEST_F(TestFLBAFlatScanner, TestScanner) { +TEST_F(TestFLBAFlatScanner, TestPlainScanner) { this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH); } +TEST_F(TestFLBAFlatScanner, TestDictScanner) { + this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH, + Encoding::RLE_DICTIONARY); +} + //PARQUET 502 TEST_F(TestFlatFLBAScanner, TestSmallBatch) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 0, 0); - MakePages(&d, 1, 100); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, 
pages_); + num_levels_ = 1 * 100; InitScanner(&d); CheckResults(1, &d); } @@ -242,7 +233,9 @@ TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 4, 0); - MakePages(&d, 1, 100); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, pages_); + num_levels_ = 1 * 100; InitScanner(&d); TypedScanner<FLBAType::type_num>* scanner = reinterpret_cast<TypedScanner<FLBAType::type_num>* >(scanner_.get()); @@ -255,7 +248,9 @@ TEST_F(TestFlatFLBAScanner, TestFLBAPrinterNext) { NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2); const ColumnDescriptor d(type, 4, 0); - MakePages(&d, 1, 100); + num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_, + data_buffer_, pages_); + num_levels_ = 1 * 100; InitScanner(&d); TypedScanner<FLBAType::type_num>* scanner = reinterpret_cast<TypedScanner<FLBAType::type_num>* >(scanner_.get()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 36e9860..c9b08c2 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -23,6 +23,7 @@ #define PARQUET_COLUMN_TEST_UTIL_H #include <algorithm> +#include <limits> #include <memory> #include <vector> #include <string> @@ -32,40 +33,68 @@ // Depended on by SerializedPageReader test utilities for now #include "parquet/encodings/plain-encoding.h" +#include "parquet/encodings/dictionary-encoding.h" #include "parquet/util/input.h" #include "parquet/util/test-common.h" +using std::vector; +using std::shared_ptr; + namespace parquet_cpp { namespace test { +template 
<typename T> +static void InitValues(int num_values, vector<T>& values, + vector<uint8_t>& buffer) { + random_numbers(num_values, 0, std::numeric_limits<T>::min(), + std::numeric_limits<T>::max(), values.data()); +} + +template <typename T> +static void InitDictValues(int num_values, int num_dicts, + vector<T>& values, vector<uint8_t>& buffer) { + num_dicts = std::min(num_dicts, num_values); // never write past values.size() + if (num_dicts <= 0) return; + int repeat_factor = num_values / num_dicts; + InitValues<T>(num_dicts, values, buffer); // distinct entries, tiled below + for (int j = 1; j < repeat_factor; ++j) { + for (int i = 0; i < num_dicts; ++i) { + std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T)); + } + } + // only num_dicts * repeat_factor values are filled so far; fill the rest + for (int i = num_dicts * repeat_factor; i < num_values; ++i) { + std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T)); + } +} + class MockPageReader : public PageReader { public: - explicit MockPageReader(const std::vector<std::shared_ptr<Page> >& pages) : + explicit MockPageReader(const vector<shared_ptr<Page> >& pages) : pages_(pages), page_index_(0) {} // Implement the PageReader interface - virtual std::shared_ptr<Page> NextPage() { + virtual shared_ptr<Page> NextPage() { if (page_index_ == static_cast<int>(pages_.size())) { // EOS to consumer - return std::shared_ptr<Page>(nullptr); + return shared_ptr<Page>(nullptr); } return pages_[page_index_++]; } private: - std::vector<std::shared_ptr<Page> > pages_; + vector<shared_ptr<Page> > pages_; int page_index_; }; // TODO(wesm): this is only used for testing for now.
Refactor to form part of // primary file write path - -template <int TYPE> +template <typename Type> class DataPageBuilder { public: - typedef typename type_traits<TYPE>::value_type T; + typedef typename Type::c_type T; // This class writes data and metadata to the passed inputs explicit DataPageBuilder(InMemoryOutputStream* sink) : @@ -79,7 +108,7 @@ class DataPageBuilder { have_values_(false) { } - void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level, + void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); @@ -88,7 +117,7 @@ class DataPageBuilder { have_def_levels_ = true; } - void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level, + void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding = Encoding::RLE) { AppendLevels(levels, max_level, encoding); @@ -97,12 +126,9 @@ class DataPageBuilder { have_rep_levels_ = true; } - void AppendValues(const ColumnDescriptor *d, const std::vector<T>& values, + void AppendValues(const ColumnDescriptor *d, const vector<T>& values, Encoding::type encoding = Encoding::PLAIN) { - if (encoding != Encoding::PLAIN) { - ParquetException::NYI("only plain encoding currently implemented"); - } - PlainEncoder<TYPE> encoder(d); + PlainEncoder<Type::type_num> encoder(d); encoder.Encode(&values[0], values.size(), sink_); num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_); @@ -139,14 +165,14 @@ class DataPageBuilder { bool have_values_; // Used internally for both repetition and definition levels - void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level, + void AppendLevels(const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding) { if (encoding != Encoding::RLE) { ParquetException::NYI("only rle encoding currently implemented"); } // TODO: compute a more precise maximum size for the encoded levels - 
std::vector<uint8_t> encode_buffer(levels.size() * 4); + vector<uint8_t> encode_buffer(levels.size() * 2); // We encode into separate memory from the output stream because the // RLE-encoded bytes have to be preceded in the stream by their absolute @@ -164,8 +190,8 @@ class DataPageBuilder { }; template<> -void DataPageBuilder<Type::BOOLEAN>::AppendValues(const ColumnDescriptor *d, - const std::vector<bool>& values, Encoding::type encoding) { +void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor *d, + const vector<bool>& values, Encoding::type encoding) { if (encoding != Encoding::PLAIN) { ParquetException::NYI("only plain encoding currently implemented"); } @@ -177,39 +203,180 @@ void DataPageBuilder<Type::BOOLEAN>::AppendValues(const ColumnDescriptor *d, have_values_ = true; } -template <int TYPE, typename T> -static std::shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor *d, - const std::vector<T>& values, - const std::vector<int16_t>& def_levels, int16_t max_def_level, - const std::vector<int16_t>& rep_levels, int16_t max_rep_level) { +template <typename Type> +static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor *d, + const vector<typename Type::c_type>& values, int num_vals, + Encoding::type encoding, const uint8_t* indices, int indices_size, + const vector<int16_t>& def_levels, int16_t max_def_level, + const vector<int16_t>& rep_levels, int16_t max_rep_level) { + int num_values = 0; + InMemoryOutputStream page_stream; - test::DataPageBuilder<TYPE> page_builder(&page_stream); + test::DataPageBuilder<Type> page_builder(&page_stream); if (!rep_levels.empty()) { page_builder.AppendRepLevels(rep_levels, max_rep_level); } - if (!def_levels.empty()) { page_builder.AppendDefLevels(def_levels, max_def_level); } - page_builder.AppendValues(d, values); + if (encoding == Encoding::PLAIN) { + page_builder.AppendValues(d, values, encoding); + num_values = page_builder.num_values(); + } else {// DICTIONARY PAGES + 
page_stream.Write(indices, indices_size); + num_values = std::max(page_builder.num_values(), num_vals); + } auto buffer = page_stream.GetBuffer(); - return std::make_shared<DataPage>(buffer, page_builder.num_values(), - page_builder.encoding(), + return std::make_shared<DataPage>(buffer, num_values, + encoding, page_builder.def_level_encoding(), page_builder.rep_level_encoding()); } -template <int TYPE, typename T> -static void Paginate(const ColumnDescriptor *d, - const std::vector<T>& values, - const std::vector<int16_t>& def_levels, int16_t max_def_level, - const std::vector<int16_t>& rep_levels, int16_t max_rep_level, - int num_levels_per_page, const std::vector<int>& values_per_page, - std::vector<std::shared_ptr<Page> >& pages) { + +template <typename TYPE> +class DictionaryPageBuilder { + public: + typedef typename TYPE::c_type TC; + static constexpr int TN = TYPE::type_num; + + // This class writes data and metadata to the passed inputs + explicit DictionaryPageBuilder(const ColumnDescriptor *d) : + num_dict_values_(0), + have_values_(false) { + int type_length = 0; + if (TN == Type::FIXED_LEN_BYTE_ARRAY) { + type_length = d->type_length(); + } + encoder_.reset(new DictEncoder<TC>(&pool_, type_length)); + } + + ~DictionaryPageBuilder() { + pool_.FreeAll(); + } + + shared_ptr<Buffer> AppendValues(const vector<TC>& values) { + shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>(); + int num_values = values.size(); + // Dictionary encoding + for (int i = 0; i < num_values; ++i) { + encoder_->Put(values[i]); + } + num_dict_values_ = encoder_->num_entries(); + have_values_ = true; + rle_indices->Resize(sizeof(int) * encoder_->EstimatedDataEncodedSize()); + int actual_bytes = encoder_->WriteIndices(rle_indices->mutable_data(), + rle_indices->size()); + rle_indices->Resize(actual_bytes); + encoder_->ClearIndices(); + return rle_indices; + } + + shared_ptr<Buffer> WriteDict() { + shared_ptr<OwnedMutableBuffer> dict_buffer = 
std::make_shared<OwnedMutableBuffer>(); + dict_buffer->Resize(encoder_->dict_encoded_size()); + encoder_->WriteDict(dict_buffer->mutable_data()); + return dict_buffer; + } + + int32_t num_values() const { + return num_dict_values_; + } + + private: + MemPool pool_; + shared_ptr<DictEncoder<TC> > encoder_; + int32_t num_dict_values_; + bool have_values_; +}; + +template<> +DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor *d) { + ParquetException::NYI("only plain encoding currently implemented for boolean"); +} + +template<> +shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() { + ParquetException::NYI("only plain encoding currently implemented for boolean"); + return nullptr; +} + +template<> +shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues( + const vector<TC>& values) { + ParquetException::NYI("only plain encoding currently implemented for boolean"); + return nullptr; +} + +template <typename Type> +static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor *d, + const vector<typename Type::c_type>& values, const vector<int>& values_per_page, + Encoding::type encoding, vector<shared_ptr<Buffer> >& rle_indices) { + InMemoryOutputStream page_stream; + test::DictionaryPageBuilder<Type> page_builder(d); + int num_pages = values_per_page.size(); + int value_start = 0; + + for (int i = 0; i < num_pages; i++) { + rle_indices.push_back(page_builder.AppendValues(slice(values, value_start, + value_start + values_per_page[i]))); + value_start += values_per_page[i]; + } + + auto buffer = page_builder.WriteDict(); + + return std::make_shared<DictionaryPage>(buffer, page_builder.num_values(), + Encoding::PLAIN); +} + +// Given def/rep levels and values create multiple dict pages +template <typename Type> +static void PaginateDict(const ColumnDescriptor *d, + const vector<typename Type::c_type>& values, + const vector<int16_t>& def_levels, int16_t max_def_level, + const vector<int16_t>& rep_levels, 
int16_t max_rep_level, + int num_levels_per_page, const vector<int>& values_per_page, + vector<shared_ptr<Page> >& pages, + Encoding::type encoding = Encoding::RLE_DICTIONARY) { + int num_pages = values_per_page.size(); + vector<shared_ptr<Buffer> > rle_indices; + shared_ptr<DictionaryPage> dict_page = MakeDictPage<Type>(d, values, values_per_page, + encoding, rle_indices); + pages.push_back(dict_page); + int def_level_start = 0; + int def_level_end = 0; + int rep_level_start = 0; + int rep_level_end = 0; + for (int i = 0; i < num_pages; i++) { + if (max_def_level > 0) { + def_level_start = i * num_levels_per_page; + def_level_end = (i + 1) * num_levels_per_page; + } + if (max_rep_level > 0) { + rep_level_start = i * num_levels_per_page; + rep_level_end = (i + 1) * num_levels_per_page; + } + shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(d, {}, values_per_page[i], + encoding, rle_indices[i]->data(), rle_indices[i]->size(), + slice(def_levels, def_level_start, def_level_end), max_def_level, + slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); + pages.push_back(data_page); + } +} + +// Given def/rep levels and values create multiple plain pages +template <typename Type> +static void PaginatePlain(const ColumnDescriptor *d, + const vector<typename Type::c_type>& values, + const vector<int16_t>& def_levels, int16_t max_def_level, + const vector<int16_t>& rep_levels, int16_t max_rep_level, + int num_levels_per_page, const vector<int>& values_per_page, + vector<shared_ptr<Page> >& pages, + Encoding::type encoding = Encoding::PLAIN) { int num_pages = values_per_page.size(); int def_level_start = 0; int def_level_end = 0; @@ -225,8 +392,9 @@ static void Paginate(const ColumnDescriptor *d, rep_level_start = i * num_levels_per_page; rep_level_end = (i + 1) * num_levels_per_page; } - std::shared_ptr<DataPage> page = MakeDataPage<TYPE>(d, - slice(values, value_start, value_start + values_per_page[i]), + shared_ptr<DataPage> page = 
MakeDataPage<Type>(d, + slice(values, value_start, value_start + values_per_page[i]), values_per_page[i], + encoding, NULL, 0, slice(def_levels, def_level_start, def_level_end), max_def_level, slice(rep_levels, rep_level_start, rep_level_end), max_rep_level); pages.push_back(page); @@ -234,6 +402,58 @@ static void Paginate(const ColumnDescriptor *d, } } +// Generates pages from randomly generated data +template <typename Type> +static int MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page, + vector<int16_t>& def_levels, vector<int16_t>& rep_levels, + vector<typename Type::c_type>& values, vector<uint8_t>& buffer, + vector<shared_ptr<Page> >& pages, + Encoding::type encoding = Encoding::PLAIN) { + int num_levels = levels_per_page * num_pages; + int num_values = 0; + uint32_t seed = 0; + int16_t zero = 0; + int16_t max_def_level = d->max_definition_level(); + int16_t max_rep_level = d->max_repetition_level(); + vector<int> values_per_page(num_pages, levels_per_page); + // Create definition levels + if (max_def_level > 0) { + def_levels.resize(num_levels); + random_numbers(num_levels, seed, zero, max_def_level, def_levels.data()); + for (int p = 0; p < num_pages; p++) { + int num_values_per_page = 0; + for (int i = 0; i < levels_per_page; i++) { + if (def_levels[i + p * levels_per_page] == max_def_level) { + num_values_per_page++; + num_values++; + } + } + values_per_page[p] = num_values_per_page; + } + } else { + num_values = num_levels; + } + // Create repitition levels + if (max_rep_level > 0) { + rep_levels.resize(num_levels); + random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data()); + } + // Create values + values.resize(num_values); + if (encoding == Encoding::PLAIN) { + InitValues<typename Type::c_type>(num_values, values, buffer); + PaginatePlain<Type>(d, values, def_levels, max_def_level, + rep_levels, max_rep_level, levels_per_page, values_per_page, pages); + } else if (encoding == Encoding::RLE_DICTIONARY) { + // Calls 
InitValues and repeats the data + InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer); + PaginateDict<Type>(d, values, def_levels, max_def_level, + rep_levels, max_rep_level, levels_per_page, values_per_page, pages); + } + + return num_values; +} + } // namespace test } // namespace parquet_cpp
