This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 61826e3a77 [Improvement](parquet-reader) Improve performance of 
parquet reader filter calculation. (#16934)
61826e3a77 is described below

commit 61826e3a77573d82cdec11c5e9c5111b50ab136f
Author: Qi Chen <[email protected]>
AuthorDate: Thu Feb 23 14:41:30 2023 +0800

    [Improvement](parquet-reader) Improve performance of parquet reader filter 
calculation. (#16934)
    
    Improve performance of parquet reader filter calculation.
    
    - Use `filter_data` instead of `(*filter_ptr)` when merging filters, to improve 
performance.
    - Use the mutable column filter func instead of the original new column filter func 
which was introduced by #16850.
    - Avoid incrementing the column ref-count, which caused unnecessary copying, by 
passing the column pointer by reference.
---
 .../exec/format/parquet/vparquet_group_reader.cpp  | 116 ++++++++++++++-------
 .../exec/format/parquet/vparquet_group_reader.h    |   7 +-
 2 files changed, 84 insertions(+), 39 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 2877e5ccc9..b5a3bf37b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -115,7 +115,7 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
         if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
             int result_column_id = -1;
             RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, 
&result_column_id));
-            ColumnPtr filter_column = 
block->get_by_position(result_column_id).column;
+            ColumnPtr& filter_column = 
block->get_by_position(result_column_id).column;
             RETURN_IF_ERROR(_filter_block(block, filter_column, 
column_to_keep, columns_to_filter));
         } else {
             RETURN_IF_ERROR(_filter_block(block, column_to_keep, 
columns_to_filter));
@@ -256,7 +256,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
             // generated from next batch, so the filter column is removed 
ahead.
             DCHECK_EQ(block->rows(), 0);
         } else {
-            ColumnPtr filter_column = 
block->get_by_position(filter_column_id).column;
+            ColumnPtr& filter_column = 
block->get_by_position(filter_column_id).column;
             RETURN_IF_ERROR(_filter_block(block, filter_column, 
origin_column_num,
                                           
_lazy_read_ctx.all_predicate_col_ids));
         }
@@ -292,28 +292,39 @@ const uint8_t* 
RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
             *can_filter_all = true;
         } else {
             DCHECK_EQ(column_size, num_rows);
-            const uint8_t* null_map_column = 
nullable_column->get_null_map_data().data();
+            const auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
             ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(
                     
nullable_column->get_nested_column_ptr()->assume_mutable().get());
-            uint8_t* filter_data = concrete_column->get_data().data();
-            for (int i = 0; i < num_rows; ++i) {
-                (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i];
+            auto* __restrict filter_data = concrete_column->get_data().data();
+            if (_position_delete_ctx.has_filter) {
+                auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
+                for (size_t i = 0; i < num_rows; ++i) {
+                    filter_data[i] &= (!null_map_data[i]) & 
pos_delete_filter_data[i];
+                }
+            } else {
+                for (size_t i = 0; i < num_rows; ++i) {
+                    filter_data[i] &= (!null_map_data[i]);
+                }
             }
-            filter_map = _filter_ptr->data();
+            filter_map = filter_data;
         }
     } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
         // filter all
         *can_filter_all = !const_column->get_bool(0);
     } else {
-        const IColumn::Filter& filter =
-                assert_cast<const 
doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data();
-
+        MutableColumnPtr mutable_holder = sv->assume_mutable();
+        ColumnUInt8* mutable_filter_column = 
typeid_cast<ColumnUInt8*>(mutable_holder.get());
+        IColumn::Filter& filter = mutable_filter_column->get_data();
         auto* __restrict filter_data = filter.data();
         const size_t size = filter.size();
-        for (size_t i = 0; i < size; ++i) {
-            (*_filter_ptr)[i] &= filter_data[i];
+
+        if (_position_delete_ctx.has_filter) {
+            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
+            for (size_t i = 0; i < size; ++i) {
+                filter_data[i] &= pos_delete_filter_data[i];
+            }
         }
-        filter_map = filter.data();
+        filter_map = filter_data;
     }
     return filter_map;
 }
@@ -437,11 +448,13 @@ Status RowGroupReader::_read_empty_batch(size_t 
batch_size, size_t* read_rows, b
 }
 
 Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
-    _filter_ptr.reset(new IColumn::Filter(read_rows, 1));
     if (!_position_delete_ctx.has_filter) {
+        _pos_delete_filter_ptr.reset(nullptr);
         _total_read_rows += read_rows;
         return Status::OK();
     }
+    _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
+    auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
     while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
         const int64_t delete_row_index_in_row_group =
                 _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
@@ -459,7 +472,7 @@ Status RowGroupReader::_build_pos_delete_filter(size_t 
read_rows) {
                     _total_read_rows += read_rows;
                     return Status::OK();
                 }
