This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 752cec9e19 [Fix](multi-catalog) Fix not single slot filter conjuncts 
with dict filter issue. (#22052)
752cec9e19 is described below

commit 752cec9e192623f3871a5c9028d2202bf5d12ee3
Author: Qi Chen <[email protected]>
AuthorDate: Mon Jul 24 22:31:18 2023 +0800

    [Fix](multi-catalog) Fix not single slot filter conjuncts with dict filter 
issue. (#22052)
    
    ### Issue
    Dictionary filtering is a mechanism that evaluates a filter condition on a 
    single string column directly against that column's dictionary encoding. 
    However, a dictionary-filtered string column may also appear in other 
    multi-column filter conditions. Those conditions would then be evaluated 
    against the column's dictionary codes instead of its decoded string values, 
    which causes problems.
    
    For example:
    `select * from multi_catalog.lineitem_string_date_orc where l_commitdate < 
l_receiptdate and l_receiptdate = '1995-01-01'  order by l_orderkey, l_partkey, 
l_suppkey, l_linenumber limit 10;`
    
    `l_receiptdate` is a string filter column, and it is also included in the 
    multi-column filter condition `l_commitdate < l_receiptdate`.
    
    ### Solution
    Resolve it by separating out the multi-column filter conditions and executing 
    them after the dictionary-filtered columns have been converted back to strings.
---
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 71 ++++++++++++++++++----
 be/src/vec/exec/format/orc/vorc_reader.h           |  1 +
 .../exec/format/parquet/vparquet_group_reader.cpp  | 56 +++++++++++++----
 .../exec/format/parquet/vparquet_group_reader.h    |  1 +
 .../hive/test_external_catalog_hive.out            | 24 ++++++++
 .../hive/test_external_catalog_hive.groovy         |  4 ++
 6 files changed, 135 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 001d32b821..e81f42a98e 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -253,12 +253,13 @@ Status OrcReader::init_reader(
     _is_acid = is_acid;
     _tuple_descriptor = tuple_descriptor;
     _row_descriptor = row_descriptor;
+    if (not_single_slot_filter_conjuncts != nullptr && 
!not_single_slot_filter_conjuncts->empty()) {
+        
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
+                                                 
not_single_slot_filter_conjuncts->begin(),
+                                                 
not_single_slot_filter_conjuncts->end());
+    }
     _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
     _text_converter.reset(new TextConverter('\\'));
-    if (not_single_slot_filter_conjuncts) {
-        _filter_conjuncts.insert(_filter_conjuncts.end(), 
not_single_slot_filter_conjuncts->begin(),
-                                 not_single_slot_filter_conjuncts->end());
-    }
     _obj_pool = std::make_shared<ObjectPool>();
     {
         SCOPED_RAW_TIMER(&_statistics.create_reader_time);
@@ -1389,6 +1390,7 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                 return Status::OK();
             }
         }
+
         std::vector<orc::ColumnVectorBatch*> batch_vec;
         _fill_batch_vec(batch_vec, _batch.get(), 0);
         for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
@@ -1407,8 +1409,24 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
 
         RETURN_IF_ERROR(_fill_partition_columns(block, rr, 
_lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, rr, 
_lazy_read_ctx.missing_columns));
-        RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, 
columns_to_filter, *_filter));
-        Block::erase_useless_column(block, column_to_keep);
+
+        if (block->rows() == 0) {
+            *eof = true;
+            return Status::OK();
+        }
+
+        if (!_not_single_slot_filter_conjuncts.empty()) {
+            std::vector<IColumn::Filter*> filters;
+            filters.push_back(_filter.get());
+            RETURN_IF_CATCH_EXCEPTION(
+                    
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                            _not_single_slot_filter_conjuncts, &filters, 
block, columns_to_filter,
+                            column_to_keep)));
+        } else {
+            RETURN_IF_CATCH_EXCEPTION(
+                    Block::filter_block_internal(block, columns_to_filter, 
*_filter));
+            Block::erase_useless_column(block, column_to_keep);
+        }
     } else {
         uint64_t rr;
         SCOPED_RAW_TIMER(&_statistics.column_read_time);
@@ -1457,10 +1475,16 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                     batch_vec[orc_col_idx->second], _batch->numElements));
         }
         *read_rows = rr;
+
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, 
_lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, 
_lazy_read_ctx.missing_columns));
 
+        if (block->rows() == 0) {
+            *eof = true;
+            return Status::OK();
+        }
+
         _build_delete_row_filter(block, rr);
 
         std::vector<uint32_t> columns_to_filter;
