This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c4dcba5d22 [Bug](scan) fix core dump due to store_path_map (#23084) (#23131)
c4dcba5d22 is described below

commit c4dcba5d22cb6863bf6dd959825b8042f6cbd8a0
Author: Pxl <[email protected]>
AuthorDate: Thu Aug 17 20:26:57 2023 +0800

    [Bug](scan) fix core dump due to store_path_map (#23084) (#23131)
    
    fix core dump due to store_path_map
---
 be/src/runtime/exec_env.h                          |   5 +-
 be/src/runtime/exec_env_init.cpp                   |   4 -
 be/src/util/priority_work_stealing_thread_pool.hpp | 148 ---------------------
 be/src/util/work_thread_pool.hpp                   |   5 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp         |   9 +-
 be/src/vec/exec/scan/vscanner.h                    |   5 +-
 6 files changed, 9 insertions(+), 167 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f4f1cbfd67..40c489efb5 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -163,7 +163,7 @@ public:
     BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
 
     const std::vector<StorePath>& store_paths() const { return _store_paths; }
-    size_t store_path_to_index(const std::string& path) { return 
_store_path_map[path]; }
+
     StorageEngine* storage_engine() { return _storage_engine; }
     void set_storage_engine(StorageEngine* storage_engine) { _storage_engine = 
storage_engine; }
 
@@ -193,8 +193,7 @@ private:
 
     bool _is_init;
     std::vector<StorePath> _store_paths;
-    // path => store index
-    std::map<std::string, size_t> _store_path_map;
+
     // Leave protected so that subclasses can override
     ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
     doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 700eeda8eb..6b7e337ecd 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -105,10 +105,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
         return Status::OK();
     }
     _store_paths = store_paths;
-    // path_name => path_index
-    for (int i = 0; i < store_paths.size(); i++) {
-        _store_path_map[store_paths[i].path] = i;
-    }
 
     _external_scan_context_mgr = new ExternalScanContextMgr(this);
     _vstream_mgr = new doris::vectorized::VDataStreamMgr();
diff --git a/be/src/util/priority_work_stealing_thread_pool.hpp 
b/be/src/util/priority_work_stealing_thread_pool.hpp
deleted file mode 100644
index 02543ba6d8..0000000000
--- a/be/src/util/priority_work_stealing_thread_pool.hpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <mutex>
-#include <thread>
-
-#include "util/blocking_priority_queue.hpp"
-#include "util/lock.h"
-#include "util/thread_group.h"
-
-namespace doris {
-
-// Work-Stealing threadpool which processes items (of type T) in parallel 
which were placed on multi
-// blocking queues by Offer(). Each item is processed by a single 
user-supplied method.
-class PriorityWorkStealingThreadPool : public PriorityThreadPool {
-public:
-    // Creates a new thread pool and start num_threads threads.
-    //  -- num_threads: how many threads are part of this pool
-    //  -- num_queues: how many queues are part of this pool
-    //  -- queue_size: the maximum size of the queue on which work items are 
offered. If the
-    //     queue exceeds this size, subsequent calls to Offer will block until 
there is
-    //     capacity available.
-    PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, 
uint32_t queue_size,
-                                   const std::string& name)
-            : PriorityThreadPool(0, 0, name) {
-        DCHECK_GT(num_queues, 0);
-        DCHECK_GE(num_threads, num_queues);
-        // init _work_queues first because the work thread needs it
-        for (int i = 0; i < num_queues; ++i) {
-            
_work_queues.emplace_back(std::make_shared<BlockingPriorityQueue<Task>>(queue_size));
-        }
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(std::bind<void>(
-                    std::mem_fn(&PriorityWorkStealingThreadPool::work_thread), 
this, i));
-        }
-    }
-
-    virtual ~PriorityWorkStealingThreadPool() {
-        shutdown();
-        join();
-    }
-
-    // Blocking operation that puts a work item on the queue. If the queue is 
full, blocks
-    // until there is capacity available.
-    //
-    // 'work' is copied into the work queue, but may be referenced at any time 
in the
-    // future. Therefore the caller needs to ensure that any data referenced 
by work (if T
-    // is, e.g., a pointer type) remains valid until work has been processed, 
and it's up to
-    // the caller to provide their own signalling mechanism to detect this (or 
to wait until
-    // after DrainAndshutdown returns).
-    //
-    // Returns true if the work item was successfully added to the queue, 
false otherwise
-    // (which typically means that the thread pool has already been shut down).
-    bool offer(Task task) override { return 
_work_queues[task.queue_id]->blocking_put(task); }
-
-    bool offer(WorkFunction func) override {
-        PriorityThreadPool::Task task = {0, func, 0};
-        return _work_queues[task.queue_id]->blocking_put(task);
-    }
-
-    // Shuts the thread pool down, causing the work queue to cease accepting 
offered work
-    // and the worker threads to terminate once they have processed their 
current work item.
-    // Returns once the shutdown flag has been set, does not wait for the 
threads to
-    // terminate.
-    void shutdown() override {
-        PriorityThreadPool::shutdown();
-        for (auto work_queue : _work_queues) {
-            work_queue->shutdown();
-        }
-    }
-
-    uint32_t get_queue_size() const override {
-        uint32_t size = 0;
-        for (auto work_queue : _work_queues) {
-            size += work_queue->get_size();
-        }
-        return size;
-    }
-
-    // Blocks until the work queue is empty, and then calls shutdown to stop 
the worker
-    // threads and Join to wait until they are finished.
-    // Any work Offer()'ed during DrainAndshutdown may or may not be processed.
-    void drain_and_shutdown() override {
-        {
-            std::unique_lock l(_lock);
-            while (get_queue_size() != 0) {
-                _empty_cv.wait(l);
-            }
-        }
-        shutdown();
-        join();
-    }
-
-private:
-    // Driver method for each thread in the pool. Continues to read work from 
the queue
-    // until the pool is shutdown.
-    void work_thread(int thread_id) {
-        auto queue_id = thread_id % _work_queues.size();
-        auto steal_queue_id = (queue_id + 1) % _work_queues.size();
-        while (!is_shutdown()) {
-            Task task;
-            // avoid blocking get
-            bool is_other_queues_empty = true;
-            // steal work in round-robin if nothing to do
-            while (_work_queues[queue_id]->get_size() == 0 && queue_id != 
steal_queue_id &&
-                   !is_shutdown()) {
-                if (_work_queues[steal_queue_id]->non_blocking_get(&task)) {
-                    is_other_queues_empty = false;
-                    task.work_function();
-                }
-                steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
-            }
-            if (queue_id == steal_queue_id) {
-                steal_queue_id = (steal_queue_id + 1) % _work_queues.size();
-            }
-            if (is_other_queues_empty &&
-                _work_queues[queue_id]->blocking_get(
-                        &task, 
config::doris_blocking_priority_queue_wait_timeout_ms)) {
-                task.work_function();
-            }
-            if (_work_queues[queue_id]->get_size() == 0) {
-                _empty_cv.notify_all();
-            }
-        }
-    }
-
-    // Queue on which work items are held until a thread is available to 
process them in
-    // FIFO order.
-    std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues;
-};
-
-} // namespace doris
diff --git a/be/src/util/work_thread_pool.hpp b/be/src/util/work_thread_pool.hpp
index 109ae6ded8..bf0555db1c 100644
--- a/be/src/util/work_thread_pool.hpp
+++ b/be/src/util/work_thread_pool.hpp
@@ -42,7 +42,6 @@ public:
     public:
         int priority;
         WorkFunction work_function;