-                (*_filter_ptr)[index] = 0;
+                _pos_delete_filter_data[index] = 0;
                 ++_position_delete_ctx.index;
                 break;
             } else { // delete_row >= range.last_row
@@ -480,10 +493,10 @@ Status RowGroupReader::_build_pos_delete_filter(size_t 
read_rows) {
     return Status::OK();
 }
 
-Status RowGroupReader::_filter_block(Block* block, const ColumnPtr 
filter_column,
+Status RowGroupReader::_filter_block(Block* block, const ColumnPtr& 
filter_column,
                                      int column_to_keep, std::vector<uint32_t> 
columns_to_filter) {
     if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
-        ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+        const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
 
         MutableColumnPtr mutable_holder =
                 nested_column->use_count() == 1
@@ -496,15 +509,22 @@ Status RowGroupReader::_filter_block(Block* block, const 
ColumnPtr filter_column
                     "Illegal type {} of column for filter. Must be UInt8 or 
Nullable(UInt8).",
                     filter_column->get_name());
         }
-        auto* __restrict null_map = 
nullable_column->get_null_map_data().data();
+        auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
         IColumn::Filter& filter = concrete_column->get_data();
         auto* __restrict filter_data = filter.data();
-
         const size_t size = filter.size();
-        for (size_t i = 0; i < size; ++i) {
-            (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i];
+
+        if (_position_delete_ctx.has_filter) {
+            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
+            for (size_t i = 0; i < size; ++i) {
+                filter_data[i] &= (!null_map_data[i]) & 
pos_delete_filter_data[i];
+            }
+        } else {
+            for (size_t i = 0; i < size; ++i) {
+                filter_data[i] &= (!null_map_data[i]);
+            }
         }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, 
filter));
     } else if (auto* const_column = 
check_and_get_column<ColumnConst>(*filter_column)) {
         bool ret = const_column->get_bool(0);
         if (!ret) {
@@ -513,16 +533,28 @@ Status RowGroupReader::_filter_block(Block* block, const 
ColumnPtr filter_column
             }
         }
     } else {
-        const IColumn::Filter& filter =
-                assert_cast<const 
doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
-                        .get_data();
+        MutableColumnPtr mutable_holder =
+                filter_column->use_count() == 1
+                        ? filter_column->assume_mutable()
+                        : filter_column->clone_resized(filter_column->size());
+        ColumnUInt8* mutable_filter_column = 
typeid_cast<ColumnUInt8*>(mutable_holder.get());
+        if (!mutable_filter_column) {
+            return Status::InvalidArgument(
+                    "Illegal type {} of column for filter. Must be UInt8 or 
Nullable(UInt8).",
+                    filter_column->get_name());
+        }
 
+        IColumn::Filter& filter = mutable_filter_column->get_data();
         auto* __restrict filter_data = filter.data();
-        const size_t size = filter.size();
-        for (size_t i = 0; i < size; ++i) {
-            (*_filter_ptr)[i] &= filter_data[i];
+
+        if (_position_delete_ctx.has_filter) {
+            auto* __restrict pos_delete_filter_data = 
_pos_delete_filter_ptr->data();
+            const size_t size = filter.size();
+            for (size_t i = 0; i < size; ++i) {
+                filter_data[i] &= pos_delete_filter_data[i];
+            }
         }
-        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter, 
filter));
     }
     Block::erase_useless_column(block, column_to_keep);
     return Status::OK();
@@ -530,25 +562,37 @@ Status RowGroupReader::_filter_block(Block* block, const 
ColumnPtr filter_column
 
 Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
                                      const std::vector<uint32_t>& 
columns_to_filter) {
-    RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    if (_pos_delete_filter_ptr) {
+        RETURN_IF_ERROR(
+                _filter_block_internal(block, columns_to_filter, 
(*_pos_delete_filter_ptr)));
+    }
     Block::erase_useless_column(block, column_to_keep);
 
     return Status::OK();
 }
 
 Status RowGroupReader::_filter_block_internal(Block* block,
-                                              const std::vector<uint32_t>& 
columns_to_filter) {
-    size_t count = _filter_ptr->size() -
-                   simd::count_zero_num((int8_t*)_filter_ptr->data(), 
_filter_ptr->size());
+                                              const std::vector<uint32_t>& 
columns_to_filter,
+                                              const IColumn::Filter& filter) {
+    size_t filter_size = filter.size();
+    size_t count = filter_size - simd::count_zero_num((int8_t*)filter.data(), 
filter_size);
     if (count == 0) {
         for (auto& col : columns_to_filter) {
             
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
         }
     } else {
         for (auto& col : columns_to_filter) {
-            if (block->get_by_position(col).column->size() != count) {
-                block->get_by_position(col).column =
-                        
block->get_by_position(col).column->filter(*_filter_ptr, count);
+            size_t size = block->get_by_position(col).column->size();
+            if (size != count) {
+                auto& column = block->get_by_position(col).column;
+                if (column->size() != count) {
+                    if (column->use_count() == 1) {
+                        const auto result_size = 
column->assume_mutable()->filter(filter);
+                        CHECK_EQ(result_size, count);
+                    } else {
+                        column = column->filter(filter, count);
+                    }
+                }
             }
         }
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 15ff153b60..34c9d228a1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -132,11 +132,12 @@ private:
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContext*>& 
missing_columns);
     Status _build_pos_delete_filter(size_t read_rows);
-    Status _filter_block(Block* block, const ColumnPtr filter_column, int 
column_to_keep,
+    Status _filter_block(Block* block, const ColumnPtr& filter_column, int 
column_to_keep,
                          std::vector<uint32_t> columns_to_filter);
     Status _filter_block(Block* block, int column_to_keep,
                          const vector<uint32_t>& columns_to_filter);
-    Status _filter_block_internal(Block* block, const vector<uint32_t>& 
columns_to_filter);
+    Status _filter_block_internal(Block* block, const vector<uint32_t>& 
columns_to_filter,
+                                  const IColumn::Filter& filter);
 
     io::FileReaderSPtr _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> 
_column_readers;
@@ -154,7 +155,7 @@ private:
     // If continuous batches are skipped, we can cache them to skip a whole 
page
     size_t _cached_filtered_rows = 0;
     std::unique_ptr<TextConverter> _text_converter = nullptr;
-    std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr;
+    std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = nullptr;
     int64_t _total_read_rows = 0;
 };
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to