This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new f03cee5e30d [enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761)
f03cee5e30d is described below
commit f03cee5e30d919833a70e7da8a8c5f22f20f9a28
Author: yiguolei <[email protected]>
AuthorDate: Sun Jun 2 21:11:18 2024 +0800
[enhancement](oom) add exception in olap data convertor when memory is not enough to prevent oom (#35761)
Issue Number: close #xxx
<!--Describe your changes.-->
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/rowset/segment_v2/plain_page.h | 2 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 8 ++++----
.../rowset/segment_v2/vertical_segment_writer.cpp | 12 ++++++------
be/src/vec/core/block.cpp | 19 +++++++++++--------
be/src/vec/core/block.h | 2 +-
be/src/vec/olap/olap_data_convertor.cpp | 19 +++++++++++++------
be/src/vec/olap/olap_data_convertor.h | 4 ++--
be/src/vec/sink/group_commit_block_sink.cpp | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 3 ++-
9 files changed, 41 insertions(+), 30 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/plain_page.h b/be/src/olap/rowset/segment_v2/plain_page.h
index cbcc96f31ba..af31275002a 100644
--- a/be/src/olap/rowset/segment_v2/plain_page.h
+++ b/be/src/olap/rowset/segment_v2/plain_page.h
@@ -39,7 +39,7 @@ public:
Status init() override {
// Reserve enough space for the page, plus a bit of slop since
// we often overrun the page by a few values.
- _buffer.reserve(_options.data_page_size + 1024);
+ RETURN_IF_CATCH_EXCEPTION(_buffer.reserve(_options.data_page_size + 1024));
return reset();
}
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 83e93631ab1..ec3bb9c993e 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -385,8 +385,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
for (auto i : including_cids) {
full_block.replace_by_position(i, block->get_by_position(input_id++).column);
}
- _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
- including_cids);
+ RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+ &full_block, row_pos, num_rows, including_cids));
bool have_input_seq_column = false;
// write including columns
@@ -561,8 +561,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
}
// convert missing columns and send to column writer
- _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, row_pos, num_rows,
- missing_cids);
+ RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+ &full_block, row_pos, num_rows, missing_cids));
for (auto cid : missing_cids) {
auto converted_result = _olap_data_convertor->convert_column_data(cid);
if (!converted_result.first.ok()) {
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 5d2ddedb204..48b892afc38 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -321,8 +321,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
for (auto i : including_cids) {
full_block.replace_by_position(i, data.block->get_by_position(input_id++).column);
}
- _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, data.row_pos,
- data.num_rows, including_cids);
+ RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+ &full_block, data.row_pos, data.num_rows, including_cids));
bool have_input_seq_column = false;
// write including columns
@@ -497,8 +497,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
// convert missing columns and send to column writer
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
- _olap_data_convertor->set_source_content_with_specifid_columns(&full_block, data.row_pos,
- data.num_rows, missing_cids);
+ RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+ &full_block, data.row_pos, data.num_rows, missing_cids));
for (auto cid : missing_cids) {
auto [status, column] = _olap_data_convertor->convert_column_data(cid);
if (!status.ok()) {
@@ -747,8 +747,8 @@ Status VerticalSegmentWriter::write_batch() {
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid)));
for (auto& data : _batched_blocks) {
- _olap_data_convertor->set_source_content_with_specifid_columns(
- data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid});
+ RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
+ data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
// convert column data from engine format to storage layer format
auto [status, column] = _olap_data_convertor->convert_column_data(cid);
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 466c9b3b559..e6bedd6c78e 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -786,15 +786,18 @@ Block Block::copy_block(const std::vector<int>& column_offset) const {
return columns_with_type_and_name;
}
-void Block::append_to_block_by_selector(MutableBlock* dst,
- const IColumn::Selector& selector) const {
- DCHECK_EQ(data.size(), dst->mutable_columns().size());
- for (size_t i = 0; i < data.size(); i++) {
- // FIXME: this is a quickfix. we assume that only partition functions make there some
- if (!is_column_const(*data[i].column)) {
- data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector);
+Status Block::append_to_block_by_selector(MutableBlock* dst,
+ const IColumn::Selector& selector) const {
+ RETURN_IF_CATCH_EXCEPTION({
+ DCHECK_EQ(data.size(), dst->mutable_columns().size());
+ for (size_t i = 0; i < data.size(); i++) {
+ // FIXME: this is a quickfix. we assume that only partition functions make there some
+ if (!is_column_const(*data[i].column)) {
+ data[i].column->append_data_by_selector(dst->mutable_columns()[i], selector);
+ }
}
- }
+ });
+ return Status::OK();
}
Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to_filter,
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 89f8e99b66a..c9b3f2d5b5e 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -274,7 +274,7 @@ public:
// copy a new block by the offset column
Block copy_block(const std::vector<int>& column_offset) const;
- void append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const;
+ Status append_to_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const;
// need exception safety
static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 3da1f7c8678..86c1d2d6669 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -214,16 +214,19 @@ void OlapBlockDataConvertor::set_source_content(const vectorized::Block* block,
}
}
-void OlapBlockDataConvertor::set_source_content_with_specifid_columns(
+Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(
const vectorized::Block* block, size_t row_pos, size_t num_rows,
std::vector<uint32_t> cids) {
DCHECK(block != nullptr);
DCHECK(num_rows > 0);
DCHECK(row_pos + num_rows <= block->rows());
- for (auto i : cids) {
- DCHECK(i < _convertors.size());
- _convertors[i]->set_source_column(block->get_by_position(i), row_pos, num_rows);
- }
+ RETURN_IF_CATCH_EXCEPTION({
+ for (auto i : cids) {
+ DCHECK(i < _convertors.size());
+ _convertors[i]->set_source_column(block->get_by_position(i), row_pos, num_rows);
+ }
+ });
+ return Status::OK();
}
void OlapBlockDataConvertor::clear_source_content() {
@@ -235,7 +238,11 @@ void OlapBlockDataConvertor::clear_source_content() {
std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data(
size_t cid) {
assert(cid < _convertors.size());
- auto status = _convertors[cid]->convert_to_olap();
+ auto convert_func = [&]() -> Status {
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap());
+ return Status::OK();
+ };
+ auto status = convert_func();
return {status, _convertors[cid].get()};
}
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index d6a721f9792..0ec720fcdc1 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -75,8 +75,8 @@ public:
OlapBlockDataConvertor(const TabletSchema* tablet_schema);
OlapBlockDataConvertor(const TabletSchema* tablet_schema, const std::vector<uint32_t>& col_ids);
void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows);
- void set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos,
- size_t num_rows, std::vector<uint32_t> cids);
+ Status set_source_content_with_specifid_columns(const vectorized::Block* block, size_t row_pos,
+ size_t num_rows, std::vector<uint32_t> cids);
void clear_source_content();
std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
void add_column_data_convertor(const TabletColumn& column);
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 8aa60bb3f22..97ab60a8801 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -217,7 +217,7 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
for (auto i = 0; i < block->rows(); i++) {
selector.emplace_back(i);
}
- block->append_to_block_by_selector(cur_mutable_block.get(), selector);
+ RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), selector));
}
std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
output_block->swap(cur_mutable_block->to_block());
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 70d1c05b453..818bff422f9 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -516,7 +516,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
}
SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
- block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first));
+ RETURN_IF_ERROR(
+ block->append_to_block_by_selector(_cur_mutable_block.get(), *(payload->first)));
for (auto tablet_id : payload->second) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]