This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 06ff59bc03d [Performance](sink) SIMD the tablet sink validate-data
function (#25480)
06ff59bc03d is described below
commit 06ff59bc03db294508bb0d169292d4a3302ea974
Author: HappenLee <[email protected]>
AuthorDate: Tue Oct 17 16:21:08 2023 +0800
[Performance](sink) SIMD the tablet sink validate-data function (#25480)
---
be/src/vec/sink/vtablet_block_convertor.cpp | 173 ++++++++++++++--------------
be/src/vec/sink/vtablet_block_convertor.h | 12 +-
be/src/vec/sink/vtablet_sink_v2.cpp | 5 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 13 +--
4 files changed, 104 insertions(+), 99 deletions(-)
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp
b/be/src/vec/sink/vtablet_block_convertor.cpp
index da9e3fc743c..436eb3639de 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -72,12 +72,12 @@ Status OlapTableBlockConvertor::validate_and_convert_block(
RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
}
- int64_t filtered_rows = 0;
+ int filtered_rows = 0;
{
SCOPED_RAW_TIMER(&_validate_data_ns);
- _filter_bitmap.Reset(block->rows());
+ _filter_map.resize(rows, 0);
bool stop_processing = false;
- RETURN_IF_ERROR(_validate_data(state, block.get(), filtered_rows,
&stop_processing));
+ RETURN_IF_ERROR(_validate_data(state, block.get(), rows,
filtered_rows, &stop_processing));
_num_filtered_rows += filtered_rows;
has_filtered_rows = filtered_rows > 0;
if (stop_processing) {
@@ -161,11 +161,12 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
bool is_nullable,
vectorized::ColumnPtr column,
size_t slot_index, bool*
stop_processing,
fmt::memory_buffer&
error_prefix,
+ const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
- DCHECK((rows == nullptr) || (rows->size() == column->size()));
+ DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
- _filter_bitmap.Set(row, true);
+ _filter_map[row] = true;
auto ret = state->append_error_msg_to_file([]() -> std::string {
return ""; },
[&error_prefix,
&error_msg]() -> std::string {
return
fmt::to_string(error_prefix) +
@@ -180,10 +181,9 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
auto& real_column_ptr = column_ptr == nullptr ? column :
(column_ptr->get_nested_column_ptr());
auto null_map = column_ptr == nullptr ? nullptr :
column_ptr->get_null_map_data().data();
auto need_to_validate = [&null_map, this](size_t j, size_t row) {
- return !_filter_bitmap.Get(row) && (null_map == nullptr || null_map[j]
== 0);
+ return !_filter_map[row] && (null_map == nullptr || null_map[j] == 0);
};
- ssize_t last_invalid_row = -1;
switch (type.type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
@@ -196,43 +196,49 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
if (type.len > 0) {
limit = std::min(config::string_type_length_soft_limit_bytes,
type.len);
}
- for (size_t j = 0; j < column->size(); ++j) {
- auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (need_to_validate(j, row)) {
- auto str_val = column_string->get_data_at(j);
- bool invalid = str_val.size > limit;
- if (invalid) {
- last_invalid_row = row;
- if (str_val.size > type.len) {
- fmt::format_to(error_msg, "{}",
- "the length of input is too long than
schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
- } else if (str_val.size > limit) {
- fmt::format_to(error_msg, "{}",
- "the length of input string is too long
than vec schema. ");
- fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
- str_val.to_prefix(32));
- fmt::format_to(error_msg, "schema length: {}; ",
type.len);
- fmt::format_to(error_msg, "limit length: {}; ", limit);
- fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+
+ auto* __restrict offsets = column_string->get_offsets().data();
+ int invalid_count = 0;
+ for (int j = 0; j < row_count; ++j) {
+ invalid_count += (offsets[j] - offsets[j - 1]) > limit;
+ }
+
+ if (invalid_count) {
+ for (size_t j = 0; j < row_count; ++j) {
+ auto row = rows ? (*rows)[j] : j;
+ if (need_to_validate(j, row)) {
+ auto str_val = column_string->get_data_at(j);
+ bool invalid = str_val.size > limit;
+ if (invalid) {
+ if (str_val.size > type.len) {
+ fmt::format_to(error_msg, "{}",
+ "the length of input is too long
than schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ } else if (str_val.size > limit) {
+ fmt::format_to(
+ error_msg, "{}",
+ "the length of input string is too long
than vec schema. ");
+ fmt::format_to(error_msg, "first 32 bytes of input
str: [{}] ",
+ str_val.to_prefix(32));
+ fmt::format_to(error_msg, "schema length: {}; ",
type.len);
+ fmt::format_to(error_msg, "limit length: {}; ",
limit);
+ fmt::format_to(error_msg, "actual length: {}; ",
str_val.size);
+ }
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
}
break;
}
case TYPE_JSONB: {
- const auto column_string =
+ const auto* column_string =
assert_cast<const
vectorized::ColumnString*>(real_column_ptr.get());
- for (size_t j = 0; j < column->size(); ++j) {
- if (!_filter_bitmap.Get(j)) {
+ for (size_t j = 0; j < row_count; ++j) {
+ if (!_filter_map[j]) {
if (is_nullable && column_ptr && column_ptr->is_null_at(j)) {
continue;
}
@@ -248,16 +254,13 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
break;
}
case TYPE_DECIMALV2: {
- auto column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
+ auto* column_decimal =
const_cast<vectorized::ColumnDecimal<vectorized::Decimal128>*>(
assert_cast<const
vectorized::ColumnDecimal<vectorized::Decimal128>*>(
real_column_ptr.get()));
const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
- for (size_t j = 0; j < column->size(); ++j) {
+ for (size_t j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
if (need_to_validate(j, row)) {
auto dec_val = binary_cast<vectorized::Int128, DecimalV2Value>(
column_decimal->get_data()[j]);
@@ -284,7 +287,6 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}
if (invalid) {
- last_invalid_row = row;
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
@@ -292,31 +294,36 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
break;
}
case TYPE_DECIMAL32: {
-#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType)
\
- auto column_decimal = const_cast<vectorized::ColumnDecimal<DecimalType>*>(
\
- assert_cast<const
vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get())); \
- const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType,
false>(type); \
- const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType,
true>(type); \
- for (size_t j = 0; j < column->size(); ++j) {
\
- auto row = rows ? (*rows)[j] : j;
\
- if (row == last_invalid_row) {
\
- continue;
\
- }
\
- if (need_to_validate(j, row)) {
\
- auto dec_val = column_decimal->get_data()[j];
\
- bool invalid = false;
\
- if (dec_val > max_decimal || dec_val < min_decimal) {
\
- fmt::format_to(error_msg, "{}", "decimal value is not valid
for definition"); \
- fmt::format_to(error_msg, ", value={}", dec_val);
\
- fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, type.scale); \
- fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal,
max_decimal); \
- invalid = true;
\
- }
\
- if (invalid) {
\
- last_invalid_row = row;
\
- RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
- }
\
- }
\
+#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType)
\
+ auto column_decimal = const_cast<vectorized::ColumnDecimal<DecimalType>*>(
\
+ assert_cast<const
vectorized::ColumnDecimal<DecimalType>*>(real_column_ptr.get())); \
+ const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType,
false>(type); \
+ const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType,
true>(type); \
+ const auto* __restrict datas = column_decimal->get_data().data();
\
+ int invalid_count = 0;
\
+ for (int j = 0; j < row_count; ++j) {
\
+ const auto dec_val = datas[j];
\
+ invalid_count += dec_val > max_decimal || dec_val < min_decimal;
\
+ }
\
+ if (invalid_count) {
\
+ for (size_t j = 0; j < row_count; ++j) {
\
+ auto row = rows ? (*rows)[j] : j;
\
+ if (need_to_validate(j, row)) {
\
+ auto dec_val = column_decimal->get_data()[j];
\
+ bool invalid = false;
\
+ if (dec_val > max_decimal || dec_val < min_decimal) {
\
+ fmt::format_to(error_msg, "{}", "decimal value is not
valid for definition"); \
+ fmt::format_to(error_msg, ", value={}", dec_val);
\
+ fmt::format_to(error_msg, ", precision={}, scale={}",
type.precision, \
+ type.scale);
\
+ fmt::format_to(error_msg, ", min={}, max={}; ",
min_decimal, max_decimal); \
+ invalid = true;
\
+ }
\
+ if (invalid) {
\
+ RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
\
+ }
\
+ }
\
+ }
\
}
CHECK_VALIDATION_FOR_DECIMALV3(vectorized::Decimal32);
break;
@@ -331,13 +338,13 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}
#undef CHECK_VALIDATION_FOR_DECIMALV3
case TYPE_ARRAY: {
- const auto column_array =
+ const auto* column_array =
assert_cast<const
vectorized::ColumnArray*>(real_column_ptr.get());
DCHECK(type.children.size() == 1);
auto nested_type = type.children[0];
const auto& offsets = column_array->get_offsets();
vectorized::IColumn::Permutation permutation(offsets.back());
- for (size_t r = 0; r < offsets.size(); ++r) {
+ for (size_t r = 0; r < row_count; ++r) {
for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
permutation[c] = rows ? (*rows)[r] : r;
}
@@ -345,7 +352,7 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
fmt::format_to(error_prefix, "ARRAY type failed: ");
RETURN_IF_ERROR(_validate_column(state, nested_type,
type.contains_nulls[0],
column_array->get_data_ptr(),
slot_index, stop_processing,
- error_prefix, &permutation));
+ error_prefix, permutation.size(),
&permutation));
break;
}
case TYPE_MAP: {
@@ -355,7 +362,7 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
auto val_type = type.children[1];
const auto& offsets = column_map->get_offsets();
vectorized::IColumn::Permutation permutation(offsets.back());
- for (size_t r = 0; r < offsets.size(); ++r) {
+ for (size_t r = 0; r < row_count; ++r) {
for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
permutation[c] = rows ? (*rows)[r] : r;
}
@@ -363,10 +370,10 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
fmt::format_to(error_prefix, "MAP type failed: ");
RETURN_IF_ERROR(_validate_column(state, key_type,
type.contains_nulls[0],
column_map->get_keys_ptr(),
slot_index, stop_processing,
- error_prefix, &permutation));
+ error_prefix, permutation.size(),
&permutation));
RETURN_IF_ERROR(_validate_column(state, val_type,
type.contains_nulls[1],
column_map->get_values_ptr(),
slot_index, stop_processing,
- error_prefix, &permutation));
+ error_prefix, permutation.size(),
&permutation));
break;
}
case TYPE_STRUCT: {
@@ -377,7 +384,8 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
RETURN_IF_ERROR(_validate_column(state, type.children[sc],
type.contains_nulls[sc],
column_struct->get_column_ptr(sc), slot_index,
- stop_processing, error_prefix));
+ stop_processing, error_prefix,
+
column_struct->get_column_ptr(sc)->size()));
}
break;
}
@@ -390,15 +398,11 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
// 1. column is nullable but the desc is not nullable
// 2. desc->type is BITMAP
if ((!is_nullable || type == TYPE_OBJECT) && column_ptr) {
- for (int j = 0; j < column->size(); ++j) {
+ for (int j = 0; j < row_count; ++j) {
auto row = rows ? (*rows)[j] : j;
- if (row == last_invalid_row) {
- continue;
- }
- if (null_map[j] && !_filter_bitmap.Get(row)) {
+ if (null_map[j] && !_filter_map[row]) {
fmt::format_to(error_msg, "null value for not null column,
type={}",
type.debug_string());
- last_invalid_row = row;
RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
}
}
@@ -408,7 +412,8 @@ Status
OlapTableBlockConvertor::_validate_column(RuntimeState* state, const Type
}
Status OlapTableBlockConvertor::_validate_data(RuntimeState* state,
vectorized::Block* block,
- int64_t& filtered_rows, bool*
stop_processing) {
+ const uint32_t rows, int&
filtered_rows,
+ bool* stop_processing) {
for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
SlotDescriptor* desc = _output_tuple_desc->slots()[i];
DCHECK(block->columns() > i)
@@ -423,12 +428,12 @@ Status
OlapTableBlockConvertor::_validate_data(RuntimeState* state, vectorized::
fmt::memory_buffer error_prefix;
fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
RETURN_IF_ERROR(_validate_column(state, desc->type(),
desc->is_nullable(), column, i,
- stop_processing, error_prefix));
+ stop_processing, error_prefix, rows));
}
filtered_rows = 0;
- for (int i = 0; i < block->rows(); ++i) {
- filtered_rows += _filter_bitmap.Get(i);
+ for (int i = 0; i < rows; ++i) {
+ filtered_rows += _filter_map[i];
}
return Status::OK();
}
diff --git a/be/src/vec/sink/vtablet_block_convertor.h
b/be/src/vec/sink/vtablet_block_convertor.h
index bf5148cf0a7..27440c628be 100644
--- a/be/src/vec/sink/vtablet_block_convertor.h
+++ b/be/src/vec/sink/vtablet_block_convertor.h
@@ -40,14 +40,14 @@ namespace doris::vectorized {
class OlapTableBlockConvertor {
public:
OlapTableBlockConvertor(TupleDescriptor* output_tuple_desc)
- : _output_tuple_desc(output_tuple_desc), _filter_bitmap(1024) {}
+ : _output_tuple_desc(output_tuple_desc) {}
Status validate_and_convert_block(RuntimeState* state, vectorized::Block*
input_block,
std::shared_ptr<vectorized::Block>&
block,
vectorized::VExprContextSPtrs
output_vexpr_ctxs, size_t rows,
bool& has_filtered_rows);
- const Bitmap& filter_bitmap() { return _filter_bitmap; }
+ const char* filter_map() const { return _filter_map.data(); }
int64_t validate_data_ns() const { return _validate_data_ns; }
@@ -66,15 +66,15 @@ private:
Status _validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index,
bool* stop_processing,
- fmt::memory_buffer& error_prefix,
+ fmt::memory_buffer& error_prefix, const uint32_t
row_count,
vectorized::IColumn::Permutation* rows = nullptr);
// make input data valid for OLAP table
// return number of invalid/filtered rows.
// invalid row number is set in Bitmap
// set stop_processing if we want to stop the whole process now.
- Status _validate_data(RuntimeState* state, vectorized::Block* block,
int64_t& filtered_rows,
- bool* stop_processing);
+ Status _validate_data(RuntimeState* state, vectorized::Block* block, const
uint32_t rows,
+ int& filtered_rows, bool* stop_processing);
// some output column of output expr may have different nullable property
with dest slot desc
// so here need to do the convert operation
@@ -94,7 +94,7 @@ private:
std::map<int, int128_t> _max_decimal128_val;
std::map<int, int128_t> _min_decimal128_val;
- Bitmap _filter_bitmap;
+ std::vector<char> _filter_map;
int64_t _validate_data_ns = 0;
int64_t _num_filtered_rows = 0;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 45fa48cb327..13d25f5cf46 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -284,9 +284,10 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
RowsForTablet rows_for_tablet;
_tablet_finder->clear_for_new_batch();
_row_distribution_watch.start();
- auto num_rows = block->rows();
+ const auto num_rows = input_rows;
+ const auto* __restrict filter_map = _block_convertor->filter_map();
for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
+ if (UNLIKELY(has_filtered_rows) && filter_map[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index c0676a5670b..7035653ee72 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1343,7 +1343,7 @@ Status
VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized
uint32_t tablet_index = 0;
bool stop_processing = false;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
+ if (UNLIKELY(has_filtered_rows) && _block_convertor->filter_map()[i]) {
continue;
}
bool is_continue = false;
@@ -1373,7 +1373,7 @@ Status
VTabletWriter::_single_partition_generate(RuntimeState* state, vectorized
auto& selector = channel_to_payload[j][channel.get()].first;
auto& tablet_ids = channel_to_payload[j][channel.get()].second;
for (int32_t i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_map()[i]) {
continue;
}
selector->push_back(i);
@@ -1712,7 +1712,7 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
// try to find tablet and save missing value
for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_map()[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
@@ -1761,7 +1761,7 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
} // creating done
} else { // not auto partition
for (int i = 0; i < num_rows; ++i) {
- if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_bitmap().Get(i)) {
+ if (UNLIKELY(has_filtered_rows) &&
_block_convertor->filter_map()[i]) {
continue;
}
const VOlapTablePartition* partition = nullptr;
@@ -1794,7 +1794,7 @@ Status
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
vectorized::IColumn::Filter& filter_col =
static_cast<vectorized::ColumnUInt8*>(filter.get())->get_data();
for (size_t i = 0; i < filter_col.size(); ++i) {
- filter_data[i] = !_block_convertor->filter_bitmap().Get(i);
+ filter_data[i] = !_block_convertor->filter_map()[i];
}
RETURN_IF_CATCH_EXCEPTION(vectorized::Block::filter_block_internal(
block.get(), filter_col, block->columns()));
@@ -1849,8 +1849,7 @@ Status VTabletWriter::write_wal(OlapTableBlockConvertor*
block_convertor,
auto cloneBlock = block->clone_without_columns();
auto res_block =
vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < num_rows; ++i) {
- if (block_convertor->num_filtered_rows() > 0 &&
- block_convertor->filter_bitmap().Get(i)) {
+ if (block_convertor->num_filtered_rows() > 0 &&
block_convertor->filter_map()[i]) {
continue;
}
if (tablet_finder->num_filtered_rows() > 0 &&
tablet_finder->filter_bitmap().Get(i)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]