github-actions[bot] commented on code in PR #44690: URL: https://github.com/apache/doris/pull/44690#discussion_r1877987008
########## be/src/util/interval_histogram.h: ########## @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <boost/circular_buffer.hpp> +#include <mutex> +#include <numeric> +#include <vector> + +namespace doris { + +template <typename T> +class IntervalHistogramStat { +public: + IntervalHistogramStat(size_t N) : window(N) {} + + void add(T value) { + std::lock_guard<std::mutex> lock(mutex); + + if (window.full()) { + window.pop_front(); + } + window.push_back(value); + } + + T mean() { + std::lock_guard<std::mutex> lock(mutex); + if (window.empty()) return T(); Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (window.empty()) { return T(); } ``` ########## be/src/util/interval_histogram.h: ########## @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <boost/circular_buffer.hpp> Review Comment: warning: 'boost/circular_buffer.hpp' file not found [clang-diagnostic-error] ```cpp #include <boost/circular_buffer.hpp> ^ ``` ########## be/src/vec/exec/scan/scanner_context.h: ########## @@ -163,6 +170,16 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>, bool _should_reset_thread_name = true; + void decrease_scanner_scheduled() { + std::lock_guard<std::mutex> l(_transfer_lock); + _num_scheduled_scanners--; + } + + int32_t num_scheduled_scanners() { Review Comment: warning: method 'num_scheduled_scanners' can be made const [readability-make-member-function-const] ```suggestion int32_t num_scheduled_scanners() const { ``` ########## be/src/vec/exec/scan/scanner_context.cpp: ########## @@ -541,4 +448,145 @@ void ScannerContext::update_peak_running_scanner(int num) { _local_state->_peak_running_scanner->add(num); } +int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock) { + // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency. + int32_t margin_1 = _min_concurrency - (_tasks_queue.size() + _num_scheduled_scanners); + + // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks. 
+ int32_t margin_2 = + _min_concurrency_of_scan_scheduler - + (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); + + int32_t margin = 0; + // We dont want any scan operator to occupy all the resources. + if (margin_1 >= margin_2) { + margin = margin_1; + } else { + // 到这里说明 _scan_scheduler 的资源使用没有饱和,我们可以让当前的 scan operator 多提交一些 scan task + // 但是不希望 _scan_schedulers 被某一个 scan operator 占用完,所以我们限制 margin 的大小 + // 默认参数下,一个 scan node 会有 C/2 个 scan operator,那么每个 scan operator + // 提交 8 个 scan task 的话,一个 scan node 会有 (C/2) * 8 = 4C 个 task + // 恰好是一个 scan scheduler 的最小并发数 + margin = _basic_margin; + // margin = 8; + } + + VLOG_DEBUG << fmt::format( + "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - " + "({} + {}), margin: {}", + print_id(_query_id), ctx_id, margin_1, _min_concurrency, _tasks_queue.size(), + _num_scheduled_scanners, margin_2, _min_concurrency_of_scan_scheduler, + _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin); + + return margin; +} + +Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task, + std::unique_lock<std::mutex>& transfer_lock) { + std::list<std::shared_ptr<ScanTask>> tasks_to_submit; + + // TODO: REFINE ME + _scanner_scheduler->write_lock(); + Defer defer1([&]() { _scanner_scheduler->write_unlock(); }); + + int32_t margin = _get_margin(transfer_lock); + + // margin is less than zero. Means this scan operator could not submit any scan task for now. + if (margin <= 0) { + // Be careful with current scan task. + // We need to add it back to task queue to make sure it could be resubmitted. 
+ if (current_scan_task && current_scan_task->cached_blocks.empty() && + !current_scan_task->is_eos()) { + _pending_scanners.push(current_scan_task->scanner); + // _pending_scanners.enqueue(current_scan_task->scanner); + // _tasks_queue.push_back(current_scan_task); + // DorisMetrics::instance()->scanner_context_scan_task_queue_size->increment(1); + VLOG_DEBUG << fmt::format( + "{} push back scanner to task queue, because diff <= 0, task_queue size " + "{}, _num_scheduled_scanners {}", + ctx_id, _tasks_queue.size(), _num_scheduled_scanners); + } + return Status::OK(); + } + + bool first_pull = true; + + while (margin-- > 0) { + std::shared_ptr<ScanTask> task_to_run; + const int32_t current_concurrency = + _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size(); + VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id, + current_concurrency, _tasks_queue.size(), _num_scheduled_scanners, + tasks_to_submit.size()); + if (first_pull) { + task_to_run = pull_next_scan_task(current_scan_task, current_concurrency); + if (task_to_run == nullptr) { + // In two situations we will get nullptr. + // 1. current_concurrency has already reached _max_concurrency. + // 2. all scanners are finished. + if (current_scan_task && current_scan_task->cached_blocks.empty() && + !current_scan_task->is_eos()) { + // Current scan task is not eos, but we can not resubmit it. + // This usually happens when we should downgrade the concurrency. + // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
+ _pending_scanners.push(current_scan_task->scanner); + } + } + first_pull = false; + } else { + task_to_run = pull_next_scan_task(nullptr, current_concurrency); + } + + if (task_to_run) { + tasks_to_submit.push_back(task_to_run); + } else { + break; + } + } + + if (tasks_to_submit.empty()) { + return Status::OK(); + } + + VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}", + print_id(_query_id), ctx_id, tasks_to_submit.size(), + _pending_scanners.size()); + + for (auto& scan_task_iter : tasks_to_submit) { + Status submit_status = submit_scan_task(scan_task_iter, transfer_lock); + if (!submit_status.ok()) { + _process_status = submit_status; + _set_scanner_done(); + return _process_status; + } + } + + return Status::OK(); +} + +std::shared_ptr<ScanTask> ScannerContext::pull_next_scan_task( + std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) { + if (current_concurrency >= _max_concurrency) { + VLOG_DEBUG << fmt::format( Review Comment: warning: method 'pull_next_scan_task' can be made const [readability-make-member-function-const] be/src/vec/exec/scan/scanner_context.cpp:570: ```diff - task( + task( const ``` be/src/vec/exec/scan/scanner_context.h:249: ```diff - int32_t current_concurrency); + int32_t current_concurrency) const; ``` ########## be/src/util/interval_histogram.h: ########## @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <algorithm> +#include <boost/circular_buffer.hpp> +#include <mutex> +#include <numeric> +#include <vector> + +namespace doris { + +template <typename T> +class IntervalHistogramStat { +public: + IntervalHistogramStat(size_t N) : window(N) {} + + void add(T value) { + std::lock_guard<std::mutex> lock(mutex); + + if (window.full()) { + window.pop_front(); + } + window.push_back(value); + } + + T mean() { + std::lock_guard<std::mutex> lock(mutex); + if (window.empty()) return T(); + + T sum = std::accumulate(window.begin(), window.end(), T()); + return sum / window.size(); + } + + T median() { + std::lock_guard<std::mutex> lock(mutex); + if (window.empty()) return T(); Review Comment: warning: statement should be inside braces [readability-braces-around-statements] ```suggestion if (window.empty()) { return T(); } ``` ########## be/src/vec/exec/scan/scanner_scheduler.cpp: ########## @@ -376,4 +384,57 @@ int ScannerScheduler::get_remote_scan_thread_queue_size() { return config::doris_remote_scanner_thread_pool_queue_size; } +// The names of these variables will be used as metric names in Prometheus.
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_running_tasks, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_queue_capacity, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_max_thread_number, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_wait_worker_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scan_scheduler_cpu_time_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); + +SimplifiedScanScheduler::SimplifiedScanScheduler(std::string sched_name, + std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl) Review Comment: warning: pass by value and use std::move [modernize-pass-by-value] be/src/vec/exec/scan/scanner_scheduler.cpp:399: ```diff - : _is_stop(false), _cgroup_cpu_ctl(cg_cpu_ctl), _sched_name(sched_name) { + : _is_stop(false), _cgroup_cpu_ctl(cg_cpu_ctl), _sched_name(std::move(sched_name)) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org