This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5879a63a9169993c298ecc892a69f3b43415ea97 Author: huanghaibin <[email protected]> AuthorDate: Sat Sep 16 16:25:55 2023 +0800 [fix](compaction) compaction should catch exception when read next block (#24484) --- be/src/vec/olap/vcollect_iterator.cpp | 11 +++--- be/src/vec/olap/vertical_merge_iterator.cpp | 52 ++++++++++++++++------------- be/src/vec/olap/vertical_merge_iterator.h | 4 +-- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index a85f526d65..02af05bb4a 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -789,10 +789,13 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block* block) { // clear block column data if (pre_row_ref.row_pos + continuous_row_in_block == pre_row_ref.block->rows()) { const auto& src_block = pre_row_ref.block; - for (size_t i = 0; i < column_count; ++i) { - target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column), - pre_row_ref.row_pos, continuous_row_in_block); - } + RETURN_IF_CATCH_EXCEPTION({ + for (size_t i = 0; i < column_count; ++i) { + target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column), + pre_row_ref.row_pos, + continuous_row_in_block); + } + }); continuous_row_in_block = 0; pre_row_ref.reset(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 27bcc4f8e0..e394196617 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -277,45 +277,49 @@ bool VerticalMergeIteratorContext::compare(const VerticalMergeIteratorContext& r return result; } -void VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) { +Status VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) { Block& src = *_block; Block& dst = *block; DCHECK(count > 0); auto start = _index_in_block; _index_in_block += count - 1; + RETURN_IF_CATCH_EXCEPTION({ + for (size_t i = 0; i < _ori_return_cols; ++i) { + auto& s_col = src.get_by_position(i); + auto& d_col = dst.get_by_position(i); - for (size_t i = 0; i < _ori_return_cols; ++i) { - auto& s_col = src.get_by_position(i); - auto& d_col = dst.get_by_position(i); + ColumnPtr& s_cp = s_col.column; + ColumnPtr& d_cp = d_col.column; - ColumnPtr& s_cp = s_col.column; - ColumnPtr& d_cp = d_col.column; - - d_cp->assume_mutable()->insert_range_from(*s_cp, start, count); - } + d_cp->assume_mutable()->insert_range_from(*s_cp, start, count); + } + }); + return Status::OK(); } // `advanced = false` when current block finished -void VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { +Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { Block& src = *_block; Block& dst = *block; if (_cur_batch_num == 0) { - return; + return Status::OK(); } // copy a row to dst block column by column size_t start = _index_in_block - _cur_batch_num + 1 - advanced; + RETURN_IF_CATCH_EXCEPTION({ + for (size_t i = 0; i < _ori_return_cols; ++i) { + auto& s_col = src.get_by_position(i); + auto& d_col = dst.get_by_position(i); - for (size_t i = 0; i < _ori_return_cols; ++i) { - auto& s_col = src.get_by_position(i); - auto& d_col = dst.get_by_position(i); + ColumnPtr& s_cp = s_col.column; + ColumnPtr& d_cp = d_col.column; - ColumnPtr& s_cp = s_col.column; - ColumnPtr& d_cp = d_col.column; - - d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); - } + d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num); + } + }); _cur_batch_num = 0; + return Status::OK(); } Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) { @@ -426,14 +430,14 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { // skip cur row, copy pre ctx ++_merged_rows; if (pre_ctx) { - pre_ctx->copy_rows(block); + RETURN_IF_ERROR(pre_ctx->copy_rows(block)); pre_ctx = nullptr; } } else { ctx->add_cur_batch(); if (pre_ctx != ctx) { if (pre_ctx) { - pre_ctx->copy_rows(block); + RETURN_IF_ERROR(pre_ctx->copy_rows(block)); } pre_ctx = ctx; } @@ -444,7 +448,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) { // current block finished, ctx not advance // so copy start_idx = (_index_in_block - _cur_batch_num + 1) - ctx->copy_rows(block, false); + RETURN_IF_ERROR(ctx->copy_rows(block, false)); pre_ctx = nullptr; } } @@ -547,7 +551,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) { if (_cur_iter_ctx->is_cur_block_finished() || row_idx >= _block_row_max) { // current block finished, ctx not advance // so copy start_idx = (_index_in_block - _cur_batch_num + 1) - _cur_iter_ctx->copy_rows(block, false); + RETURN_IF_ERROR(_cur_iter_ctx->copy_rows(block, false)); } RETURN_IF_ERROR(_cur_iter_ctx->advance()); @@ -727,7 +731,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { auto same_source_cnt = _row_sources_buf->same_source_count(order, limit); _row_sources_buf->advance(same_source_cnt); // copy rows to block - ctx->copy_rows(block, same_source_cnt); + RETURN_IF_ERROR(ctx->copy_rows(block, same_source_cnt)); RETURN_IF_ERROR(ctx->advance()); rows += same_source_cnt; st = _row_sources_buf->has_remaining(); diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index b587b6f2cd..70a452b2b6 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -164,8 +164,8 @@ public: Status block_reset(const std::shared_ptr<Block>& block); Status init(const StorageReadOptions& opts); bool compare(const VerticalMergeIteratorContext& rhs) const; - void copy_rows(Block* block, bool advanced = true); - void copy_rows(Block* block, size_t count); + Status copy_rows(Block* block, bool advanced = true); + Status copy_rows(Block* block, size_t count); Status advance(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