@@ -1483,17 +1507,40 @@ Status OrcReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
             if (_delete_rows_filter_ptr) {
                 filters.push_back(_delete_rows_filter_ptr.get());
             }
-            RETURN_IF_CATCH_EXCEPTION(
-                    
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                            filter_conjuncts, &filters, block, 
columns_to_filter, column_to_keep)));
+            IColumn::Filter result_filter(block->rows(), 1);
+            bool can_filter_all = false;
+            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+                    filter_conjuncts, &filters, block, &result_filter, 
&can_filter_all));
+            if (can_filter_all) {
+                for (auto& col : columns_to_filter) {
+                    
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+                }
+                Block::erase_useless_column(block, column_to_keep);
+                _convert_dict_cols_to_string_cols(block, &batch_vec);
+                return Status::OK();
+            }
+            if (!_not_single_slot_filter_conjuncts.empty()) {
+                _convert_dict_cols_to_string_cols(block, &batch_vec);
+                std::vector<IColumn::Filter*> merged_filters;
+                merged_filters.push_back(&result_filter);
+                RETURN_IF_CATCH_EXCEPTION(
+                        
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                                _not_single_slot_filter_conjuncts, 
&merged_filters, block,
+                                columns_to_filter, column_to_keep)));
+            } else {
+                RETURN_IF_CATCH_EXCEPTION(
+                        Block::filter_block_internal(block, columns_to_filter, 
result_filter));
+                Block::erase_useless_column(block, column_to_keep);
+                _convert_dict_cols_to_string_cols(block, &batch_vec);
+            }
         } else {
             if (_delete_rows_filter_ptr) {
                 RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, 
columns_to_filter,
                                                                        
(*_delete_rows_filter_ptr)));
             }
             Block::erase_useless_column(block, column_to_keep);
+            _convert_dict_cols_to_string_cols(block, &batch_vec);
         }
-        _convert_dict_cols_to_string_cols(block, &batch_vec);
     }
     return Status::OK();
 }
@@ -1581,8 +1628,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
             _fill_partition_columns(block, size, 
_lazy_read_ctx.predicate_partition_columns));
     RETURN_IF_ERROR(_fill_missing_columns(block, size, 
_lazy_read_ctx.predicate_missing_columns));
     if (_lazy_read_ctx.resize_first_column) {
+        // VExprContext.execute has an optimization, the filtering is executed 
when block->rows() > 0
+        // The following process may be tricky and time-consuming, but we have 
no other way.
         block->get_by_position(0).column->assume_mutable()->resize(size);
-        _lazy_read_ctx.resize_first_column = true;
     }
 
     // transactional hive orc delete row
@@ -1608,6 +1656,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
             filter_conjuncts, &filters, block, _filter.get(), 
&can_filter_all));
 
     if (_lazy_read_ctx.resize_first_column) {
+        // We have to clean the first column to insert right data.
         block->get_by_position(0).column->assume_mutable()->clear();
     }
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index d05fb4c478..b558f06465 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -539,6 +539,7 @@ private:
 
     const TupleDescriptor* _tuple_descriptor;
     const RowDescriptor* _row_descriptor;
+    VExprContextSPtrs _not_single_slot_filter_conjuncts;
     const std::unordered_map<int, VExprContextSPtrs>* 
_slot_id_to_filter_conjuncts;
     VExprContextSPtrs _dict_filter_conjuncts;
     VExprContextSPtrs _non_dict_filter_conjuncts;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 6faaab0177..3ef3581e36 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -110,12 +110,13 @@ Status RowGroupReader::init(
     _tuple_descriptor = tuple_descriptor;
     _row_descriptor = row_descriptor;
     _col_name_to_slot_id = colname_to_slot_id;
+    if (not_single_slot_filter_conjuncts != nullptr && 
!not_single_slot_filter_conjuncts->empty()) {
+        
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
+                                                 
not_single_slot_filter_conjuncts->begin(),
+                                                 
not_single_slot_filter_conjuncts->end());
+    }
     _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
     _text_converter.reset(new TextConverter('\\'));
