This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d5fe2771fb88fc1e9fc4ff84c47c69f4ce1e142
Author: Tim Armstrong <tarmstr...@cloudera.com>
AuthorDate: Mon Dec 21 20:05:12 2020 -0800

    IMPALA-6434: Add support to decode RLE_DICTIONARY encoded pages
    
    The encoding is identical to the already-supported PLAIN_DICTIONARY
    encoding but the PLAIN enum value is used for the dictionary pages
    and the RLE_DICTIONARY enum value is used for the data pages.
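    On the read side this boils down to treating both enum values as "dictionary
    encoded". A minimal standalone sketch of the predicate this patch adds (the
    Encoding struct here is a stand-in for the thrift-generated parquet::Encoding
    enum; the numeric values come from the Parquet format specification):

```cpp
#include <cassert>

// Stand-in for the thrift-generated parquet::Encoding enum; values are taken
// from the Parquet format specification.
namespace parquet {
struct Encoding {
  enum type {
    PLAIN = 0, PLAIN_DICTIONARY = 2, RLE = 3, BIT_PACKED = 4, RLE_DICTIONARY = 8
  };
};
}

// A data page is dictionary encoded under either the legacy (PLAIN_DICTIONARY)
// or the newer Parquet 2.0 (RLE_DICTIONARY) enum value.
inline bool IsDictionaryEncoding(parquet::Encoding::type encoding) {
  return encoding == parquet::Encoding::PLAIN_DICTIONARY
      || encoding == parquet::Encoding::RLE_DICTIONARY;
}
```

    The rest of the change is mostly a mechanical replacement of direct
    comparisons against PLAIN_DICTIONARY with calls to this predicate.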
    
    A hidden option -write_new_parquet_dictionary_encodings is added to
    also enable writing the new encodings, for test purposes only.
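    On the write path the flag only selects which enum values are recorded in the
    page headers. A sketch of the two selectors added to
    hdfs-parquet-table-writer.cc, with a plain bool standing in for the
    gflags-backed FLAGS_write_new_parquet_dictionary_encodings and a stand-in for
    the thrift Encoding enum (values from the Parquet format spec):

```cpp
#include <cassert>

// Stand-in for the thrift-generated parquet::Encoding enum.
namespace parquet {
struct Encoding {
  enum type { PLAIN = 0, PLAIN_DICTIONARY = 2, RLE_DICTIONARY = 8 };
};
}

// Stand-in for the hidden gflags option; defaults to the legacy behaviour.
bool FLAGS_write_new_parquet_dictionary_encodings = false;

// Encoding recorded in the dictionary page header. The dictionary page bytes
// are plain encoded either way; only the enum value written differs.
parquet::Encoding::type DictPageEncoding() {
  return FLAGS_write_new_parquet_dictionary_encodings ?
      parquet::Encoding::PLAIN : parquet::Encoding::PLAIN_DICTIONARY;
}

// Encoding recorded in dictionary-encoded data page headers.
parquet::Encoding::type DataPageDictionaryEncoding() {
  return FLAGS_write_new_parquet_dictionary_encodings ?
      parquet::Encoding::RLE_DICTIONARY : parquet::Encoding::PLAIN_DICTIONARY;
}
```

    With the flag off the writer keeps emitting PLAIN_DICTIONARY everywhere, so
    existing readers are unaffected.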
    
    Testing:
    * Added an automated test using a pregenerated test file.
    * Ran core tests.
    * Manually tested by writing out TPC-H lineitem with the new encoding
      and reading back in Impala and Hive.
    
    Parquet-tools output for the generated test file:
    $ hadoop jar ~/repos/parquet-mr/parquet-tools/target/parquet-tools-1.12.0-SNAPSHOT.jar meta /test-warehouse/att/824de2afebad009f-6f460ade00000003_643159826_data.0.parq
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: reading another 1 footers
    20/12/21 20:28:36 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    file:            hdfs://localhost:20500/test-warehouse/att/824de2afebad009f-6f460ade00000003_643159826_data.0.parq
    creator:         impala version 4.0.0-SNAPSHOT (build 7b691c5d4249f0cb1ced8ddf01033fbbe10511d9)
    
    file schema:     schema
    
--------------------------------------------------------------------------------
    id:              OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    bool_col:        OPTIONAL BOOLEAN R:0 D:1
    tinyint_col:     OPTIONAL INT32 L:INTEGER(8,true) R:0 D:1
    smallint_col:    OPTIONAL INT32 L:INTEGER(16,true) R:0 D:1
    int_col:         OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    bigint_col:      OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    float_col:       OPTIONAL FLOAT R:0 D:1
    double_col:      OPTIONAL DOUBLE R:0 D:1
    date_string_col: OPTIONAL BINARY R:0 D:1
    string_col:      OPTIONAL BINARY R:0 D:1
    timestamp_col:   OPTIONAL INT96 R:0 D:1
    year:            OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    month:           OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    
    row group 1:     RC:8 TS:754 OFFSET:4
    
--------------------------------------------------------------------------------
    id:               INT32 SNAPPY DO:4 FPO:48 SZ:74/73/0.99 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 7, num_nulls: 0]
    bool_col:         BOOLEAN SNAPPY DO:0 FPO:141 SZ:26/24/0.92 VC:8 ENC:RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
    tinyint_col:      INT32 SNAPPY DO:220 FPO:243 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    smallint_col:     INT32 SNAPPY DO:343 FPO:366 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    int_col:          INT32 SNAPPY DO:467 FPO:490 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 1, num_nulls: 0]
    bigint_col:       INT64 SNAPPY DO:586 FPO:617 SZ:59/55/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0, max: 10, num_nulls: 0]
    float_col:        FLOAT SNAPPY DO:724 FPO:747 SZ:51/47/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: -0.0, max: 1.1, num_nulls: 0]
    double_col:       DOUBLE SNAPPY DO:845 FPO:876 SZ:59/55/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: -0.0, max: 10.1, num_nulls: 0]
    date_string_col:  BINARY SNAPPY DO:983 FPO:1028 SZ:74/88/1.19 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0x30312F30312F3039, max: 0x30342F30312F3039, num_nulls: 0]
    string_col:       BINARY SNAPPY DO:1143 FPO:1168 SZ:53/49/0.92 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 0x30, max: 0x31, num_nulls: 0]
    timestamp_col:    INT96 SNAPPY DO:1261 FPO:1329 SZ:98/138/1.41 VC:8 ENC:RLE,RLE_DICTIONARY ST:[num_nulls: 0, min/max not defined]
    year:             INT32 SNAPPY DO:1451 FPO:1470 SZ:47/43/0.91 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 2009, max: 2009, num_nulls: 0]
    month:            INT32 SNAPPY DO:1563 FPO:1594 SZ:60/56/0.93 VC:8 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 4, num_nulls: 0]
    
    Parquet-tools output for one of the lineitem files:
    $ hadoop jar ~/repos/parquet-mr/parquet-tools/target/parquet-tools-1.12.0-SNAPSHOT.jar meta /test-warehouse/li2/4b4d9143c575dd71-3f69d3cf00000001_1879643220_data.0.parq
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: reading another 1 footers
    20/12/22 09:39:56 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
    file:            hdfs://localhost:20500/test-warehouse/li2/4b4d9143c575dd71-3f69d3cf00000001_1879643220_data.0.parq
    creator:         impala version 4.0.0-SNAPSHOT (build 7b691c5d4249f0cb1ced8ddf01033fbbe10511d9)
    
    file schema:     schema
    
--------------------------------------------------------------------------------
    l_orderkey:      OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_partkey:       OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_suppkey:       OPTIONAL INT64 L:INTEGER(64,true) R:0 D:1
    l_linenumber:    OPTIONAL INT32 L:INTEGER(32,true) R:0 D:1
    l_quantity:      OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_extendedprice: OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_discount:      OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_tax:           OPTIONAL FIXED_LEN_BYTE_ARRAY L:DECIMAL(12,2) R:0 D:1
    l_returnflag:    OPTIONAL BINARY R:0 D:1
    l_linestatus:    OPTIONAL BINARY R:0 D:1
    l_shipdate:      OPTIONAL BINARY R:0 D:1
    l_commitdate:    OPTIONAL BINARY R:0 D:1
    l_receiptdate:   OPTIONAL BINARY R:0 D:1
    l_shipinstruct:  OPTIONAL BINARY R:0 D:1
    l_shipmode:      OPTIONAL BINARY R:0 D:1
    l_comment:       OPTIONAL BINARY R:0 D:1
    
    row group 1:     RC:1724693 TS:58432195 OFFSET:4
    
--------------------------------------------------------------------------------
    l_orderkey:       INT64 SNAPPY DO:4 FPO:159797 SZ:2839537/13147604/4.63 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 2142211, max: 6000000, num_nulls: 0]
    l_partkey:        INT64 SNAPPY DO:2839640 FPO:3028619 SZ:8179566/13852808/1.69 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 1, max: 200000, num_nulls: 0]
    l_suppkey:        INT64 SNAPPY DO:11019308 FPO:11059413 SZ:3063563/3103196/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 10000, num_nulls: 0]
    l_linenumber:     INT32 SNAPPY DO:14082964 FPO:14083007 SZ:412884/650550/1.58 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1, max: 7, num_nulls: 0]
    l_quantity:       FIXED_LEN_BYTE_ARRAY SNAPPY DO:14495934 FPO:14496204 SZ:1298038/1297963/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 1.00, max: 50.00, num_nulls: 0]
    l_extendedprice:  FIXED_LEN_BYTE_ARRAY SNAPPY DO:15794062 FPO:16003224 SZ:9087746/10429259/1.15 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 904.00, max: 104949.50, num_nulls: 0]
    l_discount:       FIXED_LEN_BYTE_ARRAY SNAPPY DO:24881912 FPO:24881976 SZ:866406/866338/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0.00, max: 0.10, num_nulls: 0]
    l_tax:            FIXED_LEN_BYTE_ARRAY SNAPPY DO:25748406 FPO:25748463 SZ:866399/866325/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0.00, max: 0.08, num_nulls: 0]
    l_returnflag:     BINARY SNAPPY DO:26614888 FPO:26614918 SZ:421113/421069/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x41, max: 0x52, num_nulls: 0]
    l_linestatus:     BINARY SNAPPY DO:27036081 FPO:27036106 SZ:262209/270332/1.03 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x46, max: 0x4F, num_nulls: 0]
    l_shipdate:       BINARY SNAPPY DO:27298370 FPO:27309301 SZ:2602937/2627148/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3032, max: 0x313939382D31322D3031, num_nulls: 0]
    l_commitdate:     BINARY SNAPPY DO:29901405 FPO:29912079 SZ:2602680/2626308/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3331, max: 0x313939382D31302D3331, num_nulls: 0]
    l_receiptdate:    BINARY SNAPPY DO:32504185 FPO:32515219 SZ:2603040/2627498/1.01 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x313939322D30312D3036, max: 0x313939382D31322D3330, num_nulls: 0]
    l_shipinstruct:   BINARY SNAPPY DO:35107326 FPO:35107408 SZ:434968/434917/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x434F4C4C45435420434F44, max: 0x54414B45204241434B2052455455524E, num_nulls: 0]
    l_shipmode:       BINARY SNAPPY DO:35542401 FPO:35542471 SZ:650639/650580/1.00 VC:1724693 ENC:RLE,RLE_DICTIONARY ST:[min: 0x414952, max: 0x545255434B, num_nulls: 0]
    l_comment:        BINARY SNAPPY DO:36193124 FPO:36711343 SZ:22240470/52696671/2.37 VC:1724693 ENC:RLE,RLE_DICTIONARY,PLAIN ST:[min: 0x20546972657369617320, max: 0x7A7A6C653F20626C697468656C792069726F6E69, num_nulls: 0]
    
    Change-Id: I90942022edcd5d96c720a1bde53879e50394660a
    Reviewed-on: http://gerrit.cloudera.org:8080/16893
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   9 +++--
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  37 +++++++++++++++------
 be/src/exec/parquet/parquet-column-chunk-reader.cc |   2 ++
 be/src/exec/parquet/parquet-column-readers.cc      |  18 +++++-----
 be/src/exec/parquet/parquet-common.h               |   7 ++++
 be/src/exec/parquet/parquet-metadata-utils.cc      |   1 +
 testdata/data/README                               |   7 ++++
 testdata/data/alltypes_tiny_rle_dictionary.parquet | Bin 0 -> 3646 bytes
 .../queries/QueryTest/parquet-rle-dictionary.test  |  16 +++++++++
 tests/query_test/test_scanners.py                  |   6 ++++
 10 files changed, 79 insertions(+), 24 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 6d9356d..d71898e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -955,9 +955,8 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
   if (col_metadata.__isset.encoding_stats) {
     // Condition #1 above
     for (const parquet::PageEncodingStats& enc_stat : col_metadata.encoding_stats) {
-      if (enc_stat.page_type == parquet::PageType::DATA_PAGE &&
-          enc_stat.encoding != parquet::Encoding::PLAIN_DICTIONARY &&
-          enc_stat.count > 0) {
+      if (enc_stat.page_type == parquet::PageType::DATA_PAGE
+          && !IsDictionaryEncoding(enc_stat.encoding) && enc_stat.count > 0) {
         return false;
       }
     }
@@ -966,10 +965,10 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
     bool has_dict_encoding = false;
     bool has_nondict_encoding = false;
     for (const parquet::Encoding::type& encoding : col_metadata.encodings) {
-      if (encoding == parquet::Encoding::PLAIN_DICTIONARY) has_dict_encoding = true;
+      if (IsDictionaryEncoding(encoding)) has_dict_encoding = true;
 
       // RLE and BIT_PACKED are used for repetition/definition levels
-      if (encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+      if (!IsDictionaryEncoding(encoding) &&
           encoding != parquet::Encoding::RLE &&
           encoding != parquet::Encoding::BIT_PACKED) {
         has_nondict_encoding = true;
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index f835fe4..49858e9 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -89,8 +89,26 @@ using namespace apache::thrift;
 static const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetTableWriter::$0() failed to allocate $1 bytes for $2.";
 
+DEFINE_bool_hidden(write_new_parquet_dictionary_encodings, false,
+    "(Experimental) Write parquet files with PLAIN/RLE_DICTIONARY encoding instead of "
+    "PLAIN_DICTIONARY as recommended by Parquet 2.0 standard");
+
 namespace impala {
 
+// Returns the parquet::Encoding enum value to use for plain-encoded dictionary pages.
+static parquet::Encoding::type DictPageEncoding() {
+  return FLAGS_write_new_parquet_dictionary_encodings ?
+          parquet::Encoding::PLAIN :
+          parquet::Encoding::PLAIN_DICTIONARY;
+}
+
+// Returns the parquet::Encoding enum value to use for dictionary-encoded data pages.
+static parquet::Encoding::type DataPageDictionaryEncoding() {
+  return FLAGS_write_new_parquet_dictionary_encodings ?
+          parquet::Encoding::RLE_DICTIONARY:
+          parquet::Encoding::PLAIN_DICTIONARY;
+}
+
 // Base class for column writers. This contains most of the logic except for
 // the type specific functions which are implemented in the subclasses.
 class HdfsParquetTableWriter::BaseColumnWriter {
@@ -329,8 +347,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
 
   // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
   // when pages are not big enough. This only happens when there are enough unique values
-  // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very
-  // large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
+  // such that we switch from PLAIN_DICTIONARY/RLE_DICTIONARY to PLAIN encoding and then
+  // have very large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
   // TODO: Consider removing and only creating a single large page as necessary.
   int64_t page_size_;
 
@@ -422,11 +440,10 @@ class HdfsParquetTableWriter::ColumnWriter :
     valid_column_index_ = true;
     // Default to dictionary encoding.  If the cardinality ends up being too high,
     // it will fall back to plain.
-    current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-    dict_encoder_.reset(
-        new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_,
-            parent_->parent_->mem_tracker()));
+    current_encoding_ = DataPageDictionaryEncoding();
+    next_page_encoding_ = DataPageDictionaryEncoding();
+    dict_encoder_.reset(new DictEncoder<T>(parent_->per_file_mem_pool_.get(),
+        plain_encoded_value_size_, parent_->parent_->mem_tracker()));
     dict_encoder_base_ = dict_encoder_.get();
     page_stats_.reset(
         new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
@@ -439,7 +456,7 @@ class HdfsParquetTableWriter::ColumnWriter :
  protected:
   virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
     T* val = CastValue(value);
-    if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
+    if (IsDictionaryEncoding(current_encoding_)) {
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
         num_values_since_dict_size_check_ = 0;
@@ -753,7 +770,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     // Write dictionary page header
     parquet::DictionaryPageHeader dict_header;
     dict_header.num_values = dict_encoder_base_->num_entries();
-    dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY;
+    dict_header.encoding = DictPageEncoding();
     ++dict_encoding_stats_[dict_header.encoding];
 
     parquet::PageHeader header;
@@ -869,7 +886,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // around a parquet MR bug (see IMPALA-759 for more details).
   if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN;
 
-  if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage();
+  if (IsDictionaryEncoding(current_encoding_)) WriteDictDataPage();
 
   parquet::PageHeader& header = current_page_->header;
   header.data_page_header.encoding = current_encoding_;
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc b/be/src/exec/parquet/parquet-column-chunk-reader.cc
index 0dec59f..aa4bacf 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.cc
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -130,6 +130,8 @@ Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
       return Status("Dictionary page does not have dictionary header set.");
     }
   }
+  // Check that the dictionary page is PLAIN encoded. PLAIN_DICTIONARY in the context of
+  // a dictionary page means the same thing.
   if (dict_header != nullptr &&
       dict_header->encoding != Encoding::PLAIN &&
       dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index b3030d7..a993600 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -318,14 +318,14 @@ Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPag
   DCHECK(slot_desc_ == nullptr || slot_desc_->type().type != TYPE_BOOLEAN)
       << "Bool has specialized impl";
   page_encoding_ = col_chunk_reader_.encoding();
-  if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
+  if (!IsDictionaryEncoding(page_encoding_)
       && page_encoding_ != parquet::Encoding::PLAIN) {
     return GetUnsupportedDecodingError();
   }
 
-  // If slot_desc_ is NULL, we don't need so decode any values so dict_decoder_ does
+  // If slot_desc_ is NULL, we don't need to decode any values so dict_decoder_ does
   // not need to be initialized.
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
+  if (IsDictionaryEncoding(page_encoding_) && slot_desc_ != nullptr) {
     if (!dict_decoder_init_) {
       return Status("File corrupt. Missing dictionary page.");
     }
@@ -363,7 +363,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
   if (bool_decoder_) {
     return bool_decoder_->SkipValues(num_values);
   }
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     return dict_decoder_.SkipValues(num_values);
   } else {
     DCHECK_EQ(page_encoding_, Encoding::PLAIN);
@@ -389,7 +389,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
   if (MATERIALIZED) {
     if (def_level_ >= max_def_level()) {
       bool continue_execution;
-      if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+      if (IsDictionaryEncoding(page_encoding_)) {
         continue_execution = NeedsConversionInline() ?
             ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
             ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
@@ -643,7 +643,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
     int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
     int* RESTRICT num_values) RESTRICT {
   // Dispatch to the correct templated implementation of MaterializeValueBatch().
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     if (NeedsConversionInline()) {
       return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY, true>(
           max_values, tuple_size, tuple_mem, num_values);
@@ -753,7 +753,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
     uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
     InternalType* RESTRICT val) RESTRICT {
   DCHECK_EQ(page_encoding_, ENCODING);
-  if (ENCODING == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(ENCODING)) {
     if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
       SetDictDecodeError();
       return false;
@@ -803,7 +803,7 @@ bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValue(
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
     int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     return DecodeValues<Encoding::PLAIN_DICTIONARY>(stride, count, out_vals);
   } else {
     DCHECK_EQ(page_encoding_, Encoding::PLAIN);
@@ -815,7 +815,7 @@ template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIAL
 template <Encoding::type ENCODING>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
     int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
-  if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
+  if (IsDictionaryEncoding(page_encoding_)) {
     if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
       SetDictDecodeError();
       return false;
diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index a295f58..7a98521 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -52,6 +52,13 @@ inline bool operator<(const RowRange& lhs, const RowRange& rhs) {
   return std::tie(lhs.first, lhs.last) < std::tie(rhs.first, rhs.last);
 }
 
+// Return true if this is an encoding enum value that indicates that a data page is
+// dictionary encoded.
+inline bool IsDictionaryEncoding(parquet::Encoding::type encoding) {
+  return encoding == parquet::Encoding::PLAIN_DICTIONARY
+      || encoding == parquet::Encoding::RLE_DICTIONARY;
+}
+
 /// Return the Impala compression type for the given Parquet codec. The caller must
 /// validate that the codec is a supported one, otherwise this will DCHECK.
 THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 2caa949..c999e60 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -94,6 +94,7 @@ static bool IsEncodingSupported(parquet::Encoding::type e) {
     case parquet::Encoding::PLAIN_DICTIONARY:
     case parquet::Encoding::BIT_PACKED:
     case parquet::Encoding::RLE:
+    case parquet::Encoding::RLE_DICTIONARY:
       return true;
     default:
       return false;
diff --git a/testdata/data/README b/testdata/data/README
index 79bf4e8..0121cef 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -612,3 +612,10 @@ We generated data by spark-shell, version is 2.4.x, and table data is in
 testdata/data/iceberg_test/hadoop_catalog/iceberg_resolution_test, this table
 are generated with HadoopCatalog and Parquet fileformat. We use this table to
 test complex types for field id resolving.
+
+alltypes_tiny_rle_dictionary.parquet:
+Tiny file using the RLE_DICTIONARY encoding.
+Started impala with -write_new_parquet_dictionary_encodings=true
+  set max_fs_writers=1;
+  create table att stored as parquet as
+  select * from functional_parquet.alltypestiny;
diff --git a/testdata/data/alltypes_tiny_rle_dictionary.parquet b/testdata/data/alltypes_tiny_rle_dictionary.parquet
new file mode 100644
index 0000000..17065b2
Binary files /dev/null and b/testdata/data/alltypes_tiny_rle_dictionary.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test
new file mode 100644
index 0000000..8a64361
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-rle-dictionary.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# Verify that all columns can be read correctly.
+select * from alltypes_tiny_rle_dictionary
+---- TYPES
INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 5a865cc..e333dad 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -856,6 +856,12 @@ class TestParquet(ImpalaTestSuite):
         "select * from {0}.{1}".format(unique_database, TABLE_NAME))
     assert(len(result.data) == 33)
 
+  def test_rle_dictionary_encoding(self, vector, unique_database):
+    """IMPALA-6434: Add support to decode RLE_DICTIONARY encoded pages."""
+    TABLE_NAME = "alltypes_tiny_rle_dictionary"
+    create_table_from_parquet(self.client, unique_database, TABLE_NAME)
+    self.run_test_case("QueryTest/parquet-rle-dictionary", vector, unique_database)
+
   def test_type_widening(self, vector, unique_database):
     """IMPALA-6373: Test that Impala can read parquet file with column types smaller than
        the schema with larger types"""
