This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 61826e3a77 [Improvement](parquet-reader) Improve performance of
parquet reader filter calculation. (#16934)
61826e3a77 is described below
commit 61826e3a77573d82cdec11c5e9c5111b50ab136f
Author: Qi Chen <[email protected]>
AuthorDate: Thu Feb 23 14:41:30 2023 +0800
[Improvement](parquet-reader) Improve performance of parquet reader filter
calculation. (#16934)
Improve performance of parquet reader filter calculation.
- Merge filters directly into `filter_data` instead of through `(*_filter_ptr)` to improve
performance.
- Use the mutable-column (in-place) filter function instead of the original new-column
filter function; see #16850.
- Avoid incrementing column reference counts, which caused unnecessary copying, by
passing column pointers by reference.
---
.../exec/format/parquet/vparquet_group_reader.cpp | 116 ++++++++++++++-------
.../exec/format/parquet/vparquet_group_reader.h | 7 +-
2 files changed, 84 insertions(+), 39 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 2877e5ccc9..b5a3bf37b6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -115,7 +115,7 @@ Status RowGroupReader::next_batch(Block* block, size_t
batch_size, size_t* read_
if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
int result_column_id = -1;
RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block,
&result_column_id));
- ColumnPtr filter_column =
block->get_by_position(result_column_id).column;
+ ColumnPtr& filter_column =
block->get_by_position(result_column_id).column;
RETURN_IF_ERROR(_filter_block(block, filter_column,
column_to_keep, columns_to_filter));
} else {
RETURN_IF_ERROR(_filter_block(block, column_to_keep,
columns_to_filter));
@@ -256,7 +256,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t
batch_size, size_t* re
// generated from next batch, so the filter column is removed
ahead.
DCHECK_EQ(block->rows(), 0);
} else {
- ColumnPtr filter_column =
block->get_by_position(filter_column_id).column;
+ ColumnPtr& filter_column =
block->get_by_position(filter_column_id).column;
RETURN_IF_ERROR(_filter_block(block, filter_column,
origin_column_num,
_lazy_read_ctx.all_predicate_col_ids));
}
@@ -292,28 +292,39 @@ const uint8_t*
RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
*can_filter_all = true;
} else {
DCHECK_EQ(column_size, num_rows);
- const uint8_t* null_map_column =
nullable_column->get_null_map_data().data();
+ const auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(
nullable_column->get_nested_column_ptr()->assume_mutable().get());
- uint8_t* filter_data = concrete_column->get_data().data();
- for (int i = 0; i < num_rows; ++i) {
- (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i];
+ auto* __restrict filter_data = concrete_column->get_data().data();
+ if (_position_delete_ctx.has_filter) {
+ auto* __restrict pos_delete_filter_data =
_pos_delete_filter_ptr->data();
+ for (size_t i = 0; i < num_rows; ++i) {
+ filter_data[i] &= (!null_map_data[i]) &
pos_delete_filter_data[i];
+ }
+ } else {
+ for (size_t i = 0; i < num_rows; ++i) {
+ filter_data[i] &= (!null_map_data[i]);
+ }
}
- filter_map = _filter_ptr->data();
+ filter_map = filter_data;
}
} else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
// filter all
*can_filter_all = !const_column->get_bool(0);
} else {
- const IColumn::Filter& filter =
- assert_cast<const
doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data();
-
+ MutableColumnPtr mutable_holder = sv->assume_mutable();
+ ColumnUInt8* mutable_filter_column =
typeid_cast<ColumnUInt8*>(mutable_holder.get());
+ IColumn::Filter& filter = mutable_filter_column->get_data();
auto* __restrict filter_data = filter.data();
const size_t size = filter.size();
- for (size_t i = 0; i < size; ++i) {
- (*_filter_ptr)[i] &= filter_data[i];
+
+ if (_position_delete_ctx.has_filter) {
+ auto* __restrict pos_delete_filter_data =
_pos_delete_filter_ptr->data();
+ for (size_t i = 0; i < size; ++i) {
+ filter_data[i] &= pos_delete_filter_data[i];
+ }
}
- filter_map = filter.data();
+ filter_map = filter_data;
}
return filter_map;
}
@@ -437,11 +448,13 @@ Status RowGroupReader::_read_empty_batch(size_t
batch_size, size_t* read_rows, b
}
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
- _filter_ptr.reset(new IColumn::Filter(read_rows, 1));
if (!_position_delete_ctx.has_filter) {
+ _pos_delete_filter_ptr.reset(nullptr);
_total_read_rows += read_rows;
return Status::OK();
}
+ _pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
+ auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
const int64_t delete_row_index_in_row_group =
_position_delete_ctx.delete_rows[_position_delete_ctx.index] -
@@ -459,7 +472,7 @@ Status RowGroupReader::_build_pos_delete_filter(size_t
read_rows) {
_total_read_rows += read_rows;
return Status::OK();
}
- (*_filter_ptr)[index] = 0;
+ _pos_delete_filter_data[index] = 0;
++_position_delete_ctx.index;
break;
} else { // delete_row >= range.last_row
@@ -480,10 +493,10 @@ Status RowGroupReader::_build_pos_delete_filter(size_t
read_rows) {
return Status::OK();
}
-Status RowGroupReader::_filter_block(Block* block, const ColumnPtr
filter_column,
+Status RowGroupReader::_filter_block(Block* block, const ColumnPtr&
filter_column,
int column_to_keep, std::vector<uint32_t>
columns_to_filter) {
if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
- ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+ const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
MutableColumnPtr mutable_holder =
nested_column->use_count() == 1
@@ -496,15 +509,22 @@ Status RowGroupReader::_filter_block(Block* block, const
ColumnPtr filter_column
"Illegal type {} of column for filter. Must be UInt8 or
Nullable(UInt8).",
filter_column->get_name());
}
- auto* __restrict null_map =
nullable_column->get_null_map_data().data();
+ auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
IColumn::Filter& filter = concrete_column->get_data();
auto* __restrict filter_data = filter.data();
-
const size_t size = filter.size();
- for (size_t i = 0; i < size; ++i) {
- (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i];
+
+ if (_position_delete_ctx.has_filter) {
+ auto* __restrict pos_delete_filter_data =
_pos_delete_filter_ptr->data();
+ for (size_t i = 0; i < size; ++i) {
+ filter_data[i] &= (!null_map_data[i]) &
pos_delete_filter_data[i];
+ }
+ } else {
+ for (size_t i = 0; i < size; ++i) {
+ filter_data[i] &= (!null_map_data[i]);
+ }
}
- RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+ RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter,
filter));
} else if (auto* const_column =
check_and_get_column<ColumnConst>(*filter_column)) {
bool ret = const_column->get_bool(0);
if (!ret) {
@@ -513,16 +533,28 @@ Status RowGroupReader::_filter_block(Block* block, const
ColumnPtr filter_column
}
}
} else {
- const IColumn::Filter& filter =
- assert_cast<const
doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
- .get_data();
+ MutableColumnPtr mutable_holder =
+ filter_column->use_count() == 1
+ ? filter_column->assume_mutable()
+ : filter_column->clone_resized(filter_column->size());
+ ColumnUInt8* mutable_filter_column =
typeid_cast<ColumnUInt8*>(mutable_holder.get());
+ if (!mutable_filter_column) {
+ return Status::InvalidArgument(
+ "Illegal type {} of column for filter. Must be UInt8 or
Nullable(UInt8).",
+ filter_column->get_name());
+ }
+ IColumn::Filter& filter = mutable_filter_column->get_data();
auto* __restrict filter_data = filter.data();
- const size_t size = filter.size();
- for (size_t i = 0; i < size; ++i) {
- (*_filter_ptr)[i] &= filter_data[i];
+
+ if (_position_delete_ctx.has_filter) {
+ auto* __restrict pos_delete_filter_data =
_pos_delete_filter_ptr->data();
+ const size_t size = filter.size();
+ for (size_t i = 0; i < size; ++i) {
+ filter_data[i] &= pos_delete_filter_data[i];
+ }
}
- RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+ RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter,
filter));
}
Block::erase_useless_column(block, column_to_keep);
return Status::OK();
@@ -530,25 +562,37 @@ Status RowGroupReader::_filter_block(Block* block, const
ColumnPtr filter_column
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>&
columns_to_filter) {
- RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+ if (_pos_delete_filter_ptr) {
+ RETURN_IF_ERROR(
+ _filter_block_internal(block, columns_to_filter,
(*_pos_delete_filter_ptr)));
+ }
Block::erase_useless_column(block, column_to_keep);
return Status::OK();
}
Status RowGroupReader::_filter_block_internal(Block* block,
- const std::vector<uint32_t>&
columns_to_filter) {
- size_t count = _filter_ptr->size() -
- simd::count_zero_num((int8_t*)_filter_ptr->data(),
_filter_ptr->size());
+ const std::vector<uint32_t>&
columns_to_filter,
+ const IColumn::Filter& filter) {
+ size_t filter_size = filter.size();
+ size_t count = filter_size - simd::count_zero_num((int8_t*)filter.data(),
filter_size);
if (count == 0) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
} else {
for (auto& col : columns_to_filter) {
- if (block->get_by_position(col).column->size() != count) {
- block->get_by_position(col).column =
-
block->get_by_position(col).column->filter(*_filter_ptr, count);
+ size_t size = block->get_by_position(col).column->size();
+ if (size != count) {
+ auto& column = block->get_by_position(col).column;
+ if (column->size() != count) {
+ if (column->use_count() == 1) {
+ const auto result_size =
column->assume_mutable()->filter(filter);
+ CHECK_EQ(result_size, count);
+ } else {
+ column = column->filter(filter, count);
+ }
+ }
}
}
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 15ff153b60..34c9d228a1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -132,11 +132,12 @@ private:
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContext*>&
missing_columns);
Status _build_pos_delete_filter(size_t read_rows);
- Status _filter_block(Block* block, const ColumnPtr filter_column, int
column_to_keep,
+ Status _filter_block(Block* block, const ColumnPtr& filter_column, int
column_to_keep,
std::vector<uint32_t> columns_to_filter);
Status _filter_block(Block* block, int column_to_keep,
const vector<uint32_t>& columns_to_filter);
- Status _filter_block_internal(Block* block, const vector<uint32_t>&
columns_to_filter);
+ Status _filter_block_internal(Block* block, const vector<uint32_t>&
columns_to_filter,
+ const IColumn::Filter& filter);
io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>
_column_readers;
@@ -154,7 +155,7 @@ private:
// If continuous batches are skipped, we can cache them to skip a whole
page
size_t _cached_filtered_rows = 0;
std::unique_ptr<TextConverter> _text_converter = nullptr;
- std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr;
+ std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr = nullptr;
int64_t _total_read_rows = 0;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]