This is an automated email from the ASF dual-hosted git repository. tarmstrong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b81ff2d25f53586914695b95c77a9d9e31a2a51a Author: Tim Armstrong <tarmstr...@cloudera.com> AuthorDate: Tue Nov 17 10:32:10 2020 -0800 IMPALA-10276: thread-safe access to RuntimeProfile::counter_map_ The bug was that 'counter_map_' can be mutated concurrently with total_time_counter() or inactive_timer() being called. This is fixed by storing a pointer directly to those counters and bypassing 'counter_map_'. This is then thread-safe and also has low overhead (adding lock acquisitions might have some perf impact, since total_time_counter() is called throughout query execution). Change-Id: Ic21a13acf9c7c326a27334e61ce3729f1e3cab42 Reviewed-on: http://gerrit.cloudera.org:8080/16739 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/util/runtime-profile.cc | 39 ++++++++++++++++++++------------------- be/src/util/runtime-profile.h | 29 ++++++++++++++--------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index 3d0e1f7..32e2e75 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -162,8 +162,20 @@ const char* ProfileEntryPrototype::SignificanceDescription( } } -RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name) - : pool_(pool), name_(name) {} +RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name, + Counter* total_time_counter, Counter* inactive_timer) + : pool_(pool), + name_(name), + total_time_counter_(total_time_counter), + inactive_timer_(inactive_timer) { + DCHECK(total_time_counter != nullptr); + DCHECK(inactive_timer != nullptr); + set<string>& root_counters = child_counter_map_[ROOT_COUNTER]; + counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter; + root_counters.emplace(TOTAL_TIME_COUNTER_NAME); + counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer; + 
root_counters.emplace(INACTIVE_TIME_COUNTER_NAME); +} RuntimeProfileBase::~RuntimeProfileBase() {} @@ -172,13 +184,8 @@ RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name) { } RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name) - : RuntimeProfileBase(pool, name) { - set<string>& root_counters = child_counter_map_[ROOT_COUNTER]; - counter_map_[TOTAL_TIME_COUNTER_NAME] = &counter_total_time_; - root_counters.emplace(TOTAL_TIME_COUNTER_NAME); - counter_map_[INACTIVE_TIME_COUNTER_NAME] = &inactive_timer_; - root_counters.emplace(INACTIVE_TIME_COUNTER_NAME); -} + : RuntimeProfileBase( + pool, name, &builtin_counter_total_time_, &builtin_inactive_timer_) {} RuntimeProfile::~RuntimeProfile() { DCHECK(!has_active_periodic_counters_); @@ -2151,18 +2158,12 @@ void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) { AggregatedRuntimeProfile::AggregatedRuntimeProfile( ObjectPool* pool, const string& name, int num_input_profiles, bool is_root) - : RuntimeProfileBase(pool, name), num_input_profiles_(num_input_profiles) { + : RuntimeProfileBase(pool, name, + pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles)), + pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles))), + num_input_profiles_(num_input_profiles) { DCHECK_GE(num_input_profiles, 0); if (is_root) input_profile_names_.resize(num_input_profiles); - set<string>& root_counters = child_counter_map_[ROOT_COUNTER]; - Counter* total_time_counter = - pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles)); - Counter* inactive_timer = - pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles)); - counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter; - root_counters.emplace(TOTAL_TIME_COUNTER_NAME); - counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer; - root_counters.emplace(INACTIVE_TIME_COUNTER_NAME); } AggregatedRuntimeProfile* AggregatedRuntimeProfile::Create( diff --git a/be/src/util/runtime-profile.h 
b/be/src/util/runtime-profile.h index 905c1ce..6c061b6 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -176,19 +176,9 @@ class RuntimeProfileBase { const TRuntimeProfileNodeMetadata& metadata() const { return metadata_; } /// Returns the counter for the total elapsed time. - Counter* total_time_counter() const { - auto it = counter_map_.find(TOTAL_TIME_COUNTER_NAME); - DCHECK(it != counter_map_.end()); - return it->second; - } - + Counter* total_time_counter() const { return total_time_counter_; } /// Returns the counter for the inactive time. - Counter* inactive_timer() const { - auto it = counter_map_.find(INACTIVE_TIME_COUNTER_NAME); - DCHECK(it != counter_map_.end()); - return it->second; - } - + Counter* inactive_timer() const { return inactive_timer_; } int64_t local_time() const { return local_time_ns_.Load(); } int64_t total_time() const { return total_time_ns_.Load(); } @@ -241,6 +231,14 @@ class RuntimeProfileBase { /// RuntimeProfile::has_active_periodic_counters_. mutable SpinLock counter_map_lock_; + /// Reference to counter_map_[TOTAL_TIME_COUNTER_NAME] for thread-safe access without + /// acquiring 'counter_map_lock_'. + Counter* const total_time_counter_; + + /// Reference to counter_map_[INACTIVE_TIME_COUNTER_NAME] for thread-safe access without + /// acquiring 'counter_map_lock_'. + Counter* const inactive_timer_; + /// TODO: IMPALA-9382: info strings can be moved to RuntimeProfile once we remove /// callsites for this function on AggregatedRuntimeProfile. typedef std::map<std::string, std::string> InfoStrings; @@ -283,7 +281,8 @@ class RuntimeProfileBase { /// ComputeTimeInProfile() executing. AtomicInt64 total_time_ns_{0}; - RuntimeProfileBase(ObjectPool* pool, const std::string& name); + RuntimeProfileBase(ObjectPool* pool, const std::string& name, + Counter* total_time_counter, Counter* inactive_timer); /// Inserts 'child' before the iterator 'insert_pos' in 'children_'. 
/// 'children_lock_' must be held by the caller. @@ -702,12 +701,12 @@ class RuntimeProfile : public RuntimeProfileBase { /// Protects summary_stats_map_. mutable SpinLock summary_stats_map_lock_; - Counter counter_total_time_{TUnit::TIME_NS}; + Counter builtin_counter_total_time_{TUnit::TIME_NS}; /// Total time spent waiting (on non-children) that should not be counted when /// computing local_time_frac_. This is updated for example in the exchange /// node when waiting on the sender from another fragment. - Counter inactive_timer_{TUnit::TIME_NS}; + Counter builtin_inactive_timer_{TUnit::TIME_NS}; /// The Exec Summary TExecSummary t_exec_summary_;