This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit eb0a41fdb0642baa61a58f925bb82157330abe7a Author: Alexey Serbin <ale...@apache.org> AuthorDate: Wed May 8 17:43:57 2024 -0700 [common] get rid of MutexLock Since contemporary STL library provides both std::lock_guard and std::unique_lock, there is no need to keep MutexLock. Change-Id: I49e0ef2c688ef8be74d018bb9bffe70b6655e654 Reviewed-on: http://gerrit.cloudera.org:8080/21415 Tested-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> --- src/kudu/clock/builtin_ntp.cc | 8 +-- src/kudu/master/auto_leader_rebalancer.cc | 2 +- src/kudu/master/catalog_manager.cc | 4 +- src/kudu/rpc/service_pool.cc | 8 ++- src/kudu/rpc/service_pool.h | 2 +- src/kudu/rpc/service_queue.h | 4 +- src/kudu/server/diagnostics_log.cc | 14 ++--- src/kudu/tablet/tablet_metadata.cc | 8 +-- src/kudu/tablet/tablet_metadata.h | 2 +- src/kudu/tools/table_scanner.cc | 4 +- src/kudu/tserver/heartbeater.cc | 6 +- src/kudu/tserver/scanners.cc | 4 +- src/kudu/tserver/scanners.h | 2 +- src/kudu/tserver/tablet_copy_service.cc | 16 +++--- src/kudu/tserver/tablet_copy_service.h | 2 +- src/kudu/util/async_logger.cc | 13 +++-- src/kudu/util/async_logger.h | 6 +- src/kudu/util/barrier.h | 4 +- src/kudu/util/blocking_queue-test.cc | 6 +- src/kudu/util/blocking_queue.h | 25 ++++---- src/kudu/util/cloud/instance_detector.cc | 6 +- src/kudu/util/countdown_latch.h | 17 +++--- src/kudu/util/debug/trace_event_impl.cc | 6 +- src/kudu/util/debug/trace_event_synthetic_delay.cc | 16 +++--- src/kudu/util/kernel_stack_watchdog.cc | 2 +- src/kudu/util/mem_tracker.cc | 12 ++-- src/kudu/util/mem_tracker.h | 1 + src/kudu/util/mutex.h | 67 +--------------------- src/kudu/util/pstack_watcher.cc | 8 +-- src/kudu/util/rwc_lock.cc | 14 +++-- src/kudu/util/test_graph.cc | 5 +- src/kudu/util/threadpool.cc | 41 ++++++------- src/kudu/util/threadpool.h | 8 +-- 33 files changed, 146 insertions(+), 197 deletions(-) diff --git a/src/kudu/clock/builtin_ntp.cc b/src/kudu/clock/builtin_ntp.cc index 29b64f541..5a28980c9 100644 --- a/src/kudu/clock/builtin_ntp.cc +++ b/src/kudu/clock/builtin_ntp.cc @@ -548,7 +548,7 @@ BuiltInNtp::~BuiltInNtp() { } Status BuiltInNtp::Init() { - MutexLock l(state_lock_); + std::lock_guard l(state_lock_); CHECK_EQ(kUninitialized, state_); RETURN_NOT_OK(InitImpl()); @@ -669,13 +669,13 @@ Status BuiltInNtp::PopulateServers(std::vector<HostPort> servers) { } bool BuiltInNtp::is_shutdown() const { - MutexLock l(state_lock_); + std::lock_guard l(state_lock_); return state_ == kShutdown; } void BuiltInNtp::Shutdown() { { - MutexLock l(state_lock_); + std::lock_guard l(state_lock_); if (state_ == kShutdown) { return; } @@ -1116,7 +1116,7 @@ Status BuiltInNtp::CombineClocks() { // We got a valid clock result, so wake up Init() that we are ready to be used. { - MutexLock l(state_lock_); + std::lock_guard l(state_lock_); if (state_ == kStarting) { state_ = kStarted; } diff --git a/src/kudu/master/auto_leader_rebalancer.cc b/src/kudu/master/auto_leader_rebalancer.cc index fdf058623..b6bd1268d 100644 --- a/src/kudu/master/auto_leader_rebalancer.cc +++ b/src/kudu/master/auto_leader_rebalancer.cc @@ -387,7 +387,7 @@ Status AutoLeaderRebalancerTask::RunLeaderRebalanceForTable( } Status AutoLeaderRebalancerTask::RunLeaderRebalancer() { - MutexLock auto_lock(running_mutex_); + std::lock_guard guard(running_mutex_); // If catalog manager isn't initialized or isn't the leader, don't do leader // rebalancing. Putting the auto-rebalancer to sleep shouldn't affect the diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc index 42aeee833..98af0f0b3 100644 --- a/src/kudu/master/catalog_manager.cc +++ b/src/kudu/master/catalog_manager.cc @@ -742,13 +742,13 @@ class CatalogManagerBgTasks { void Shutdown(); void Wake() { - MutexLock lock(lock_); + std::lock_guard lock(lock_); pending_updates_ = true; cond_.Broadcast(); } void Wait(int msec) { - MutexLock lock(lock_); + std::lock_guard lock(lock_); if (closing_) return; if (!pending_updates_) { cond_.WaitFor(MonoDelta::FromMilliseconds(msec)); diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc index 848e07d04..1d7a4a07b 100644 --- a/src/kudu/rpc/service_pool.cc +++ b/src/kudu/rpc/service_pool.cc @@ -101,10 +101,12 @@ Status ServicePool::Init(int num_threads) { void ServicePool::Shutdown() { service_queue_.Shutdown(); - MutexLock lock(shutdown_lock_); - if (closing_) return; + std::lock_guard lock(shutdown_lock_); + if (closing_) { + return; + } closing_ = true; - // TODO: Use a proper thread pool implementation. + // TODO(mpercy): Use a proper thread pool implementation. for (scoped_refptr<kudu::Thread>& thread : threads_) { CHECK_OK(ThreadJoiner(thread.get()).Join()); } diff --git a/src/kudu/rpc/service_pool.h b/src/kudu/rpc/service_pool.h index 1eb7c4756..62f68cef1 100644 --- a/src/kudu/rpc/service_pool.h +++ b/src/kudu/rpc/service_pool.h @@ -99,7 +99,7 @@ class ServicePool : public RpcService { scoped_refptr<Counter> rpcs_timed_out_in_queue_; scoped_refptr<Counter> rpcs_queue_overflow_; - mutable Mutex shutdown_lock_; + Mutex shutdown_lock_; bool closing_; std::function<void(void)> too_busy_hook_; diff --git a/src/kudu/rpc/service_queue.h b/src/kudu/rpc/service_queue.h index ff2df1a67..8cbf1dca6 100644 --- a/src/kudu/rpc/service_queue.h +++ b/src/kudu/rpc/service_queue.h @@ -144,7 +144,7 @@ class LifoServiceQueue final { } void Post(InboundCall* call) { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK(!call_); call_ = call; should_wake_ = true; @@ -152,7 +152,7 @@ class LifoServiceQueue final { } InboundCall* Wait() { - MutexLock l(lock_); + std::lock_guard l(lock_); while (!should_wake_) { cond_.Wait(); } diff --git a/src/kudu/server/diagnostics_log.cc b/src/kudu/server/diagnostics_log.cc index e5f81870d..3ae7e1758 100644 --- a/src/kudu/server/diagnostics_log.cc +++ b/src/kudu/server/diagnostics_log.cc @@ -21,6 +21,7 @@ #include <cstdint> #include <functional> #include <memory> +#include <mutex> #include <optional> #include <ostream> #include <queue> @@ -46,7 +47,6 @@ #include "kudu/util/logging.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" -#include "kudu/util/mutex.h" #include "kudu/util/random.h" #include "kudu/util/random_util.h" #include "kudu/util/rolling_log.h" @@ -126,12 +126,12 @@ DiagnosticsLog::~DiagnosticsLog() { } void DiagnosticsLog::SetMetricsLogInterval(MonoDelta interval) { - MutexLock l(lock_); + std::lock_guard l(lock_); metrics_log_interval_ = interval; } void DiagnosticsLog::DumpStacksNow(std::string reason) { - MutexLock l(lock_); + std::lock_guard l(lock_); dump_stacks_now_reason_ = std::move(reason); wake_.Signal(); } @@ -154,7 +154,7 @@ void DiagnosticsLog::Stop() { if (!thread_) return; { - MutexLock l(lock_); + std::lock_guard l(lock_); stop_ = true; wake_.Signal(); } @@ -191,7 +191,7 @@ MonoTime DiagnosticsLog::ComputeNextWakeup(DiagnosticsLog::WakeupType type) cons } void DiagnosticsLog::RunThread() { - MutexLock l(lock_); + std::unique_lock l(lock_); // Set up a priority queue which tracks our future scheduled wake-ups. typedef pair<MonoTime, WakeupType> QueueElem; @@ -221,8 +221,8 @@ void DiagnosticsLog::RunThread() { // Unlock the mutex while actually logging metrics or stacks since it's somewhat // slow and we don't want to block threads trying to signal us. - l.Unlock(); - SCOPED_CLEANUP({ l.Lock(); }); + l.unlock(); + SCOPED_CLEANUP({ l.lock(); }); Status s; if (what == WakeupType::METRICS) { WARN_NOT_OK(LogMetrics(), "Unable to collect metrics to diagnostics log"); diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc index 3aae9500f..9b7206f88 100644 --- a/src/kudu/tablet/tablet_metadata.cc +++ b/src/kudu/tablet/tablet_metadata.cc @@ -611,7 +611,7 @@ Status TabletMetadata::Flush() { TRACE_EVENT1("tablet", "TabletMetadata::Flush", "tablet_id", tablet_id_); - MutexLock l_flush(flush_lock_); + std::unique_lock l_flush(flush_lock_); BlockIdContainer orphaned; TabletSuperBlockPB pb; vector<unique_ptr<MinLogIndexAnchorer>> anchors_needing_flush; @@ -639,7 +639,7 @@ Status TabletMetadata::Flush() { pre_flush_callback_(); RETURN_NOT_OK(ReplaceSuperBlockUnlocked(pb)); TRACE("Metadata flushed"); - l_flush.Unlock(); + l_flush.unlock(); // Now that we've flushed, we can unanchor our WALs by destructing our // anchors. @@ -692,7 +692,7 @@ Status TabletMetadata::UpdateUnlocked( Status TabletMetadata::ReplaceSuperBlock(const TabletSuperBlockPB &pb) { { - MutexLock l(flush_lock_); + std::lock_guard l(flush_lock_); RETURN_NOT_OK_PREPEND(ReplaceSuperBlockUnlocked(pb), "Unable to replace superblock"); fs_manager_->dd_manager()->DeleteDataDirGroup(tablet_id_); } @@ -719,7 +719,7 @@ Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) { } void TabletMetadata::SetPreFlushCallback(StatusClosure callback) { - MutexLock l_flush(flush_lock_); + std::lock_guard l(flush_lock_); pre_flush_callback_ = std::move(callback); } diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h index b5580c6e6..16e382b85 100644 --- a/src/kudu/tablet/tablet_metadata.h +++ b/src/kudu/tablet/tablet_metadata.h @@ -420,7 +420,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> { // Lock protecting flushing the data to disk. // If taken together with 'data_lock_', must be acquired first. - mutable Mutex flush_lock_; + Mutex flush_lock_; const std::string tablet_id_; std::string table_id_; diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index 35b87f70f..5e33c8182 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -599,7 +599,7 @@ Status TableScanner::ScanData(const vector<KuduScanToken*>& tokens, if (FLAGS_report_scanner_stats && out_) { auto& out = *out_; - MutexLock l(output_lock_); + std::lock_guard l(output_lock_); out << Substitute("T $0 scanned $1 rows in $2 seconds\n", token->tablet().id(), count, sw.elapsed().wall_seconds()); const auto& metrics = scanner->GetResourceMetrics(); @@ -617,7 +617,7 @@ void TableScanner::ScanTask(const vector<KuduScanToken*>& tokens, Status* thread DCHECK(thread_status); *thread_status = ScanData(tokens, [&](const KuduScanBatch& batch) { if (out_ && FLAGS_show_values) { - MutexLock l(output_lock_); + std::lock_guard l(output_lock_); for (const auto& row : batch) { *out_ << row.ToString() << "\n"; } diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc index 94757699b..e88a297bc 100644 --- a/src/kudu/tserver/heartbeater.cc +++ b/src/kudu/tserver/heartbeater.cc @@ -620,7 +620,7 @@ void Heartbeater::Thread::RunThread() { // Wait for either the heartbeat interval to elapse, or for an "ASAP" heartbeat, // or for the signal to shut down. { - MutexLock l(mutex_); + std::lock_guard l(mutex_); while (next_heartbeat > MonoTime::Now() && !heartbeat_asap_ && should_run_) { @@ -730,7 +730,7 @@ Status Heartbeater::Thread::Stop() { } { - MutexLock l(mutex_); + std::lock_guard l(mutex_); should_run_ = false; cond_.Signal(); } @@ -740,7 +740,7 @@ Status Heartbeater::Thread::Stop() { } void Heartbeater::Thread::TriggerASAP() { - MutexLock l(mutex_); + std::lock_guard l(mutex_); heartbeat_asap_ = true; cond_.Signal(); } diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc index 29590b4b3..06703666d 100644 --- a/src/kudu/tserver/scanners.cc +++ b/src/kudu/tserver/scanners.cc @@ -160,7 +160,7 @@ ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity) ScannerManager::~ScannerManager() { { - MutexLock l(shutdown_lock_); + std::lock_guard l(shutdown_lock_); shutdown_ = true; shutdown_cv_.Broadcast(); } @@ -181,7 +181,7 @@ void ScannerManager::RunCollectAndRemovalThread() { while (true) { // Loop until we are shutdown. { - MutexLock l(shutdown_lock_); + std::lock_guard l(shutdown_lock_); if (shutdown_) { return; } diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h index 0fbaa8ec5..f46d0aa73 100644 --- a/src/kudu/tserver/scanners.h +++ b/src/kudu/tserver/scanners.h @@ -172,7 +172,7 @@ class ScannerManager { // If true, removal thread should shut itself down. Protected // by 'shutdown_lock_' and 'shutdown_cv_'. bool shutdown_; - mutable Mutex shutdown_lock_; + Mutex shutdown_lock_; ConditionVariable shutdown_cv_; std::vector<ScannerMapStripe*> scanner_maps_; diff --git a/src/kudu/tserver/tablet_copy_service.cc b/src/kudu/tserver/tablet_copy_service.cc index 473ee88d1..036366e18 100644 --- a/src/kudu/tserver/tablet_copy_service.cc +++ b/src/kudu/tserver/tablet_copy_service.cc @@ -154,7 +154,7 @@ void TabletCopyServiceImpl::BeginTabletCopySession( scoped_refptr<RemoteTabletCopySourceSession> session; bool new_session; { - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); const SessionEntry* session_entry = FindOrNull(sessions_, session_id); new_session = session_entry == nullptr; if (new_session) { @@ -189,7 +189,7 @@ void TabletCopyServiceImpl::BeginTabletCopySession( // fails, then remove it from the sessions map. Status status = session->Init(); if (!status.ok()) { - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); SessionEntry* session_entry = FindOrNull(sessions_, session_id); // Identity check the session to ensure that other interleaved threads have // not already removed the failed session, and replaced it with a new one. @@ -227,7 +227,7 @@ void TabletCopyServiceImpl::BeginTabletCopySession( LOG_WITH_PREFIX(WARNING) << "Timing out tablet copy session due to flag " << "--tablet_copy_early_session_timeout_prob " << "being set to " << timeout_prob; - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); TabletCopyErrorPB::Code app_error; WARN_NOT_OK(TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked(session_id, &app_error), Substitute("Unable to forcibly end tablet copy session $0", session_id)); @@ -244,7 +244,7 @@ void TabletCopyServiceImpl::CheckSessionActive( // Look up and validate tablet copy session. scoped_refptr<RemoteTabletCopySourceSession> session; - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); TabletCopyErrorPB::Code app_error; Status status = FindSessionUnlocked(session_id, &app_error, &session); if (status.ok()) { @@ -273,7 +273,7 @@ void TabletCopyServiceImpl::FetchData(const FetchDataRequestPB* req, // Look up and validate tablet copy session. scoped_refptr<RemoteTabletCopySourceSession> session; { - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); TabletCopyErrorPB::Code app_error = TabletCopyErrorPB::UNKNOWN_ERROR; RPC_RETURN_NOT_OK(FindSessionUnlocked(session_id, &app_error, &session), app_error, "No such session", context); @@ -337,7 +337,7 @@ void TabletCopyServiceImpl::EndTabletCopySession( EndTabletCopySessionResponsePB* /*resp*/, rpc::RpcContext* context) { { - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); TabletCopyErrorPB::Code app_error = TabletCopyErrorPB::UNKNOWN_ERROR; LOG_WITH_PREFIX(INFO) << "Request end of tablet copy session " << req->session_id() << " received from " << context->requestor_string(); @@ -352,7 +352,7 @@ void TabletCopyServiceImpl::Shutdown() { session_expiration_thread_->Join(); // Destroy all tablet copy sessions. - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); auto iter = sessions_.cbegin(); while (iter != sessions_.cend()) { const string& session_id = iter->first; @@ -438,7 +438,7 @@ Status TabletCopyServiceImpl::DoEndTabletCopySessionUnlocked( void TabletCopyServiceImpl::EndExpiredSessions() { do { - MutexLock l(sessions_lock_); + std::lock_guard l(sessions_lock_); const MonoTime now = MonoTime::Now(); vector<SessionEntry> expired_session_entries; diff --git a/src/kudu/tserver/tablet_copy_service.h b/src/kudu/tserver/tablet_copy_service.h index 44ec596e2..90da992e9 100644 --- a/src/kudu/tserver/tablet_copy_service.h +++ b/src/kudu/tserver/tablet_copy_service.h @@ -123,7 +123,7 @@ class TabletCopyServiceImpl : public TabletCopyServiceIf { TabletReplicaLookupIf* tablet_replica_lookup_; // Protects sessions_ map. - mutable Mutex sessions_lock_; + Mutex sessions_lock_; SessionMap sessions_; ThreadSafeRandom rand_; diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc index 3a2617740..6655ce6c3 100644 --- a/src/kudu/util/async_logger.cc +++ b/src/kudu/util/async_logger.cc @@ -17,6 +17,7 @@ #include "kudu/util/async_logger.h" +#include <mutex> #include <string> #include <thread> @@ -47,7 +48,7 @@ void AsyncLogger::Start() { void AsyncLogger::Stop() { { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK_EQ(state_, RUNNING); state_ = STOPPED; wake_flusher_cond_.Signal(); @@ -62,7 +63,7 @@ void AsyncLogger::Write(bool force_flush, const char* message, size_t message_len) { { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK_EQ(state_, RUNNING); while (BufferFull(*active_buf_)) { app_threads_blocked_count_for_tests_++; @@ -91,7 +92,7 @@ void AsyncLogger::Write(bool force_flush, } void AsyncLogger::Flush() { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK_EQ(state_, RUNNING); // Wake up the writer thread at least twice. @@ -110,7 +111,7 @@ uint32_t AsyncLogger::LogSize() { } void AsyncLogger::RunThread() { - MutexLock l(lock_); + std::unique_lock l(lock_); while (state_ == RUNNING || active_buf_->needs_flush_or_write()) { while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) { if (!wake_flusher_cond_.WaitFor(MonoDelta::FromSeconds(FLAGS_logbufsecs))) { @@ -126,7 +127,7 @@ void AsyncLogger::RunThread() { if (BufferFull(*flushing_buf_)) { free_buffer_cond_.Broadcast(); } - l.Unlock(); + l.unlock(); for (const auto& msg : flushing_buf_->messages) { wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size()); @@ -136,7 +137,7 @@ void AsyncLogger::RunThread() { } flushing_buf_->clear(); - l.Lock(); + l.lock(); flush_count_++; flush_complete_cond_.Broadcast(); } diff --git a/src/kudu/util/async_logger.h b/src/kudu/util/async_logger.h index 8c7c347e2..e9af3b51a 100644 --- a/src/kudu/util/async_logger.h +++ b/src/kudu/util/async_logger.h @@ -16,11 +16,10 @@ // under the License. #pragma once -#include "kudu/gutil/macros.h" - #include <cstdint> #include <ctime> #include <memory> +#include <mutex> // IWYU pragma: keep #include <string> #include <thread> #include <utility> @@ -28,6 +27,7 @@ #include <glog/logging.h> +#include "kudu/gutil/macros.h" #include "kudu/util/condition_variable.h" #include "kudu/util/mutex.h" @@ -100,7 +100,7 @@ class AsyncLogger final : public google::base::Logger { // blocked due to the buffers being full and the writer thread // not keeping up. int app_threads_blocked_count_for_tests() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return app_threads_blocked_count_for_tests_; } diff --git a/src/kudu/util/barrier.h b/src/kudu/util/barrier.h index 88e56828d..8b5bf7eab 100644 --- a/src/kudu/util/barrier.h +++ b/src/kudu/util/barrier.h @@ -16,6 +16,8 @@ // under the License. #pragma once +#include <mutex> + #include "kudu/gutil/macros.h" #include "kudu/util/condition_variable.h" #include "kudu/util/mutex.h" @@ -42,7 +44,7 @@ class Barrier { // to the initial count. void Wait() { ThreadRestrictions::AssertWaitAllowed(); - MutexLock l(mutex_); + std::lock_guard l(mutex_); if (--count_ == 0) { count_ = initial_count_; cycle_count_++; diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc index 11c4c6acc..eea7cc0e5 100644 --- a/src/kudu/util/blocking_queue-test.cc +++ b/src/kudu/util/blocking_queue-test.cc @@ -294,7 +294,7 @@ class MultiThreadTest { for (int i = 0; i < blocking_puts_; i++) { ASSERT_OK(queue_.BlockingPut(arg)); } - MutexLock guard(lock_); + std::lock_guard guard(lock_); if (--num_inserters_ == 0) { queue_.Shutdown(); } @@ -307,7 +307,7 @@ class MultiThreadTest { if (!s.ok()) { arg = -1; } - MutexLock guard(lock_); + std::lock_guard guard(lock_); gotten_[arg] = gotten_[arg] + 1; } } @@ -324,7 +324,7 @@ class MultiThreadTest { thread.join(); } // Let's check to make sure we got what we should have. - MutexLock guard(lock_); + std::lock_guard guard(lock_); for (int i = 0; i < nthreads_; i++) { ASSERT_EQ(puts_ + blocking_puts_, gotten_[i]); } diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h index 8c4a853a5..6724082bc 100644 --- a/src/kudu/util/blocking_queue.h +++ b/src/kudu/util/blocking_queue.h @@ -21,6 +21,7 @@ #include <algorithm> #include <deque> #include <memory> +#include <mutex> #include <string> #include <type_traits> #include <utility> @@ -83,13 +84,13 @@ class BlockingQueue { // - TimedOut if the deadline passed // - Aborted if the queue shut down Status BlockingGet(T* out, MonoTime deadline = {}) { - MutexLock l(lock_); + std::unique_lock l(lock_); while (true) { if (!queue_.empty()) { *out = std::move(queue_.front()); queue_.pop_front(); decrement_size_unlocked(*out); - l.Unlock(); + l.unlock(); not_full_.Signal(); return Status::OK(); } @@ -118,7 +119,7 @@ class BlockingQueue { // - TimedOut if the deadline passed // - Aborted if the queue shut down Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = {}) { - MutexLock l(lock_); + std::unique_lock l(lock_); while (true) { if (!queue_.empty()) { out->reserve(queue_.size()); @@ -127,7 +128,7 @@ class BlockingQueue { } std::move(queue_.begin(), queue_.end(), std::back_inserter(*out)); queue_.clear(); - l.Unlock(); + l.unlock(); not_full_.Signal(); return Status::OK(); } @@ -153,7 +154,7 @@ class BlockingQueue { // https://en.cppreference.com/w/cpp/utility/forward for details. template<typename U> QueueStatus Put(U&& val) { - MutexLock l(lock_); + std::unique_lock l(lock_); if (PREDICT_FALSE(shutdown_)) { return QUEUE_SHUTDOWN; } @@ -162,7 +163,7 @@ class BlockingQueue { } increment_size_unlocked(val); queue_.emplace_back(std::forward<U>(val)); - l.Unlock(); + l.unlock(); not_empty_.Signal(); return QUEUE_SUCCESS; } @@ -187,7 +188,7 @@ class BlockingQueue { if (PREDICT_FALSE(deadline.Initialized() && MonoTime::Now() > deadline)) { return Status::TimedOut(""); } - MutexLock l(lock_); + std::unique_lock l(lock_); while (true) { if (PREDICT_FALSE(shutdown_)) { return Status::Aborted(""); @@ -195,7 +196,7 @@ class BlockingQueue { if (size_ < max_size_) { increment_size_unlocked(val); queue_.emplace_back(std::forward<U>(val)); - l.Unlock(); + l.unlock(); not_empty_.Signal(); return Status::OK(); } @@ -215,14 +216,14 @@ class BlockingQueue { // Existing elements will drain out of it, and then BlockingGet will start // returning false. void Shutdown() { - MutexLock l(lock_); + std::lock_guard l(lock_); shutdown_ = true; not_full_.Broadcast(); not_empty_.Broadcast(); } bool empty() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return queue_.empty(); } @@ -231,14 +232,14 @@ class BlockingQueue { } size_t size() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return size_; } std::string ToString() const { std::string ret; - MutexLock l(lock_); + std::lock_guard l(lock_); for (const T& t : queue_) { ret.append(t->ToString()); ret.append("\n"); diff --git a/src/kudu/util/cloud/instance_detector.cc b/src/kudu/util/cloud/instance_detector.cc index 83968643b..0d3d8f2d8 100644 --- a/src/kudu/util/cloud/instance_detector.cc +++ b/src/kudu/util/cloud/instance_detector.cc @@ -61,7 +61,7 @@ InstanceDetector::~InstanceDetector() { Status InstanceDetector::Detect(unique_ptr<InstanceMetadata>* metadata) { { // An extra sanity check. - MutexLock lock(mutex_); + std::lock_guard lock(mutex_); CHECK_EQ(0, num_running_detectors_); CHECK_EQ(kNoIdx, result_detector_idx_); num_running_detectors_ = detectors_.size(); @@ -85,7 +85,7 @@ Status InstanceDetector::Detect(unique_ptr<InstanceMetadata>* metadata) { // Spurious wakeups are ignored by the virtue of checking the value of the // 'result_detector_idx_' field. { - MutexLock lock(mutex_); + std::lock_guard lock(mutex_); while (result_detector_idx_ == kNoIdx && num_running_detectors_ > 0) { cv_.Wait(); } @@ -103,7 +103,7 @@ void InstanceDetector::GetInstanceInfo(InstanceMetadata* imd, size_t idx) { DCHECK(imd); const auto s = imd->Init(); { - MutexLock lock(mutex_); + std::lock_guard lock(mutex_); --num_running_detectors_; if (s.ok()) { CHECK_EQ(kNoIdx, result_detector_idx_) diff --git a/src/kudu/util/countdown_latch.h b/src/kudu/util/countdown_latch.h index c75348634..c3adc6b56 100644 --- a/src/kudu/util/countdown_latch.h +++ b/src/kudu/util/countdown_latch.h @@ -14,8 +14,10 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_UTIL_COUNTDOWN_LATCH_H -#define KUDU_UTIL_COUNTDOWN_LATCH_H + +#pragma once + +#include <mutex> #include "kudu/gutil/macros.h" #include "kudu/util/condition_variable.h" @@ -42,7 +44,7 @@ class CountDownLatch { // Otherwise, returns true. bool CountDown(int amount) { DCHECK_GE(amount, 0); - MutexLock lock(lock_); + std::lock_guard lock(lock_); if (count_ == 0) { return false; } @@ -72,7 +74,7 @@ class CountDownLatch { // If the count is already zero, this returns immediately. void Wait() const { ThreadRestrictions::AssertWaitAllowed(); - MutexLock lock(lock_); + std::lock_guard lock(lock_); while (count_ > 0) { cond_.Wait(); } @@ -82,7 +84,7 @@ class CountDownLatch { // Returns true if the count became zero, false otherwise. bool WaitUntil(const MonoTime& when) const { ThreadRestrictions::AssertWaitAllowed(); - MutexLock lock(lock_); + std::lock_guard lock(lock_); while (count_ > 0) { if (!cond_.WaitUntil(when)) { return false; @@ -101,7 +103,7 @@ class CountDownLatch { // the latch. If 'count' is 0, and there are currently waiters, those waiters // will be triggered as if you counted down to 0. void Reset(uint64_t count) { - MutexLock lock(lock_); + std::lock_guard lock(lock_); count_ = count; if (count_ == 0) { // Awake any waiters if we reset to 0. @@ -110,7 +112,7 @@ class CountDownLatch { } uint64_t count() const { - MutexLock lock(lock_); + std::lock_guard lock(lock_); return count_; } @@ -137,4 +139,3 @@ class CountDownOnScopeExit { }; } // namespace kudu -#endif diff --git a/src/kudu/util/debug/trace_event_impl.cc b/src/kudu/util/debug/trace_event_impl.cc index 5cd038eef..19a93c568 100644 --- a/src/kudu/util/debug/trace_event_impl.cc +++ b/src/kudu/util/debug/trace_event_impl.cc @@ -1569,7 +1569,7 @@ void TraceLog::Flush(const TraceLog::OutputCallback& cb) { { // Holding the active threads lock ensures that no thread will exit and // delete its own PerThreadInfo object. - MutexLock l(active_threads_lock_); + std::lock_guard l(active_threads_lock_); for (const ActiveThreadMap::value_type& entry : active_threads_) { int64_t tid = entry.first; PerThreadInfo* thr_info = entry.second; @@ -1729,7 +1729,7 @@ TraceLog::PerThreadInfo* TraceLog::SetupThreadLocalBuffer() { threadlocal::internal::AddDestructor(&TraceLog::ThreadExitingCB, this); { - MutexLock lock(active_threads_lock_); + std::lock_guard lock_(active_threads_lock_); InsertOrDie(&active_threads_, cur_tid, thr_info); } return thr_info; @@ -1759,7 +1759,7 @@ void TraceLog::ThreadExiting() { delete buf; { - MutexLock lock(active_threads_lock_); + std::lock_guard lock_(active_threads_lock_); active_threads_.erase(cur_tid); } delete thr_info; diff --git a/src/kudu/util/debug/trace_event_synthetic_delay.cc b/src/kudu/util/debug/trace_event_synthetic_delay.cc index 615911281..776df84f4 100644 --- a/src/kudu/util/debug/trace_event_synthetic_delay.cc +++ b/src/kudu/util/debug/trace_event_synthetic_delay.cc @@ -64,19 +64,19 @@ void TraceEventSyntheticDelay::Initialize( } void TraceEventSyntheticDelay::SetTargetDuration(const MonoDelta& target_duration) { - MutexLock lock(lock_); + std::lock_guard lock(lock_); target_duration_ = target_duration; trigger_count_ = 0; begin_count_ = 0; } void TraceEventSyntheticDelay::SetMode(Mode mode) { - MutexLock lock(lock_); + std::lock_guard lock(lock_); mode_ = mode; } void TraceEventSyntheticDelay::SetClock(TraceEventSyntheticDelayClock* clock) { - MutexLock lock(lock_); + std::lock_guard lock(lock_); clock_ = clock; } @@ -92,7 +92,7 @@ void TraceEventSyntheticDelay::Begin() { MonoTime start_time = clock_->Now(); { - MutexLock lock(lock_); + std::lock_guard lock(lock_); if (++begin_count_ != 1) return; end_time_ = CalculateEndTimeLocked(start_time); @@ -109,7 +109,7 @@ void TraceEventSyntheticDelay::BeginParallel(MonoTime* out_end_time) { MonoTime start_time = clock_->Now(); { - MutexLock lock(lock_); + std::lock_guard lock(lock_); *out_end_time = CalculateEndTimeLocked(start_time); } } @@ -122,7 +122,7 @@ void TraceEventSyntheticDelay::End() { MonoTime end_time; { - MutexLock lock(lock_); + std::lock_guard lock(lock_); if (!begin_count_ || --begin_count_ != 0) return; end_time = end_time_; @@ -170,7 +170,7 @@ TraceEventSyntheticDelay* TraceEventSyntheticDelayRegistry::GetOrCreateDelay( return &delays_[i]; } - MutexLock lock(lock_); + std::lock_guard lock(lock_); delay_count = base::subtle::Acquire_Load(&delay_count_); for (int i = 0; i < delay_count; ++i) { if (!strcmp(name, delays_[i].name_.c_str())) @@ -192,7 +192,7 @@ MonoTime TraceEventSyntheticDelayRegistry::Now() { } void TraceEventSyntheticDelayRegistry::ResetAllDelays() { - MutexLock lock(lock_); + std::lock_guard lock(lock_); int delay_count = base::subtle::Acquire_Load(&delay_count_); for (int i = 0; i < delay_count; ++i) { delays_[i].SetTargetDuration(MonoDelta()); diff --git a/src/kudu/util/kernel_stack_watchdog.cc b/src/kudu/util/kernel_stack_watchdog.cc index c920e4e11..f83c6693d 100644 --- a/src/kudu/util/kernel_stack_watchdog.cc +++ b/src/kudu/util/kernel_stack_watchdog.cc @@ -148,7 +148,7 @@ void KernelStackWatchdog::RunThread() { // NOTE: it's still possible that the thread will have exited in between grabbing its pointer // and sending a signal, but DumpThreadStack() already is safe about not sending a signal // to some other non-Kudu thread. - MutexLock l(unregister_lock_); + std::lock_guard l(unregister_lock_); // Take the snapshot of the thread information under a short lock. // diff --git a/src/kudu/util/mem_tracker.cc b/src/kudu/util/mem_tracker.cc index 80c66da80..fa4cc0d80 100644 --- a/src/kudu/util/mem_tracker.cc +++ b/src/kudu/util/mem_tracker.cc @@ -90,7 +90,7 @@ MemTracker::~MemTracker() { << " has unreleased consumption " << consumption(); parent_->Release(consumption()); - MutexLock l(parent_->child_trackers_lock_); + std::lock_guard l(parent_->child_trackers_lock_); if (child_tracker_it_ != parent_->child_trackers_.end()) { parent_->child_trackers_.erase(child_tracker_it_); child_tracker_it_ = parent_->child_trackers_.end(); @@ -124,7 +124,7 @@ bool MemTracker::FindTrackerInternal(const string& id, list<weak_ptr<MemTracker>> children; { - MutexLock l(parent->child_trackers_lock_); + std::lock_guard l(parent->child_trackers_lock_); children = parent->child_trackers_; } @@ -162,7 +162,7 @@ shared_ptr<MemTracker> MemTracker::FindOrCreateGlobalTracker( // globally-visible MemTrackers which are the exception rather than the rule, // it's reasonable to synchronize their creation on a singleton lock. static Mutex find_or_create_lock; - MutexLock l(find_or_create_lock); + std::lock_guard l(find_or_create_lock); shared_ptr<MemTracker> found; if (FindTrackerInternal(id, &found, GetRootTracker())) { @@ -181,7 +181,7 @@ void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) { trackers->push_back(t); { - MutexLock l(t->child_trackers_lock_); + std::lock_guard l(t->child_trackers_lock_); for (const auto& child_weak : t->child_trackers_) { shared_ptr<MemTracker> child = child_weak.lock(); if (child) { @@ -209,7 +209,7 @@ void MemTracker::TrackersToPb(MemTrackerPB* pb) { tracker_pb->set_current_consumption(tracker->consumption()); tracker_pb->set_peak_consumption(tracker->peak_consumption()); { - MutexLock l(tracker->child_trackers_lock_); + std::lock_guard l(tracker->child_trackers_lock_); for (const auto& child_weak : tracker->child_trackers_) { shared_ptr<MemTracker> child = child_weak.lock(); if (child) { @@ -324,7 +324,7 @@ void MemTracker::Init() { } void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) { - MutexLock l(child_trackers_lock_); + std::lock_guard l(child_trackers_lock_); tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); } diff --git a/src/kudu/util/mem_tracker.h b/src/kudu/util/mem_tracker.h index fca754001..867c528cf 100644 --- a/src/kudu/util/mem_tracker.h +++ b/src/kudu/util/mem_tracker.h @@ -20,6 +20,7 @@ #include <list> #include <memory> #include <string> +#include <type_traits> #include <utility> #include <vector> diff --git a/src/kudu/util/mutex.h b/src/kudu/util/mutex.h index 3d4d37827..83ddad607 100644 --- a/src/kudu/util/mutex.h +++ b/src/kudu/util/mutex.h @@ -14,8 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#ifndef KUDU_UTIL_MUTEX_H -#define KUDU_UTIL_MUTEX_H + +#pragma once #include <pthread.h> #include <sys/types.h> @@ -76,67 +76,4 @@ class Mutex { DISALLOW_COPY_AND_ASSIGN(Mutex); }; -// A helper class that acquires the given Lock while the MutexLock is in scope. -class MutexLock { - public: - struct AlreadyAcquired {}; - - // Acquires 'lock' (must be unheld) and wraps around it. - // - // Sample usage: - // { - // MutexLock l(lock_); // acquired - // ... - // } // released - explicit MutexLock(Mutex& lock) - : lock_(&lock), - owned_(true) { - lock_->Acquire(); - } - - // Wraps around 'lock' (must already be held by this thread). - // - // Sample usage: - // { - // lock_.Acquire(); // acquired - // ... - // MutexLock l(lock_, AlreadyAcquired()); - // ... - // } // released - MutexLock(Mutex& lock, const AlreadyAcquired&) - : lock_(&lock), - owned_(true) { - lock_->AssertAcquired(); - } - - void Lock() { - DCHECK(!owned_); - lock_->Acquire(); - owned_ = true; - } - - void Unlock() { - DCHECK(owned_); - lock_->AssertAcquired(); - lock_->Release(); - owned_ = false; - } - - ~MutexLock() { - if (owned_) { - Unlock(); - } - } - - bool OwnsLock() const { - return owned_; - } - - private: - Mutex* lock_; - bool owned_; - DISALLOW_COPY_AND_ASSIGN(MutexLock); -}; - } // namespace kudu -#endif /* KUDU_UTIL_MUTEX_H */ diff --git a/src/kudu/util/pstack_watcher.cc b/src/kudu/util/pstack_watcher.cc index 07ab6cef3..c727ca49a 100644 --- a/src/kudu/util/pstack_watcher.cc +++ b/src/kudu/util/pstack_watcher.cc @@ -60,7 +60,7 @@ PstackWatcher::~PstackWatcher() { void PstackWatcher::Shutdown() { { - MutexLock guard(lock_); + std::lock_guard guard(lock_); running_ = false; cond_.Broadcast(); } @@ -71,19 +71,19 @@ void PstackWatcher::Shutdown() { } bool PstackWatcher::IsRunning() const { - MutexLock guard(lock_); + std::lock_guard guard(lock_); return running_; } void PstackWatcher::Wait() const { - MutexLock lock(lock_); + std::lock_guard guard(lock_); while (running_) { cond_.Wait(); } } void PstackWatcher::Run() { - MutexLock guard(lock_); + std::lock_guard guard(lock_); if (!running_) return; cond_.WaitFor(timeout_); if (!running_) return; diff --git a/src/kudu/util/rwc_lock.cc b/src/kudu/util/rwc_lock.cc index ffe4cbb46..9acf533e4 100644 --- a/src/kudu/util/rwc_lock.cc +++ b/src/kudu/util/rwc_lock.cc @@ -17,6 +17,8 @@ #include "kudu/util/rwc_lock.h" +#include <mutex> // IWYU pragma: keep + #include <glog/logging.h> #ifndef NDEBUG @@ -47,12 +49,12 @@ RWCLock::~RWCLock() { } void RWCLock::ReadLock() { - MutexLock l(lock_); + std::lock_guard l(lock_); reader_count_++; } void RWCLock::ReadUnlock() { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK(HasReadersUnlocked()); reader_count_--; if (reader_count_ == 0) { @@ -61,7 +63,7 @@ void RWCLock::ReadUnlock() { } bool RWCLock::HasReaders() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return HasReadersUnlocked(); } @@ -71,7 +73,7 @@ bool RWCLock::HasReadersUnlocked() const { } bool RWCLock::HasWriteLock() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return HasWriteLockUnlocked(); } @@ -85,7 +87,7 @@ bool RWCLock::HasWriteLockUnlocked() const { } void RWCLock::WriteLock() { - MutexLock l(lock_); + std::lock_guard l(lock_); // Wait for any other mutations to finish. while (write_locked_) { no_mutators_.Wait(); @@ -99,7 +101,7 @@ void RWCLock::WriteLock() { } void RWCLock::WriteUnlock() { - MutexLock l(lock_); + std::lock_guard l(lock_); DCHECK(HasWriteLockUnlocked()); write_locked_ = false; #ifndef NDEBUG diff --git a/src/kudu/util/test_graph.cc b/src/kudu/util/test_graph.cc index 44b28a172..31a98ce4a 100644 --- a/src/kudu/util/test_graph.cc +++ b/src/kudu/util/test_graph.cc @@ -20,6 +20,7 @@ #include <mutex> #include <ostream> #include <thread> +#include <type_traits> #include <utility> #include <glog/logging.h> @@ -57,7 +58,7 @@ TimeSeriesCollector::~TimeSeriesCollector() { } shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string& key) { - MutexLock l(series_lock_); + std::lock_guard l_(series_lock_); SeriesMap::const_iterator it = series_map_.find(key); if (it != series_map_.end()) { return (*it).second; @@ -103,7 +104,7 @@ void TimeSeriesCollector::DumperThread() { void TimeSeriesCollector::BuildMetricsString( WallTime time_since_start, faststring* dst_buf) const { - MutexLock l(series_lock_); + std::lock_guard l_(series_lock_); dst_buf->append(StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f", scope_.c_str(), time_since_start)); diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc index 4454ca04f..d22cda2c8 100644 --- a/src/kudu/util/threadpool.cc +++ b/src/kudu/util/threadpool.cc @@ -21,6 +21,7 @@ #include <functional> #include <limits> #include <memory> +#include <mutex> #include <ostream> #include <string> #include <utility> @@ -142,7 +143,7 @@ void SchedulerThread::RunLoop() { MonoTime now = MonoTime::Now(); vector<SchedulerTask> pending_tasks; { - MutexLock auto_lock(mutex_); + std::lock_guard auto_lock(mutex_); auto upper_it = future_tasks_.upper_bound(now); for (auto it = future_tasks_.begin(); it != upper_it; it++) { pending_tasks.emplace_back(std::move(it->second)); @@ -198,7 +199,7 @@ Status ThreadPoolToken::Submit(std::function<void()> f) { } void ThreadPoolToken::Shutdown() { - MutexLock unique_lock(pool_->lock_); + std::unique_lock lock(pool_->lock_); pool_->CheckNotPoolThreadUnlocked(); // Clear the queue under the lock, but defer the releasing of the tasks @@ -249,7 +250,7 @@ void ThreadPoolToken::Shutdown() { } // Finally release the queued tasks, outside the lock. - unique_lock.Unlock(); + lock.unlock(); for (auto& t : to_release) { if (t.trace) { t.trace->Release(); @@ -266,7 +267,7 @@ Status ThreadPoolToken::Schedule(std::function<void()> f, int64_t delay_ms) { } void ThreadPoolToken::Wait() { - MutexLock unique_lock(pool_->lock_); + std::lock_guard unique_lock(pool_->lock_); pool_->CheckNotPoolThreadUnlocked(); while (IsActive()) { not_running_cond_.Wait(); @@ -274,7 +275,7 @@ void ThreadPoolToken::Wait() { } bool ThreadPoolToken::WaitUntil(const MonoTime& until) { - MutexLock unique_lock(pool_->lock_); + std::lock_guard unique_lock(pool_->lock_); pool_->CheckNotPoolThreadUnlocked(); while (IsActive()) { if (!not_running_cond_.WaitUntil(until)) { @@ -415,14 +416,14 @@ Status ThreadPool::Init() { void ThreadPool::Shutdown() { { - MutexLock l(scheduler_lock_); + std::lock_guard l(scheduler_lock_); if (scheduler_) { delete scheduler_; scheduler_ = nullptr; } } - MutexLock unique_lock(lock_); + std::unique_lock lock(lock_); CheckNotPoolThreadUnlocked(); // Note: this is the same error seen at submission if the pool is at // capacity, so clients can't tell them apart. This isn't really a practical @@ -478,7 +479,7 @@ void ThreadPool::Shutdown() { } // Finally release the queued tasks, outside the lock. - unique_lock.Unlock(); + lock.unlock(); for (auto& token : to_release) { for (auto& t : token) { if (t.trace) { @@ -494,7 +495,7 @@ unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) { unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics( ExecutionMode mode, ThreadPoolMetrics metrics) { - MutexLock guard(lock_); + std::lock_guard guard(lock_); unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode, std::move(metrics))); @@ -514,7 +515,7 @@ bool ThreadPool::QueueOverloaded(MonoDelta* overloaded_time, } void ThreadPool::ReleaseToken(ThreadPoolToken* t) { - MutexLock guard(lock_); + std::lock_guard guard(lock_); CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released", ThreadPoolToken::StateToString(t->state())); CHECK_EQ(1, tokens_.erase(t)); @@ -527,7 +528,7 @@ Status ThreadPool::Submit(std::function<void()> f) { Status ThreadPool::Schedule(ThreadPoolToken* token, std::function<void()> f, MonoTime execute_time) { - MutexLock l(scheduler_lock_); + std::lock_guard l(scheduler_lock_); if (!scheduler_) { return Status::IllegalState("scheduler thread has been shutdown"); } @@ -539,7 +540,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { DCHECK(token); const MonoTime submit_time = MonoTime::Now(); - MutexLock guard(lock_); + std::unique_lock guard(lock_); if (PREDICT_FALSE(!pool_status_.ok())) { return pool_status_; } @@ -632,7 +633,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { token->metrics_.queue_length_histogram->Increment(length_at_submit); } - guard.Unlock(); + guard.unlock(); if (metrics_.queue_length_histogram) { metrics_.queue_length_histogram->Increment(length_at_submit); @@ -641,7 +642,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { if (need_a_thread) { Status status = CreateThread(); if (!status.ok()) { - guard.Lock(); + guard.lock(); num_threads_pending_start_--; if (num_threads_ + num_threads_pending_start_ == 0) { // If we have no threads, we can't do any work. @@ -658,7 +659,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) { } void ThreadPool::Wait() { - MutexLock unique_lock(lock_); + std::lock_guard guard(lock_); CheckNotPoolThreadUnlocked(); while (total_queued_tasks_ > 0 || active_threads_ > 0) { idle_cond_.Wait(); @@ -666,7 +667,7 @@ void ThreadPool::Wait() { } bool ThreadPool::WaitUntil(const MonoTime& until) { - MutexLock unique_lock(lock_); + std::lock_guard guard(lock_); CheckNotPoolThreadUnlocked(); while (total_queued_tasks_ > 0 || active_threads_ > 0) { if (!idle_cond_.WaitUntil(until)) { @@ -681,7 +682,7 @@ bool ThreadPool::WaitFor(const MonoDelta& delta) { } void ThreadPool::DispatchThread() { - MutexLock unique_lock(lock_); + std::unique_lock lock(lock_); InsertOrDie(&threads_, Thread::current_thread()); DCHECK_GT(num_threads_pending_start_, 0); num_threads_++; @@ -749,7 +750,7 @@ void ThreadPool::DispatchThread() { const MonoDelta queue_time = now - task.submit_time; NotifyLoadMeterUnlocked(queue_time); - unique_lock.Unlock(); + lock.unlock(); // Release the reference which was held by the queued item. ADOPT_TRACE(task.trace); @@ -793,7 +794,7 @@ void ThreadPool::DispatchThread() { // In the worst case, the destructor might even try to do something // with this threadpool, and produce a deadlock. task.func = nullptr; - unique_lock.Lock(); + lock.lock(); // Possible states: // 1. The token was shut down while we ran its task. Transition to QUIESCED. @@ -829,7 +830,7 @@ void ThreadPool::DispatchThread() { // It's important that we hold the lock between exiting the loop and dropping // num_threads_. Otherwise it's possible someone else could come along here // and add a new task just as the last running thread is about to exit. - CHECK(unique_lock.OwnsLock()); + CHECK(lock.owns_lock()); CHECK_EQ(threads_.erase(Thread::current_thread()), 1); num_threads_--; diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h index 6f4119f86..2afe1c326 100644 --- a/src/kudu/util/threadpool.h +++ b/src/kudu/util/threadpool.h @@ -200,12 +200,12 @@ public: void Schedule(ThreadPoolToken* token, std::function<void()> f, const MonoTime& execute_time) { - MutexLock unique_lock(mutex_); + std::lock_guard lock(mutex_); future_tasks_.insert({execute_time, SchedulerTask({token, std::move(f)})}); } bool empty() const { - MutexLock unique_lock(mutex_); + std::lock_guard lock(mutex_); return future_tasks_.empty(); } @@ -317,7 +317,7 @@ class ThreadPool { // Return the number of threads currently running (or in the process of starting up) // for this thread pool. int num_threads() const { - MutexLock l(lock_); + std::lock_guard l(lock_); return num_threads_ + num_threads_pending_start_; } @@ -514,7 +514,7 @@ class ThreadPool { // Return the number of threads currently running for this thread pool. // Used by tests to avoid tsan test case down. int num_active_threads() { - MutexLock l(lock_); + std::lock_guard l(lock_); return active_threads_; }