-    if (not_single_slot_filter_conjuncts) {
-        _filter_conjuncts.insert(_filter_conjuncts.end(), 
not_single_slot_filter_conjuncts->begin(),
-                                 not_single_slot_filter_conjuncts->end());
-    }
     _merge_read_ranges(row_ranges);
     if (_read_columns.empty()) {
         // Query task that only select columns in path.
@@ -325,12 +326,32 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
             if (_position_delete_ctx.has_filter) {
                 filters.push_back(_pos_delete_filter_ptr.get());
             }
-
-            RETURN_IF_CATCH_EXCEPTION(
-                    
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                            _filter_conjuncts, &filters, block, 
columns_to_filter,
-                            column_to_keep)));
-            _convert_dict_cols_to_string_cols(block);
+            IColumn::Filter result_filter(block->rows(), 1);
+            bool can_filter_all = false;
+            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+                    _filter_conjuncts, &filters, block, &result_filter, 
&can_filter_all));
+            if (can_filter_all) {
+                for (auto& col : columns_to_filter) {
+                    
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+                }
+                Block::erase_useless_column(block, column_to_keep);
+                _convert_dict_cols_to_string_cols(block);
+                return Status::OK();
+            }
+            if (!_not_single_slot_filter_conjuncts.empty()) {
+                _convert_dict_cols_to_string_cols(block);
+                std::vector<IColumn::Filter*> merged_filters;
+                merged_filters.push_back(&result_filter);
+                RETURN_IF_CATCH_EXCEPTION(
+                        
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                                _not_single_slot_filter_conjuncts, 
&merged_filters, block,
+                                columns_to_filter, column_to_keep)));
+            } else {
+                RETURN_IF_CATCH_EXCEPTION(
+                        Block::filter_block_internal(block, columns_to_filter, 
result_filter));
+                Block::erase_useless_column(block, column_to_keep);
+                _convert_dict_cols_to_string_cols(block);
+            }
         } else {
             RETURN_IF_CATCH_EXCEPTION(
                     RETURN_IF_ERROR(_filter_block(block, column_to_keep, 
columns_to_filter)));
@@ -404,7 +425,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
     std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
     size_t pre_read_rows;
     bool pre_eof;
+    std::vector<uint32_t> columns_to_filter;
     size_t origin_column_num = block->columns();
+    columns_to_filter.resize(origin_column_num);
+    for (uint32_t i = 0; i < origin_column_num; ++i) {
+        columns_to_filter[i] = i;
+    }
     IColumn::Filter result_filter;
     while (true) {
         // read predicate columns
@@ -538,7 +564,15 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
 
     *batch_eof = pre_eof;
     RETURN_IF_ERROR(_fill_partition_columns(block, column_size, 
_lazy_read_ctx.partition_columns));
-    return _fill_missing_columns(block, column_size, 
_lazy_read_ctx.missing_columns);
+    RETURN_IF_ERROR(_fill_missing_columns(block, column_size, 
_lazy_read_ctx.missing_columns));
+    if (!_not_single_slot_filter_conjuncts.empty()) {
+        std::vector<IColumn::Filter*> filters;
+        filters.push_back(&result_filter);
+        
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                _not_single_slot_filter_conjuncts, nullptr, block, 
columns_to_filter,
+                origin_column_num)));
+    }
+    return Status::OK();
 }
 
 void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 393f738857..ef35f5adf2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -209,6 +209,7 @@ private:
     const TupleDescriptor* _tuple_descriptor;
     const RowDescriptor* _row_descriptor;
     const std::unordered_map<std::string, int>* _col_name_to_slot_id;
+    VExprContextSPtrs _not_single_slot_filter_conjuncts;
     const std::unordered_map<int, VExprContextSPtrs>* 
_slot_id_to_filter_conjuncts;
     VExprContextSPtrs _dict_filter_conjuncts;
     VExprContextSPtrs _filter_conjuncts;
diff --git 
a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out
 
b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out
index 54dc2d9a40..ae29339cec 100644
--- 
a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out
+++ 
b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_hive.out
@@ -96,3 +96,27 @@ Z6n2t4XA2n7CXTECJ,PE,iBbsCh0RE1Dd2A,z48
 -- !pr21598 --
 5
 
+-- !not_single_slot_filter_conjuncts_orc --
+\N     289572  4       1980215 480218.00       24.00   31082.88        0.05    
0       R       F       1994-12-14      1995-01-01      COLLECT COD     AIR     
 final accounts. instructions boost above
