This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d5fe2771fb88fc1e9fc4ff84c47c69f4ce1e142
Author:     Tim Armstrong <tarmstr...@cloudera.com>
AuthorDate: Mon Dec 21 20:05:12 2020 -0800

    IMPALA-6434: Add support to decode RLE_DICTIONARY encoded pages

    The encoding is identical to the already-supported PLAIN_DICTIONARY
    encoding but the PLAIN enum value is used for the dictionary pages and
    the RLE_DICTIONARY enum value is used for the data pages.

    A hidden option -write_new_parquet_dictionary_encodings is added to
    turn on writing too, for test purposes only.

    Testing:
    * Added an automated test using a pregenerated test file.
    * Ran core tests.
    * Manually tested by writing out TPC-H lineitem with the new encoding
      and reading back in Impala and Hive.

    Parquet-tools output for the generated test file:

    $ hadoop jar ~/repos/parquet-mr/parquet-tools/target/parquet-tools-1.12.0-SNAPSHOT.jar meta /test-warehouse/att/824de2afebad009f-6f460ade00000003_643159826_data.0.parq
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: reading another 1 footers
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    file:        hdfs://localhost:20500/test-warehouse/att/824de2afebad009f-6f460ade00000003_643159826_data.0.parq
    creator:     impala version 4.0.0-SNAPSHOT (build 7b691c5d4249f0cb1ced8ddf01033fbbe10511d9)

    file schema: schema
    --------------------------------------------------------------------------------
    id:              OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    bool_col:        OPTIONAL BOOLEAN R:0 D:1
    tinyint_col:     OPTIONAL INT32 L:INTEGER(8,true) R:0 D:1
    smallint_col:    OPTIONAL INT32 L:INTEGER(16,true) R:0 D:1
    int_col:         OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    bigint_col:      OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    float_col:       OPTIONAL FLOAT R:0 D:1
    double_col:      OPTIONAL DOUBLE R:0 D:1
    date_string_col: OPTIONAL BINARY R:0 D:1
    string_col:      OPTIONAL BINARY R:0 D:1
    timestamp_col:   OPTIONAL INT96 R:0 D:1
    year:            OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    month:           OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1

    row group 1:     RC:8 TS:754 OFFSET:4
    --------------------------------------------------------------------------------
    id:              INT32 SNAPPY DO:4 FPO:48 SZ:74/73/0.99 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 7, num_nulls: 0]
    bool_col:        BOOLEAN SNAPPY DO:0 FPO:141 SZ:26/24/0.92 VC:8 ENC:RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
    tinyint_col:     INT32 SNAPPY DO:220 FPO:243 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    smallint_col:    INT32 SNAPPY DO:343 FPO:366 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    int_col:         INT32 SNAPPY DO:467 FPO:490 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    bigint_col:      INT64 SNAPPY DO:586 FPO:617 SZ:59/55/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 10, num_nulls: 0]
    float_col:       FLOAT SNAPPY DO:724 FPO:747 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: -0.0, max: 1.1, num_nulls: 0]
    double_col:      DOUBLE SNAPPY DO:845 FPO:876 SZ:59/55/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: -0.0, max: 10.1, num_nulls: 0]
    date_string_col: BINARY SNAPPY DO:983 FPO:1028 SZ:74/88/1.19 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0x30312F30312F3039, max: 0x30342F30312F3039, num_nulls: 0]
    string_col:      BINARY SNAPPY DO:1143 FPO:1168 SZ:53/49/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0x30, max: 0x31, num_nulls: 0]
    timestamp_col:   INT96 SNAPPY DO:1261 FPO:1329 SZ:98/138/1.41 VC:8 ENC:RLE,RLE_DICTIONARY ST:[num_nulls: 0, min/max not defined]
    year:            INT32 SNAPPY DO:1451 FPO:1470 SZ:47/43/0.91 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 2009, max: 2009, num_nulls: 0]
    month:           INT32 SNAPPY DO:1563 FPO:1594 SZ:60/56/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 4, num_nulls: 0]

    Parquet-tools output for one of the lineitem files:

    $ hadoop jar ~/repos/parquet-mr/parquet-tools/target/parquet-tools-1.12.0-SNAPSHOT.jar meta /test-warehouse/li2/4b4d9143c575dd71-3f69d3cf00000001_1879643220_data.0.parq
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: reading another 1 footers
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    file:        hdfs://localhost:20500/test-warehouse/li2/4b4d9143c575dd71-3f69d3cf00000001_1879643220_data.0.parq
    creator:     impala version 4.0.0-SNAPSHOT (build 7b691c5d4249f0cb1ced8ddf01033fbbe10511d9)

    file schema: schema
    --------------------------------------------------------------------------------
    l_orderkey:      OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_partkey:       OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_suppkey:       OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_linenumber:    OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    l_quantity:      OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_extendedprice: OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_discount:      OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_tax:           OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_returnflag:    OPTIONAL BINARY R:0 D:1
    l_linestatus:    OPTIONAL BINARY R:0 D:1
    l_shipdate:      OPTIONAL BINARY R:0 D:1
    l_commitdate:    OPTIONAL BINARY R:0 D:1
    l_receiptdate:   OPTIONAL BINARY R:0 D:1
    l_shipinstruct:  OPTIONAL BINARY R:0 D:1
    l_shipmode:      OPTIONAL BINARY R:0 D:1
    l_comment:       OPTIONAL BINARY R:0 D:1

    row group 1:     RC:1724693 TS:58432195 OFFSET:4
    --------------------------------------------------------------------------------
    l_orderkey:      INT64 SNAPPY DO:4 FPO:159797 SZ:2839537/13147604/4.63 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 2142211, max: 6000000, num_nulls: 0]
    l_partkey:       INT64 SNAPPY DO:2839640 FPO:3028619 SZ:8179566/13852808/1.69 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 1, max: 200000, num_nulls: 0]
    l_suppkey:       INT64 SNAPPY DO:11019308 FPO:11059413 SZ:3063563/3103196/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 10000, num_nulls: 0]
    l_linenumber:    INT32 SNAPPY DO:14082964 FPO:14083007 SZ:412884/650550/1.58 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 7, num_nulls: 0]
    l_quantity:      FIXED_LEN_BYTE_ARRAY SNAPPY DO:14495934 FPO:14496204 SZ:1298038/1297963/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1.00, max: 50.00, num_nulls: 0]
    l_extendedprice: FIXED_LEN_BYTE_ARRAY SNAPPY DO:15794062 FPO:16003224 SZ:9087746/10429259/1.15 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 904.00, max: 104949.50, num_nulls: 0]
    l_discount:      FIXED_LEN_BYTE_ARRAY SNAPPY DO:24881912 FPO:24881976 SZ:866406/866338/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0.00, max: 0.10, num_nulls: 0]
    l_tax:           FIXED_LEN_BYTE_ARRAY SNAPPY DO:25748406 FPO:25748463 SZ:866399/866325/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0.00, max: 0.08, num_nulls: 0]
    l_returnflag:    BINARY SNAPPY DO:26614888 FPO:26614918 SZ:421113/421069/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x41, max: 0x52, num_nulls: 0]
    l_linestatus:    BINARY SNAPPY DO:27036081 FPO:27036106 SZ:262209/270332/1.03 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x46, max: 0x4F, num_nulls: 0]
    l_shipdate:      BINARY SNAPPY DO:27298370 FPO:27309301 SZ:2602937/2627148/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3032, max: 0x313939382D31322D3031, num_nulls: 0]
    l_commitdate:    BINARY SNAPPY DO:29901405 FPO:29912079 SZ:2602680/2626308/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3331, max: 0x313939382D31302D3331, num_nulls: 0]
    l_receiptdate:   BINARY SNAPPY DO:32504185 FPO:32515219 SZ:2603040/2627498/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3036, max: 0x313939382D31322D3330, num_nulls: 0]
    l_shipinstruct:  BINARY SNAPPY DO:35107326 FPO:35107408 SZ:434968/434917/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x434F4C4C45435420434F44, max: 0x54414B45204241434B2052455455524E, num_nulls: 0]
    l_shipmode:      BINARY SNAPPY DO:35542401 FPO:35542471 SZ:650639/650580/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x414952, max: 0x545255434B, num_nulls: 0]
    l_comment:       BINARY SNAPPY DO:36193124 FPO:36711343 SZ:22240470/52696671/2.37 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 0x20546972657369617320, max: 0x7A7A6C653F20626C697468656C792069726F6E69, num_nulls: 0]

    Change-Id: I90942022edcd5d96c720a1bde53879e50394660a
    Reviewed-on: http://gerrit.cloudera.org:8080/16893
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  9 +++--
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 37 +++++++++++++++------
 be/src/exec/parquet/parquet-column-chunk-reader.cc |  2 ++
 be/src/exec/parquet/parquet-column-readers.cc      | 18 +++++-----
 be/src/exec/parquet/parquet-common.h               |  7 ++++
 be/src/exec/parquet/parquet-metadata-utils.cc      |  1 +
 testdata/data/README                               |  7 ++++
 testdata/data/alltypes_tiny_rle_dictionary.parquet | Bin 0 -> 3646 bytes
 .../queries/QueryTest/parquet-rle-dictionary.test  | 16 +++++++++
 tests/query_test/test_scanners.py                  |  6 ++++
 10 files changed, 79 insertions(+), 24 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 6d9356d..d71898e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -955,9 +955,8 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
   if (col_metadata.__isset.encoding_stats) {
     // Condition #1 above
     for (const parquet::PageEncodingStats& enc_stat : col_metadata.encoding_stats) {
-      if (enc_stat.page_type == parquet::PageType::DATA_PAGE &&
-          enc_stat.encoding != parquet::Encoding::PLAIN_DICTIONARY &&
-          enc_stat.count > 0) {
+      if (enc_stat.page_type == parquet::PageType::DATA_PAGE
+          && !IsDictionaryEncoding(enc_stat.encoding) && enc_stat.count > 0) {
         return false;
       }
     }
@@ -966,10 +965,10 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
   bool has_dict_encoding = false;
   bool has_nondict_encoding = false;
   for (const parquet::Encoding::type& encoding : col_metadata.encodings) {
-    if (encoding == parquet::Encoding::PLAIN_DICTIONARY) has_dict_encoding = true;
+    if (IsDictionaryEncoding(encoding)) has_dict_encoding = true;
     // RLE and BIT_PACKED are used for repetition/definition levels
-    if (encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+    if (!IsDictionaryEncoding(encoding) &&
         encoding != parquet::Encoding::RLE &&
         encoding != parquet::Encoding::BIT_PACKED) {
       has_nondict_encoding = true;
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index f835fe4..49858e9 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -89,8 +89,26 @@ using namespace apache::thrift;
 static const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetTableWriter::$0() failed to allocate $1 bytes for $2.";

+DEFINE_bool_hidden(write_new_parquet_dictionary_encodings, false,
+    "(Experimental) Write parquet files with PLAIN/RLE_DICTIONARY encoding instead of "
+    "PLAIN_DICTIONARY as recommended by Parquet 2.0 standard");
+
 namespace impala {

+// Returns the parquet::Encoding enum value to use for plain-encoded dictionary pages.
+static parquet::Encoding::type DictPageEncoding() {
+  return FLAGS_write_new_parquet_dictionary_encodings ?
+      parquet::Encoding::PLAIN :
+      parquet::Encoding::PLAIN_DICTIONARY;
+}
+
+// Returns the parquet::Encoding enum value to use for dictionary-encoded data pages.
+static parquet::Encoding::type DataPageDictionaryEncoding() {
+  return FLAGS_write_new_parquet_dictionary_encodings ?
+      parquet::Encoding::RLE_DICTIONARY :
+      parquet::Encoding::PLAIN_DICTIONARY;
+}
+
 // Base class for column writers. This contains most of the logic except for
 // the type specific functions which are implemented in the subclasses.
 class HdfsParquetTableWriter::BaseColumnWriter {
@@ -329,8 +347,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
   // when pages are not big enough. This only happens when there are enough unique values
-  // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
-  // large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
+  // such that we switch from PLAIN_DICTIONARY/RLE_DICTIONARY to PLAIN encoding and then
+  // have very large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
   // TODO: Consider removing and only creating a single large page as necessary.
   int64_t page_size_;
@@ -422,11 +440,10 @@ class HdfsParquetTableWriter::ColumnWriter :
     valid_column_index_ = true;
     // Default to dictionary encoding. If the cardinality ends up being too high,
     // it will fall back to plain.
-    current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
-            parent_->parent_->mem_tracker()));
+    current_encoding_ = DataPageDictionaryEncoding();
+    next_page_encoding_ = DataPageDictionaryEncoding();
+    dict_encoder_.reset(new DictEncoder<T>(parent_->per_file_mem_pool_.get(),
+        plain_encoded_value_size_, parent_->parent_->mem_tracker()));
     dict_encoder_base_ = dict_encoder_.get();
     page_stats_.reset(
         new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
@@ -439,7 +456,7 @@ class HdfsParquetTableWriter::ColumnWriter :
  protected:
   virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
     T* val = CastValue(value);
-    if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+    if (IsDictionaryEncoding(current_encoding_)) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
           DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
         num_values_since_dict_size_check_ = 0;
@@ -753,7 +770,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // Write dictionary page header
     parquet::DictionaryPageHeader dict_header;
     dict_header.num_values = dict_encoder_base_->num_entries();
-    dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
+    dict_header.encoding = DictPageEncoding();
     ++dict_encoding_stats_[dict_header.encoding];

     parquet::PageHeader header;
@@ -869,7 +886,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN;

-  if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
+  if (IsDictionaryEncoding(current_encoding_)) WriteDictDataPage();

   parquet::PageHeader& header = current_page_->header;
   header.data_page_header.encoding = current_encoding_;
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc b/be/src/exec/parquet/parquet-column-chunk-reader.cc
index 0dec59f..aa4bacf 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.cc
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -130,6 +130,8 @@ Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
       return Status("Dictionary page does not have dictionary header set.");
     }
   }
+  // Check that the dictionary page is PLAIN encoded. PLAIN_DICTIONARY in the context of
+  // a dictionary page means the same thing.
   if (dict_header != nullptr && dict_header->encoding != Encoding::PLAIN &&
       dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index b3030d7..a993600 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -318,14 +318,14 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
   DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
       << "Bool has specialized impl";
   page_encoding_ = col_chunk_reader_.encoding();
-  if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
+  if (!IsDictionaryEncoding(page_encoding_)
       && page_encoding_ != parquet::Encoding::PLAIN) {
     return GetUnsupportedDecodingError();
   }

-  // If slot_desc_ is NULL, we don't need so decode any values so dict_decoder_ does
+  // If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
   // not need to be initialized.
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+  if (IsDictionaryEncoding(page_encoding_) && slot_desc_ != nullptr) {
     if (!dict_decoder_init_) {
       return Status("File corrupt. Missing dictionary page.");
     }
@@ -363,7 +363,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
   if (bool_decoder_) {
     return bool_decoder_->SkipValues(num_values);
   }
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     return dict_decoder_.SkipValues(num_values);
   } else {
     DCHECK_EQ(page_encoding_, Encoding::PLAIN);
@@ -389,7 +389,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
   if (MATERIALIZED) {
     if (def_level_ >= max_def_level()) {
       bool continue_execution;
-      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+      if (IsDictionaryEncoding(page_encoding_)) {
         continue_execution = NeedsConversionInline() ?
             ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
             ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
@@ -643,7 +643,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
     int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
     int* RESTRICT num_values) RESTRICT {
   // Dispatch to the correct templated implementation of MaterializeValueBatch().
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     if (NeedsConversionInline()) {
       return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
           max_values, tuple_size, tuple_mem, num_values);
@@ -753,7 +753,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
     uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
     InternalType* RESTRICT val) RESTRICT {
   DCHECK_EQ(page_encoding_, ENCODING);
-  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(ENCODING)) {
     if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
       SetDictDecodeError();
       return false;
@@ -803,7 +803,7 @@ bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValue(
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
     int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     return DecodeValues<Encoding::PLAIN_DICTIONARY>(stride, count, out_vals);
   } else {
     DCHECK_EQ(page_encoding_, Encoding::PLAIN);
@@ -815,7 +815,7 @@ template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIAL
 template <Encoding::type ENCODING>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
     int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
       SetDictDecodeError();
       return false;
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index a295f58..7a98521 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -52,6 +52,13 @@ inline bool operator<(const RowRange& lhs, const RowRange& rhs) {
   return std::tie(lhs.first, lhs.last) < std::tie(rhs.first, rhs.last);
 }

+// Return true if this is an encoding enum value that indicates that a data page is
+// dictionary encoded.
+inline bool IsDictionaryEncoding(parquet::Encoding::type encoding) {
+  return encoding == parquet::Encoding::PLAIN_DICTIONARY
+      || encoding == parquet::Encoding::RLE_DICTIONARY;
+}
+
 /// Return the Impala compression type for the given Parquet codec. The caller must
 /// validate that the codec is a supported one, otherwise this will DCHECK.
 THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 2caa949..c999e60 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -94,6 +94,7 @@ static bool IsEncodingSupported(parquet::Encoding::type e) {
     case parquet::Encoding::PLAIN_DICTIONARY:
     case parquet::Encoding::BIT_PACKED:
     case parquet::Encoding::RLE:
+    case parquet::Encoding::RLE_DICTIONARY:
       return true;
     default:
       return false;
diff --git a/testdata/data/README b/testdata/data/README
index 79bf4e8..0121cef 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -612,3 +612,10 @@ We generated data by spark-shell, version is 2.4.x, and table data is in
 testdata/data/iceberg_test/hadoop_catalog/iceberg_resolution_test, this table
 are generated with HadoopCatalog and Parquet fileformat.
 We use this table to test complex types for field id resolving.
+
+alltypes_tiny_rle_dictionary.parquet:
+Tiny file using the RLE_DICTIONARY encoding.
+Started impala with -write_new_parquet_dictionary_encodings=true
+  set max_fs_writers=1;
+  create table att stored as parquet as
+  select * from functional_parquet.alltypestiny;
diff --git a/testdata/data/alltypes_tiny_rle_dictionary.parquet b/testdata/data/alltypes_tiny_rle_dictionary.parquet
new file mode 100644
index 0000000..17065b2
Binary files /dev/null and b/testdata/data/alltypes_tiny_rle_dictionary.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test
new file mode 100644
index 0000000..8a64361
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Verify that all columns can be read correctly.
+select * from alltypes_tiny_rle_dictionary
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 5a865cc..e333dad 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -856,6 +856,12 @@ class TestParquet(ImpalaTestSuite):
       "select * from {0}.{1}".format(unique_database, TABLE_NAME))
     assert(len(result.data) == 33)

+  def test_rle_dictionary_encoding(self, vector, unique_database):
+    """IMPALA-6434: Add support to decode RLE_DICTIONARY encoded pages."""
+    TABLE_NAME = "alltypes_tiny_rle_dictionary"
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
+    self.run_test_case("QueryTest/parquet-rle-dictionary", vector, unique_database)
+
   def test_type_widening(self, vector, unique_database):
     """IMPALA-6373: Test that Impala can read parquet file with column types smaller than
     the schema with larger types"""
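For context on the commit message's claim that RLE_DICTIONARY is byte-identical to PLAIN_DICTIONARY: under either enum value, a dictionary-encoded data page is a 1-byte index bit width followed by RLE/bit-packed hybrid runs of dictionary indices, as defined by the Parquet encodings spec. The sketch below is an illustrative Python decoder of that hybrid run format, not the code touched by this patch (Impala's actual decoding is done by the C++ dictionary/RLE decoders in the backend); it assumes an uncompressed index buffer with no definition levels.

```python
def decode_rle_dictionary_indices(page, num_values):
    """Decode dictionary indices from an RLE_DICTIONARY/PLAIN_DICTIONARY data page.

    The buffer starts with a 1-byte bit width, followed by RLE/bit-packed
    hybrid runs. Each run header is a ULEB128 varint: if the low bit is 0 it
    is an RLE run (one fixed-width value repeated header >> 1 times),
    otherwise a bit-packed run of (header >> 1) groups of 8 values each.
    """
    bit_width = page[0]
    pos = 1
    out = []

    def read_varint():
        nonlocal pos
        result = shift = 0
        while True:
            b = page[pos]
            pos += 1
            result |= (b & 0x7F) << shift
            if not (b & 0x80):
                return result
            shift += 7

    while len(out) < num_values:
        header = read_varint()
        if header & 1:  # bit-packed run: (header >> 1) groups of 8 values
            num_groups = header >> 1
            nbytes = num_groups * bit_width  # 8 values * bit_width bits / 8
            bits = int.from_bytes(page[pos:pos + nbytes], 'little')
            pos += nbytes
            for i in range(num_groups * 8):
                out.append((bits >> (i * bit_width)) & ((1 << bit_width) - 1))
        else:  # RLE run: one value, ceil(bit_width / 8) bytes wide, repeated
            run_len = header >> 1
            width_bytes = (bit_width + 7) // 8
            value = int.from_bytes(page[pos:pos + width_bytes], 'little')
            pos += width_bytes
            out.extend([value] * run_len)
    return out[:num_values]

# Example buffer (hypothetical, hand-assembled): bit width 3, an RLE run of
# five copies of index 4, then one bit-packed group holding indices 0..7
# (the packed bytes are the canonical example from the Parquet spec).
page = bytes([3,             # bit width
              5 << 1, 4,     # RLE run: 5 copies of value 4
              (1 << 1) | 1,  # bit-packed run: 1 group of 8 values
              0b10001000, 0b11000110, 0b11111010])  # 0..7 packed at 3 bits
print(decode_rle_dictionary_indices(page, 13))
# [4, 4, 4, 4, 4, 0, 1, 2, 3, 4, 5, 6, 7]
```

The writer-side switch in this patch only changes which enum values are stamped into the page headers (PLAIN for the dictionary page, RLE_DICTIONARY for data pages); the run format above is unchanged, which is why the reader fix largely reduces to the `IsDictionaryEncoding()` predicate accepting both enum values.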