This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1fd5e4279c75a7cb3e51e737d9ca7c36435412dc Author: guojingfeng <guojingf...@tencent.com> AuthorDate: Thu Nov 5 03:30:33 2020 +0000 IMPALA-10310: Fix couldn't skip rows in parquet file on NextRowGroup In practice we recommend that hdfs block size should align with parquet row group size.But in fact some compute engine like spark, default parquet row group size is 128MB, and if ETL user doesn't change the default property spark will generate row groups that smaller than hdfs block size. The result is a single hdfs block may contain multiple parquet row groups. In planner stage, length of impala generated scan range may be bigger than row group size. thus a single scan range contains multiple row group. In current parquet scanner when move to next row group, some of internal stat in parquet column readers need to reset. eg: num_buffered_values_, column chunk metadata, reset internal stat of column chunk readers. But current_row_range_ offset is not reset currently, this will cause errors "Couldn't skip rows in file hdfs://xxx" as IMPALA-10310 points out. This patch simply reset current_row_range_ to 0 when moving into next row group in parquet column readers. Fix the bug IMPALA-10310. Testing: * Add e2e test for parquet multi blocks per file and multi pages per block * Ran all core tests offline. * Manually tested all cases encountered in my production environment. Change-Id: I964695cd53f5d5fdb6485a85cd82e7a72ca6092c Reviewed-on: http://gerrit.cloudera.org:8080/16697 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exec/parquet/parquet-column-readers.cc | 1 + testdata/data/README | 15 +++++++++++++++ testdata/data/customer_multiblock_page_index.parquet | Bin 0 -> 451607 bytes .../queries/QueryTest/parquet-page-index.test | 16 ++++++++++++++++ tests/query_test/test_parquet_stats.py | 2 ++ 5 files changed, 34 insertions(+) diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc index b00ba51..b3030d7 100644 --- a/be/src/exec/parquet/parquet-column-readers.cc +++ b/be/src/exec/parquet/parquet-column-readers.cc @@ -1225,6 +1225,7 @@ void BaseScalarColumnReader::ResetPageFiltering() { candidate_page_idx_ = -1; current_row_ = -1; levels_readahead_ = false; + current_row_range_ = 0; } Status BaseScalarColumnReader::StartPageFiltering() { diff --git a/testdata/data/README b/testdata/data/README index 3954d4b..5d24b22 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -592,3 +592,18 @@ int64_t r = random(); if (r % 2 + r % 3 + r % 5 == 0) return true; Also modified HdfsParquetTableWriter::BaseColumnWriter::Flush to randomly invalidate the offset index: if (r ... ) location.offset = -1; + +customer_multiblock_page_index.parquet +Parquet file that contains multiple blocks in a single file Needed to test IMPALA-10310. +In order to generate this file, execute the following instruments: +// use 1.11.0 to generate page index +1. export HIVE_AUX_JARS_PATH=/path/parquet-hadoop-1.11.0.jar +// in hive shell +2. SET parquet.block.size=8192; // use little block size +3. SET parquet.page.row.count.limit=10; // little page row count generate multi pages +4. CREATE TABLE customer_multiblock_page_index_6 + STORED AS PARQUET + TBLPROPERTIES('parquet.compression'='SNAPPY') + AS SELECT * FROM tpcds.customer + WHERE c_current_cdemo_sk IS NOT NULL ORDER BY c_current_cdemo_sk LIMIT 2000; +generated file will contains multi blocks, multi pages per block. diff --git a/testdata/data/customer_multiblock_page_index.parquet b/testdata/data/customer_multiblock_page_index.parquet new file mode 100644 index 0000000..21fa9a5 Binary files /dev/null and b/testdata/data/customer_multiblock_page_index.parquet differ diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test index 7c44e21..d3cbd9f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-page-index.test @@ -300,3 +300,19 @@ BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumStatsFilteredPages): 0 ==== +---- QUERY +# Query table with multi blocks in a single file +select c_birth_country,count(distinct c_customer_id) from customer_multiblock_page_index +where c_current_cdemo_sk < 100 group by c_birth_country; +---- RESULTS +'SLOVAKIA',1 +'BRUNEI DARUSSALAM',1 +'BURKINA FASO',1 +'SIERRA LEONE',1 +'PORTUGAL',1 +---- TYPES +STRING, BIGINT +---- RUNTIME_PROFILE +aggregation(SUM, NumPages): 30 +aggregation(SUM, NumStatsFilteredPages): 27 +==== diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py index 5087fa2..319dd1a 100644 --- a/tests/query_test/test_parquet_stats.py +++ b/tests/query_test/test_parquet_stats.py @@ -88,6 +88,8 @@ class TestParquetStats(ImpalaTestSuite): create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages_plain') create_table_from_parquet(self.client, unique_database, 'alltypes_empty_pages') create_table_from_parquet(self.client, unique_database, 'alltypes_invalid_pages') + create_table_from_parquet(self.client, unique_database, + 'customer_multiblock_page_index') for batch_size in [1]: new_vector.get_value('exec_option')['batch_size'] = batch_size