+\N     388932  2       6038830 538843.00       46.00   81352.38        0.02    
0.06    A       F       1994-12-15      1995-01-01      NONE    MAIL    ven 
ideas are furiously according 
+\N     452964  3       14917531        167546.00       20.00   30955.80        
0.02    0.03    R       F       1994-12-03      1995-01-01      COLLECT COD     
AIR     deposits. blithely even deposits a
+\N     570084  4       14861731        361760.00       26.00   43991.74        
0.05    0.08    A       F       1994-11-03      1995-01-01      COLLECT COD     
MAIL    ending hockey players wake f
+\N     637092  4       15648780        148811.00       26.00   44928.00        
0.06    0.04    R       F       1994-11-14      1995-01-01      COLLECT COD     
SHIP    lar deposits. as
+\N     1084260 2       6109231 609244.00       10.00   12399.30        0.01    
0.03    R       F       1994-11-05      1995-01-01      DELIVER IN PERSON       
RAIL    efully pending sentiments. epita
+\N     1150884 1       13245123        245124.00       49.00   52305.54        
0.05    0.02    R       F       1994-12-22      1995-01-01      DELIVER IN 
PERSON       REG AIR rious deposits about the quickly bold
+\N     1578180 1       19168165        918223.00       10.00   12322.10        
0.07    0.07    R       F       1994-10-31      1995-01-01      COLLECT COD     
TRUCK   ges. accounts sublate carefully
+\N     2073732 2       13846443        596483.00       21.00   29163.75        
0.10    0.08    R       F       1994-12-06      1995-01-01      DELIVER IN 
PERSON       FOB     dolphins nag furiously q
+\N     2479044 4       9763795 13805.00        40.00   74332.40        0.05    
0.05    R       F       1994-11-16      1995-01-01      COLLECT COD     RAIL    
equests hinder qu
+
+-- !not_single_slot_filter_conjuncts_parquet --
+\N     289572  4       1980215 480218.00       24.00   31082.88        0.05    
0       R       F       1994-12-14      1995-01-01      COLLECT COD     AIR     
 final accounts. instructions boost above
+\N     388932  2       6038830 538843.00       46.00   81352.38        0.02    
0.06    A       F       1994-12-15      1995-01-01      NONE    MAIL    ven 
ideas are furiously according 
+\N     452964  3       14917531        167546.00       20.00   30955.80        
0.02    0.03    R       F       1994-12-03      1995-01-01      COLLECT COD     
AIR     deposits. blithely even deposits a
+\N     570084  4       14861731        361760.00       26.00   43991.74        
0.05    0.08    A       F       1994-11-03      1995-01-01      COLLECT COD     
MAIL    ending hockey players wake f
+\N     637092  4       15648780        148811.00       26.00   44928.00        
0.06    0.04    R       F       1994-11-14      1995-01-01      COLLECT COD     
SHIP    lar deposits. as
+\N     1084260 2       6109231 609244.00       10.00   12399.30        0.01    
0.03    R       F       1994-11-05      1995-01-01      DELIVER IN PERSON       
RAIL    efully pending sentiments. epita
+\N     1150884 1       13245123        245124.00       49.00   52305.54        
0.05    0.02    R       F       1994-12-22      1995-01-01      DELIVER IN 
PERSON       REG AIR rious deposits about the quickly bold
+\N     1578180 1       19168165        918223.00       10.00   12322.10        
0.07    0.07    R       F       1994-10-31      1995-01-01      COLLECT COD     
TRUCK   ges. accounts sublate carefully
+\N     2073732 2       13846443        596483.00       21.00   29163.75        
0.10    0.08    R       F       1994-12-06      1995-01-01      DELIVER IN 
PERSON       FOB     dolphins nag furiously q
+\N     2479044 4       9763795 13805.00        40.00   74332.40        0.05    
0.05    R       F       1994-11-16      1995-01-01      COLLECT COD     RAIL    
equests hinder qu
+
diff --git 
a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy
 
b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy
index df06c7e246..e0a56e89c6 100644
--- 
a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy
+++ 
b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_hive.groovy
@@ -89,6 +89,10 @@ suite("test_external_catalog_hive", "p2") {
         // test #21598
         qt_pr21598 """select count(*) from( (SELECT r_regionkey AS key1, 
r_name AS name, pday AS pday FROM (SELECT r_regionkey, r_name, 
replace(r_comment, ' ', 'aaaa') AS pday FROM 
${catalog_name}.tpch_1000_parquet.region) t2))x;"""
 
+        // test not_single_slot_filter_conjuncts with dict filter issue
+        qt_not_single_slot_filter_conjuncts_orc """ select * from 
multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and 
l_receiptdate = '1995-01-01'  order by l_orderkey, l_partkey, l_suppkey, 
l_linenumber limit 10; """
+        qt_not_single_slot_filter_conjuncts_parquet """ select * from 
multi_catalog.lineitem_string_date_orc where l_commitdate < l_receiptdate and 
l_receiptdate = '1995-01-01'  order by l_orderkey, l_partkey, l_suppkey, 
l_linenumber limit 10; """
+
         // test remember last used database after switch / rename catalog
         sql """switch ${catalog_name};"""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to