-        int queue_id;
         bool operator<(const Task& o) const { return priority < o.priority; }
 
         Task& operator++() {
@@ -88,12 +87,12 @@ public:
     virtual bool offer(Task task) { return _work_queue.blocking_put(task); }
 
     virtual bool offer(WorkFunction func) {
-        WorkThreadPool::Task task = {0, func, 0};
+        WorkThreadPool::Task task = {0, func};
         return _work_queue.blocking_put(task);
     }
 
     virtual bool try_offer(WorkFunction func) {
-        WorkThreadPool::Task task = {0, func, 0};
+        WorkThreadPool::Task task = {0, func};
         return _work_queue.try_put(task);
     }
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 3fa10c9d78..21db4a2094 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -40,7 +40,7 @@
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
 #include "util/defer_op.h"
-#include "util/priority_work_stealing_thread_pool.hpp"
+#include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
 #include "util/work_thread_pool.hpp"
@@ -94,9 +94,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
     }
 
     // 2. local scan thread pool
-    _local_scan_thread_pool.reset(new PriorityWorkStealingThreadPool(
-            config::doris_scanner_thread_pool_thread_num, 
env->store_paths().size(),
-            config::doris_scanner_thread_pool_queue_size, "local_scan"));
+    _local_scan_thread_pool.reset(
+            new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                   
config::doris_scanner_thread_pool_queue_size, "local_scan"));
 
     // 3. remote scan thread pool
     ThreadPoolBuilder("RemoteScanThreadPool")
@@ -213,7 +213,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
                         this->_scanner_scan(this, ctx, scanner);
                     };
                     task.priority = nice;
-                    task.queue_id = (*iter)->queue_id();
                     ret = _local_scan_thread_pool->offer(task);
                 } else {
                     ret = _remote_scan_thread_pool->submit_func([this, scanner 
= *iter, ctx] {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 97b5eb7ec5..06b3546549 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -115,13 +115,11 @@ public:
     bool is_open() { return _is_open; }
     void set_opened() { _is_open = true; }
 
-    int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
-
     virtual doris::TabletStorageType get_storage_type() {
         return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
     }
 
-    bool need_to_close() { return _need_to_close; }
+    bool need_to_close() const { return _need_to_close; }
 
     void mark_to_need_to_close() {
         // If the scanner is failed during init or open, then not need update 
counters
@@ -154,7 +152,6 @@ protected:
         _conjuncts.clear();
     }
 
-protected:
     RuntimeState* _state;
     VScanNode* _parent;
     // Set if scan node has sort limit info


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to