This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new c4dcba5d22 [Bug](scan) fix core dump due to store_path_map (#23084) (#23131)
c4dcba5d22 is described below
commit c4dcba5d22cb6863bf6dd959825b8042f6cbd8a0
Author: Pxl <[email protected]>
AuthorDate: Thu Aug 17 20:26:57 2023 +0800
[Bug](scan) fix core dump due to store_path_map (#23084) (#23131)
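Fix a core dump caused by `_store_path_map`. `VScanner::queue_id()` called `ExecEnv::store_path_to_index("xxx")`, which uses `std::map::operator[]`. That operator inserts a new entry whenever the key is missing, so every "lookup" mutated the shared map, presumably from concurrently running scan threads. Remove the map, `queue_id()`, `Task::queue_id` and `PriorityWorkStealingThreadPool`, and use a plain `PriorityThreadPool` for the local scan thread pool.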
---
be/src/runtime/exec_env.h | 5 +-
be/src/runtime/exec_env_init.cpp | 4 -
be/src/util/priority_work_stealing_thread_pool.hpp | 148 ---------------------
be/src/util/work_thread_pool.hpp | 5 +-
be/src/vec/exec/scan/scanner_scheduler.cpp | 9 +-
be/src/vec/exec/scan/vscanner.h | 5 +-
6 files changed, 9 insertions(+), 167 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f4f1cbfd67..40c489efb5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -163,7 +163,7 @@ public:
BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
const std::vector<StorePath>& store_paths() const { return _store_paths; }
- size_t store_path_to_index(const std::string& path) { return
_store_path_map[path]; }
+
StorageEngine* storage_engine() { return _storage_engine; }
void set_storage_engine(StorageEngine* storage_engine) { _storage_engine =
storage_engine; }
@@ -193,8 +193,7 @@ private:
bool _is_init;
std::vector<StorePath> _store_paths;
- // path => store index
- std::map<std::string, size_t> _store_path_map;
+
// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 700eeda8eb..6b7e337ecd 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -105,10 +105,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
return Status::OK();
}
_store_paths = store_paths;
- // path_name => path_index
- for (int i = 0; i < store_paths.size(); i++) {
- _store_path_map[store_paths[i].path] = i;
- }
_external_scan_context_mgr = new ExternalScanContextMgr(this);
_vstream_mgr = new doris::vectorized::VDataStreamMgr();
diff --git a/be/src/util/priority_work_stealing_thread_pool.hpp
b/be/src/util/priority_work_stealing_thread_pool.hpp
deleted file mode 100644
index 02543ba6d8..0000000000
--- a/be/src/util/priority_work_stealing_thread_pool.hpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <mutex>
-#include <thread>
-
-#include "util/blocking_priority_queue.hpp"
-#include "util/lock.h"
-#include "util/thread_group.h"
-
-namespace doris {
-
-// Work-Stealing threadpool which processes items (of type T) in parallel
which were placed on multi
-// blocking queues by Offer(). Each item is processed by a single
user-supplied method.
-class PriorityWorkStealingThreadPool : public PriorityThreadPool {
-public:
- // Creates a new thread pool and start num_threads threads.
- // -- num_threads: how many threads are part of this pool
- // -- num_queues: how many queues are part of this pool
- // -- queue_size: the maximum size of the queue on which work items are
offered. If the
- // queue exceeds this size, subsequent calls to Offer will block until
there is
- // capacity available.
- PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues,
uint32_t queue_size,
- const std::string& name)
- : PriorityThreadPool(0, 0, name) {
- DCHECK_GT(num_queues, 0);
- DCHECK_GE(num_threads, num_queues);
- // init _work_queues first because the work thread needs it
- for (int i = 0; i < num_queues; ++i) {
-
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
- }
- for (int i = 0; i < num_threads; ++i) {
- _threads.create_thread(std::bind<void>(
- std::mem_fn(&PriorityWorkStealingThreadPool::work_thread),
this, i));
- }
- }
-
- virtual ~PriorityWorkStealingThreadPool() {
- shutdown();
- join();
- }
-
- // Blocking operation that puts a work item on the queue. If the queue is
full, blocks
- // until there is capacity available.
- //
- // 'work' is copied into the work queue, but may be referenced at any time
in the
- // future. Therefore the caller needs to ensure that any data referenced
by work (if T
- // is, e.g., a pointer type) remains valid until work has been processed,
and it's up to
- // the caller to provide their own signalling mechanism to detect this (or
to wait until
- // after DrainAndshutdown returns).
- //
- // Returns true if the work item was successfully added to the queue,
false otherwise
- // (which typically means that the thread pool has already been shut down).
- bool offer(Task task) override { return
_work_queues[task.queue_id]->blocking_put(task); }
-
- bool offer(WorkFunction func) override {
- PriorityThreadPool::Task task = {0, func, 0};
- return _work_queues[task.queue_id]->blocking_put(task);
- }
-
- // Shuts the thread pool down, causing the work queue to cease accepting
offered work
- // and the worker threads to terminate once they have processed their
current work item.
- // Returns once the shutdown flag has been set, does not wait for the
threads to
- // terminate.
- void shutdown() override {
- PriorityThreadPool::shutdown();
- for (auto work_queue : _work_queues) {
- work_queue->shutdown();
- }
- }
-
- uint32_t get_queue_size() const override {
- uint32_t size = 0;
- for (auto work_queue : _work_queues) {
- size += work_queue->get_size();
- }
- return size;
- }
-
- // Blocks until the work queue is empty, and then calls shutdown to stop
the worker
- // threads and Join to wait until they are finished.
- // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
- void drain_and_shutdown() override {
- {
- std::unique_lock l(_lock);
- while (get_queue_size() != 0) {
- _empty_cv.wait(l);
- }
- }
- shutdown();
- join();
- }
-
-private:
- // Driver method for each thread in the pool. Continues to read work from
the queue
- // until the pool is shutdown.
- void work_thread(int thread_id) {
- auto queue_id = thread_id % _work_queues.size();
- auto steal_queue_id = (queue_id + 1) % _work_queues.size();
- while (!is_shutdown()) {
- Task task;
- // avoid blocking get
- bool is_other_queues_empty = true;
- // steal work in round-robin if nothing to do
- while (_work_queues[queue_id]->get_size() == 0 && queue_id !=
steal_queue_id &&
- !is_shutdown()) {
- if (_work_queues[steal_queue_id]->non_blocking_get(&task)) {
- is_other_queues_empty = false;
- task.work_function();
- }
- steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
- }
- if (queue_id == steal_queue_id) {
- steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
- }
- if (is_other_queues_empty &&
- _work_queues[queue_id]->blocking_get(
- &task,
config::doris_blocking_priority_queue_wait_timeout_ms)) {
- task.work_function();
- }
- if (_work_queues[queue_id]->get_size() == 0) {
- _empty_cv.notify_all();
- }
- }
- }
-
- // Queue on which work items are held until a thread is available to
process them in
- // FIFO order.
- std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues;
-};
-
-} // namespace doris
diff --git a/be/src/util/work_thread_pool.hpp b/be/src/util/work_thread_pool.hpp
index 109ae6ded8..bf0555db1c 100644
--- a/be/src/util/work_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -42,7 +42,6 @@ public:
public:
int priority;
WorkFunction work_function;
- int queue_id;
bool operator<(const Task& o) const { return priority < o.priority; }
Task& operator++() {
@@ -88,12 +87,12 @@ public:
virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
virtual bool offer(WorkFunction func) {
- WorkThreadPool::Task task = {0, func, 0};
+ WorkThreadPool::Task task = {0, func};
return _work_queue.blocking_put(task);
}
virtual bool try_offer(WorkFunction func) {
- WorkThreadPool::Task task = {0, func, 0};
+ WorkThreadPool::Task task = {0, func};
return _work_queue.try_put(task);
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 3fa10c9d78..21db4a2094 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -40,7 +40,7 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
-#include "util/priority_work_stealing_thread_pool.hpp"
+#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/work_thread_pool.hpp"
@@ -94,9 +94,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}
// 2. local scan thread pool
- _local_scan_thread_pool.reset(new PriorityWorkStealingThreadPool(
- config::doris_scanner_thread_pool_thread_num,
env->store_paths().size(),
- config::doris_scanner_thread_pool_queue_size, "local_scan"));
+ _local_scan_thread_pool.reset(
+ new
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+
config::doris_scanner_thread_pool_queue_size, "local_scan"));
// 3. remote scan thread pool
ThreadPoolBuilder("RemoteScanThreadPool")
@@ -213,7 +213,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
- task.queue_id = (*iter)->queue_id();
ret = _local_scan_thread_pool->offer(task);
} else {
ret = _remote_scan_thread_pool->submit_func([this, scanner
= *iter, ctx] {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 97b5eb7ec5..06b3546549 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -115,13 +115,11 @@ public:
bool is_open() { return _is_open; }
void set_opened() { _is_open = true; }
- int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
-
virtual doris::TabletStorageType get_storage_type() {
return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
}
- bool need_to_close() { return _need_to_close; }
+ bool need_to_close() const { return _need_to_close; }
void mark_to_need_to_close() {
// If the scanner is failed during init or open, then not need update
counters
@@ -154,7 +152,6 @@ protected:
_conjuncts.clear();
}
-protected:
RuntimeState* _state;
VScanNode* _parent;
// Set if scan node has sort limit info
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]