This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new 7fbca548ab VMergeIterator should use nullable info from scanner instead of schema (#10797)
7fbca548ab is described below
commit 7fbca548abb3d7d84ba0ca0c2ebc29b5177ec0a5
Author: starocean999 <[email protected]>
AuthorDate: Thu Jul 14 12:22:28 2022 +0800
VMergeIterator should use nullable info from scanner instead of schema (#10797)
---
be/src/olap/rowset/beta_rowset_reader.cpp | 7 +-
be/src/olap/rowset/rowset_reader_context.h | 3 +
be/src/vec/olap/block_reader.cpp | 1 +
be/src/vec/olap/vgeneric_iterators.cpp | 110 +++++++++++++++++------------
be/src/vec/olap/vgeneric_iterators.h | 4 +-
5 files changed, 78 insertions(+), 47 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index e314c4c1ab..2d8f79a8da 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -117,8 +117,11 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
// merge or union segment iterator
RowwiseIterator* final_iterator;
if (config::enable_storage_vectorization && read_context->is_vec) {
- if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
- final_iterator = vectorized::new_merge_iterator(iterators,
_parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
+ if (read_context->need_ordered_result &&
+ _rowset->rowset_meta()->is_segments_overlapping()) {
+ final_iterator = vectorized::new_merge_iterator(
+ iterators, _parent_tracker, read_context->sequence_id_idx,
+ read_context->is_unique,
read_context->tablet_columns_convert_to_null_set);
} else {
final_iterator = vectorized::new_union_iterator(iterators,
_parent_tracker);
}
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index 0ae42f6cf4..0bdb903e2e 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,9 @@ struct RowsetReaderContext {
int batch_size = 1024;
bool is_vec = false;
bool is_unique = false;
+
+ // need pass this info to VMergeIterator
+ std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
};
} // namespace doris
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 2d5c542379..19b1192444 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -52,6 +52,7 @@ OLAPStatus BlockReader::_init_collect_iter(const
ReaderParams& read_params,
_reader_context.batch_size = _batch_size;
_reader_context.is_vec = true;
+ _reader_context.tablet_columns_convert_to_null_set =
_tablet_columns_convert_to_null_set;
for (auto& rs_reader : rs_readers) {
RETURN_NOT_OK(rs_reader->init(&_reader_context));
OLAPStatus res = _vcollect_iter.add_child(rs_reader);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 2440b09888..13b4acf30c 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -60,28 +60,28 @@ public:
size_t data_len = 0;
const auto* col_schema = _schema.column(j);
switch (col_schema->type()) {
- case OLAP_FIELD_TYPE_SMALLINT:
- *(int16_t*)data = _rows_returned + j;
- data_len = sizeof(int16_t);
- break;
- case OLAP_FIELD_TYPE_INT:
- *(int32_t*)data = _rows_returned + j;
- data_len = sizeof(int32_t);
- break;
- case OLAP_FIELD_TYPE_BIGINT:
- *(int64_t*)data = _rows_returned + j;
- data_len = sizeof(int64_t);
- break;
- case OLAP_FIELD_TYPE_FLOAT:
- *(float*)data = _rows_returned + j;
- data_len = sizeof(float);
- break;
- case OLAP_FIELD_TYPE_DOUBLE:
- *(double*)data = _rows_returned + j;
- data_len = sizeof(double);
- break;
- default:
- break;
+ case OLAP_FIELD_TYPE_SMALLINT:
+ *(int16_t*)data = _rows_returned + j;
+ data_len = sizeof(int16_t);
+ break;
+ case OLAP_FIELD_TYPE_INT:
+ *(int32_t*)data = _rows_returned + j;
+ data_len = sizeof(int32_t);
+ break;
+ case OLAP_FIELD_TYPE_BIGINT:
+ *(int64_t*)data = _rows_returned + j;
+ data_len = sizeof(int64_t);
+ break;
+ case OLAP_FIELD_TYPE_FLOAT:
+ *(float*)data = _rows_returned + j;
+ data_len = sizeof(float);
+ break;
+ case OLAP_FIELD_TYPE_DOUBLE:
+ *(double*)data = _rows_returned + j;
+ data_len = sizeof(double);
+ break;
+ default:
+ break;
}
vi.insert_data(data, data_len);
@@ -91,8 +91,7 @@ public:
++_rows_returned;
}
- if (row_idx > 0)
- return Status::OK();
+ if (row_idx > 0) return Status::OK();
return Status::EndOfFile("End of VAutoIncrementIterator");
}
@@ -120,12 +119,14 @@ Status VAutoIncrementIterator::init(const
StorageReadOptions& opts) {
// }
class VMergeIteratorContext {
public:
- VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool
is_unique)
+ VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool
is_unique,
+ const std::unordered_set<uint32_t>*
tablet_columns_convert_to_null_set)
: _iter(iter),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_num_columns(iter->schema().num_column_ids()),
- _num_key_columns(iter->schema().num_key_columns()) {}
+ _num_key_columns(iter->schema().num_key_columns()),
+
_tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
VMergeIteratorContext(VMergeIteratorContext&&) = delete;
@@ -137,8 +138,7 @@ public:
_iter = nullptr;
}
- Status block_reset()
- {
+ Status block_reset() {
if (!_block) {
const Schema& schema = _iter->schema();
const auto& column_ids = schema.column_ids();
@@ -149,11 +149,21 @@ public:
return Status::RuntimeError("invalid data type");
}
if (column_desc->is_nullable()) {
- data_type =
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
+ data_type =
+
std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
- _block.insert(ColumnWithTypeAndName(std::move(column),
data_type, column_desc->name()));
+
+ if (_tablet_columns_convert_to_null_set &&
+ _tablet_columns_convert_to_null_set->find(column_ids[i]) !=
+ _tablet_columns_convert_to_null_set->end()) {
+ column =
make_nullable(std::move(column))->assume_mutable();
+ data_type = make_nullable(data_type);
+ }
+
+ _block.insert(
+ ColumnWithTypeAndName(std::move(column), data_type,
column_desc->name()));
}
} else {
_block.clear_column_data();
@@ -233,6 +243,7 @@ private:
int _block_row_max = 4096;
int _num_columns;
int _num_key_columns;
+ const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set =
nullptr;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -281,8 +292,13 @@ Status VMergeIteratorContext::_load_next_block() {
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes the ownership of input iterators
- VMergeIterator(std::vector<RowwiseIterator*>& iters,
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) :
- _origin_iters(iters),_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique) {
+ VMergeIterator(std::vector<RowwiseIterator*>& iters,
std::shared_ptr<MemTracker> parent,
+ int sequence_id_idx, bool is_unique,
+ const std::unordered_set<uint32_t>*
tablet_columns_convert_to_null_set)
+ : _origin_iters(iters),
+ _sequence_id_idx(sequence_id_idx),
+ _is_unique(is_unique),
+
_tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {
// use for count the mem use of Block use in Merge
_mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent,
false);
}
@@ -313,15 +329,16 @@ private:
}
};
- using VMergeHeap = std::priority_queue<VMergeIteratorContext*,
- std::vector<VMergeIteratorContext*>,
- VMergeContextComparator>;
+ using VMergeHeap =
+ std::priority_queue<VMergeIteratorContext*,
std::vector<VMergeIteratorContext*>,
+ VMergeContextComparator>;
VMergeHeap _merge_heap;
int block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
+ const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set =
nullptr;
};
Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -331,7 +348,8 @@ Status VMergeIterator::init(const StorageReadOptions& opts)
{
_schema = &(*_origin_iters.begin())->schema();
for (auto iter : _origin_iters) {
- auto ctx = std::make_unique<VMergeIteratorContext>(iter,
_sequence_id_idx, _is_unique);
+ auto ctx = std::make_unique<VMergeIteratorContext>(iter,
_sequence_id_idx, _is_unique,
+
_tablet_columns_convert_to_null_set);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@@ -348,8 +366,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts)
{
Status VMergeIterator::next_batch(vectorized::Block* block) {
while (block->rows() < block_row_max) {
- if (_merge_heap.empty())
- break;
+ if (_merge_heap.empty()) break;
auto ctx = _merge_heap.top();
_merge_heap.pop();
@@ -386,7 +403,8 @@ public:
}
~VUnionIterator() override {
- std::for_each(_origin_iters.begin(), _origin_iters.end(),
std::default_delete<RowwiseIterator>());
+ std::for_each(_origin_iters.begin(), _origin_iters.end(),
+ std::default_delete<RowwiseIterator>());
}
Status init(const StorageReadOptions& opts) override;
@@ -432,15 +450,19 @@ Status VUnionIterator::next_batch(vectorized::Block*
block) {
return Status::EndOfFile("End of VUnionIterator");
}
-
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
+RowwiseIterator* new_merge_iterator(
+ std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker>
parent,
+ int sequence_id_idx, bool is_unique,
+ const std::unordered_set<uint32_t>*
tablet_columns_convert_to_null_set) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique);
+ return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique,
+ tablet_columns_convert_to_null_set);
}
-RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent) {
+RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs,
+ std::shared_ptr<MemTracker> parent) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
@@ -451,6 +473,6 @@ RowwiseIterator* new_auto_increment_iterator(const Schema&
schema, size_t num_ro
return new VAutoIncrementIterator(schema, num_rows);
}
-}
+} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/olap/vgeneric_iterators.h
b/be/src/vec/olap/vgeneric_iterators.h
index 063d07da51..0b9af85ef5 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,9 @@ namespace vectorized {
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
+RowwiseIterator* new_merge_iterator(
+ std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker>
parent, int sequence_id_idx, bool is_unique,
+ const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set
= nullptr);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]