This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 76ac77b1d0 [fix](compaction) compaction should catch exception when read next block (#24484)
76ac77b1d0 is described below
commit 76ac77b1d0a237727117cefb8534d028befd059c
Author: huanghaibin <[email protected]>
AuthorDate: Sat Sep 16 16:25:55 2023 +0800
[fix](compaction) compaction should catch exception when read next block (#24484)
---
be/src/vec/olap/vcollect_iterator.cpp | 11 +++---
be/src/vec/olap/vertical_merge_iterator.cpp | 52 ++++++++++++++++-------------
be/src/vec/olap/vertical_merge_iterator.h | 4 +--
3 files changed, 37 insertions(+), 30 deletions(-)
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index e62b9ca4ce..b34893b630 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -795,10 +795,13 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) {
// clear block column data
if (pre_row_ref.row_pos + continuous_row_in_block == pre_row_ref.block->rows()) {
const auto& src_block = pre_row_ref.block;
- for (size_t i = 0; i < column_count; ++i) {
- target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
- pre_row_ref.row_pos, continuous_row_in_block);
- }
+ RETURN_IF_CATCH_EXCEPTION({
+ for (size_t i = 0; i < column_count; ++i) {
+ target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+ pre_row_ref.row_pos,
+ continuous_row_in_block);
+ }
+ });
continuous_row_in_block = 0;
pre_row_ref.reset();
}
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 6e84c5f2b1..3ff1282fd2 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -277,45 +277,49 @@ bool VerticalMergeIteratorContext::compare(const VerticalMergeIteratorContext& r
return result;
}
-void VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) {
+Status VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) {
Block& src = *_block;
Block& dst = *block;
DCHECK(count > 0);
auto start = _index_in_block;
_index_in_block += count - 1;
+ RETURN_IF_CATCH_EXCEPTION({
+ for (size_t i = 0; i < _ori_return_cols; ++i) {
+ auto& s_col = src.get_by_position(i);
+ auto& d_col = dst.get_by_position(i);
- for (size_t i = 0; i < _ori_return_cols; ++i) {
- auto& s_col = src.get_by_position(i);
- auto& d_col = dst.get_by_position(i);
+ ColumnPtr& s_cp = s_col.column;
+ ColumnPtr& d_cp = d_col.column;
- ColumnPtr& s_cp = s_col.column;
- ColumnPtr& d_cp = d_col.column;
-
- d_cp->assume_mutable()->insert_range_from(*s_cp, start, count);
- }
+ d_cp->assume_mutable()->insert_range_from(*s_cp, start, count);
+ }
+ });
+ return Status::OK();
}
// `advanced = false` when current block finished
-void VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
+Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
Block& src = *_block;
Block& dst = *block;
if (_cur_batch_num == 0) {
- return;
+ return Status::OK();
}
// copy a row to dst block column by column
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+ RETURN_IF_CATCH_EXCEPTION({
+ for (size_t i = 0; i < _ori_return_cols; ++i) {
+ auto& s_col = src.get_by_position(i);
+ auto& d_col = dst.get_by_position(i);
- for (size_t i = 0; i < _ori_return_cols; ++i) {
- auto& s_col = src.get_by_position(i);
- auto& d_col = dst.get_by_position(i);
+ ColumnPtr& s_cp = s_col.column;
+ ColumnPtr& d_cp = d_col.column;
- ColumnPtr& s_cp = s_col.column;
- ColumnPtr& d_cp = d_col.column;
-
- d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
- }
+ d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
+ }
+ });
_cur_batch_num = 0;
+ return Status::OK();
}
Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -426,14 +430,14 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
// skip cur row, copy pre ctx
++_merged_rows;
if (pre_ctx) {
- pre_ctx->copy_rows(block);
+ RETURN_IF_ERROR(pre_ctx->copy_rows(block));
pre_ctx = nullptr;
}
} else {
ctx->add_cur_batch();
if (pre_ctx != ctx) {
if (pre_ctx) {
- pre_ctx->copy_rows(block);
+ RETURN_IF_ERROR(pre_ctx->copy_rows(block));
}
pre_ctx = ctx;
}
@@ -444,7 +448,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished, ctx not advance
// so copy start_idx = (_index_in_block - _cur_batch_num + 1)
- ctx->copy_rows(block, false);
+ RETURN_IF_ERROR(ctx->copy_rows(block, false));
pre_ctx = nullptr;
}
}
@@ -547,7 +551,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
if (_cur_iter_ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
// current block finished, ctx not advance
// so copy start_idx = (_index_in_block - _cur_batch_num + 1)
- _cur_iter_ctx->copy_rows(block, false);
+ RETURN_IF_ERROR(_cur_iter_ctx->copy_rows(block, false));
}
RETURN_IF_ERROR(_cur_iter_ctx->advance());
@@ -727,7 +731,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
auto same_source_cnt = _row_sources_buf->same_source_count(order, limit);
_row_sources_buf->advance(same_source_cnt);
// copy rows to block
- ctx->copy_rows(block, same_source_cnt);
+ RETURN_IF_ERROR(ctx->copy_rows(block, same_source_cnt));
RETURN_IF_ERROR(ctx->advance());
rows += same_source_cnt;
st = _row_sources_buf->has_remaining();
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index b587b6f2cd..70a452b2b6 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,8 +164,8 @@ public:
Status block_reset(const std::shared_ptr<Block>& block);
Status init(const StorageReadOptions& opts);
bool compare(const VerticalMergeIteratorContext& rhs) const;
- void copy_rows(Block* block, bool advanced = true);
- void copy_rows(Block* block, size_t count);
+ Status copy_rows(Block* block, bool advanced = true);
+ Status copy_rows(Block* block, size_t count);
Status advance();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]