This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ac77b1d0 [fix](compaction) compaction should catch exception when 
read next block (#24484)
76ac77b1d0 is described below

commit 76ac77b1d0a237727117cefb8534d028befd059c
Author: huanghaibin <[email protected]>
AuthorDate: Sat Sep 16 16:25:55 2023 +0800

    [fix](compaction) compaction should catch exception when read next block 
(#24484)
---
 be/src/vec/olap/vcollect_iterator.cpp       | 11 +++---
 be/src/vec/olap/vertical_merge_iterator.cpp | 52 ++++++++++++++++-------------
 be/src/vec/olap/vertical_merge_iterator.h   |  4 +--
 3 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index e62b9ca4ce..b34893b630 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -795,10 +795,13 @@ Status 
VCollectIterator::Level1Iterator::_merge_next(Block* block) {
         // clear block column data
         if (pre_row_ref.row_pos + continuous_row_in_block == 
pre_row_ref.block->rows()) {
             const auto& src_block = pre_row_ref.block;
-            for (size_t i = 0; i < column_count; ++i) {
-                
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
-                                                     pre_row_ref.row_pos, 
continuous_row_in_block);
-            }
+            RETURN_IF_CATCH_EXCEPTION({
+                for (size_t i = 0; i < column_count; ++i) {
+                    
target_columns[i]->insert_range_from(*(src_block->get_by_position(i).column),
+                                                         pre_row_ref.row_pos,
+                                                         
continuous_row_in_block);
+                }
+            });
             continuous_row_in_block = 0;
             pre_row_ref.reset();
         }
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp 
b/be/src/vec/olap/vertical_merge_iterator.cpp
index 6e84c5f2b1..3ff1282fd2 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -277,45 +277,49 @@ bool VerticalMergeIteratorContext::compare(const 
VerticalMergeIteratorContext& r
     return result;
 }
 
-void VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) {
+Status VerticalMergeIteratorContext::copy_rows(Block* block, size_t count) {
     Block& src = *_block;
     Block& dst = *block;
     DCHECK(count > 0);
 
     auto start = _index_in_block;
     _index_in_block += count - 1;
+    RETURN_IF_CATCH_EXCEPTION({
+        for (size_t i = 0; i < _ori_return_cols; ++i) {
+            auto& s_col = src.get_by_position(i);
+            auto& d_col = dst.get_by_position(i);
 
-    for (size_t i = 0; i < _ori_return_cols; ++i) {
-        auto& s_col = src.get_by_position(i);
-        auto& d_col = dst.get_by_position(i);
+            ColumnPtr& s_cp = s_col.column;
+            ColumnPtr& d_cp = d_col.column;
 
-        ColumnPtr& s_cp = s_col.column;
-        ColumnPtr& d_cp = d_col.column;
-
-        d_cp->assume_mutable()->insert_range_from(*s_cp, start, count);
-    }
+            d_cp->assume_mutable()->insert_range_from(*s_cp, start, count);
+        }
+    });
+    return Status::OK();
 }
 // `advanced = false` when current block finished
-void VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
+Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
     Block& src = *_block;
     Block& dst = *block;
     if (_cur_batch_num == 0) {
-        return;
+        return Status::OK();
     }
 
     // copy a row to dst block column by column
     size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+    RETURN_IF_CATCH_EXCEPTION({
+        for (size_t i = 0; i < _ori_return_cols; ++i) {
+            auto& s_col = src.get_by_position(i);
+            auto& d_col = dst.get_by_position(i);
 
-    for (size_t i = 0; i < _ori_return_cols; ++i) {
-        auto& s_col = src.get_by_position(i);
-        auto& d_col = dst.get_by_position(i);
+            ColumnPtr& s_cp = s_col.column;
+            ColumnPtr& d_cp = d_col.column;
 
-        ColumnPtr& s_cp = s_col.column;
-        ColumnPtr& d_cp = d_col.column;
-
-        d_cp->assume_mutable()->insert_range_from(*s_cp, start, 
_cur_batch_num);
-    }
+            d_cp->assume_mutable()->insert_range_from(*s_cp, start, 
_cur_batch_num);
+        }
+    });
     _cur_batch_num = 0;
+    return Status::OK();
 }
 
 Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -426,14 +430,14 @@ Status VerticalHeapMergeIterator::next_batch(Block* 
block) {
             // skip cur row, copy pre ctx
             ++_merged_rows;
             if (pre_ctx) {
-                pre_ctx->copy_rows(block);
+                RETURN_IF_ERROR(pre_ctx->copy_rows(block));
                 pre_ctx = nullptr;
             }
         } else {
             ctx->add_cur_batch();
             if (pre_ctx != ctx) {
                 if (pre_ctx) {
-                    pre_ctx->copy_rows(block);
+                    RETURN_IF_ERROR(pre_ctx->copy_rows(block));
                 }
                 pre_ctx = ctx;
             }
@@ -444,7 +448,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
             if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
                 // current block finished, ctx not advance
                 // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
-                ctx->copy_rows(block, false);
+                RETURN_IF_ERROR(ctx->copy_rows(block, false));
                 pre_ctx = nullptr;
             }
         }
@@ -547,7 +551,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
         if (_cur_iter_ctx->is_cur_block_finished() || row_idx >= 
_block_row_max) {
             // current block finished, ctx not advance
             // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
-            _cur_iter_ctx->copy_rows(block, false);
+            RETURN_IF_ERROR(_cur_iter_ctx->copy_rows(block, false));
         }
 
         RETURN_IF_ERROR(_cur_iter_ctx->advance());
@@ -727,7 +731,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
         auto same_source_cnt = _row_sources_buf->same_source_count(order, 
limit);
         _row_sources_buf->advance(same_source_cnt);
         // copy rows to block
-        ctx->copy_rows(block, same_source_cnt);
+        RETURN_IF_ERROR(ctx->copy_rows(block, same_source_cnt));
         RETURN_IF_ERROR(ctx->advance());
         rows += same_source_cnt;
         st = _row_sources_buf->has_remaining();
diff --git a/be/src/vec/olap/vertical_merge_iterator.h 
b/be/src/vec/olap/vertical_merge_iterator.h
index b587b6f2cd..70a452b2b6 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,8 +164,8 @@ public:
     Status block_reset(const std::shared_ptr<Block>& block);
     Status init(const StorageReadOptions& opts);
     bool compare(const VerticalMergeIteratorContext& rhs) const;
-    void copy_rows(Block* block, bool advanced = true);
-    void copy_rows(Block* block, size_t count);
+    Status copy_rows(Block* block, bool advanced = true);
+    Status copy_rows(Block* block, size_t count);
 
     Status advance();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to