This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7fb98a1062b [bug](shared scan) Fix use-after-free when enable pipeline
shared scanning (#26199) (#26269)
7fb98a1062b is described below
commit 7fb98a1062b50a05f1596f36f06e70b3c2220c52
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 2 11:14:45 2023 +0800
[bug](shared scan) Fix use-after-free when enable pipeline shared scanning
(#26199) (#26269)
When enable shared scan, all scanners will be created by one instance. When
the main instance reach eos and quit, all states of it will be released. But
other instances are still possible to get block from those scanners. So we must
assure scanners will not be dependent on any states of the main instance after
it quit.
---
be/src/vec/exec/scan/pip_scanner_context.h | 10 ++++++++--
be/src/vec/exec/scan/scanner_context.cpp | 6 ++++--
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 13a0ef5b671..b98c628368e 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -62,8 +62,6 @@ public:
}
}
- RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
_current_used_bytes -= (*block)->allocated_bytes();
return Status::OK();
}
@@ -79,6 +77,10 @@ public:
if (_need_colocate_distribute) {
std::vector<uint64_t> hash_vals;
for (const auto& block : blocks) {
+ auto st = validate_block_schema(block.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
// vectorized calculate hash
int rows = block->rows();
const auto element_size = _num_parallel_instances;
@@ -110,6 +112,10 @@ public:
}
} else {
for (const auto& block : blocks) {
+ auto st = validate_block_schema(block.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
local_bytes += block->allocated_bytes();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index a5f575d1750..478d9fb4cb7 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -150,6 +150,10 @@ void
ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
std::lock_guard l(_transfer_lock);
auto old_bytes_in_queue = _cur_bytes_in_queue;
for (auto& b : blocks) {
+ auto st = validate_block_schema(b.get());
+ if (!st.ok()) {
+ set_status_on_error(st, false);
+ }
_cur_bytes_in_queue += b->allocated_bytes();
_blocks_queue.push_back(std::move(b));
}
@@ -201,8 +205,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
*block = std::move(_blocks_queue.front());
_blocks_queue.pop_front();
- RETURN_IF_ERROR(validate_block_schema((*block).get()));
-
auto block_bytes = (*block)->allocated_bytes();
_cur_bytes_in_queue -= block_bytes;
_queued_blocks_memory_usage->add(-block_bytes);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]