This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b81ff2d25f53586914695b95c77a9d9e31a2a51a
Author: Tim Armstrong <tarmstr...@cloudera.com>
AuthorDate: Tue Nov 17 10:32:10 2020 -0800

    IMPALA-10276: thread-safe access to RuntimeProfile::counter_map_
    
    The bug was that 'counter_map_' could be mutated concurrently with
    calls to total_time_counter() or inactive_timer(), which looked up
    'counter_map_' without holding 'counter_map_lock_'.
    
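    This is fixed by storing direct pointers to those counters in
    RuntimeProfileBase, so the accessors no longer read 'counter_map_'.
    The pointers are const and set at construction, so reading them is
    thread-safe without locking. This also keeps overhead low: acquiring
    'counter_map_lock_' on every access might have some performance
    impact, since total_time_counter() is called throughout query
    execution.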
    
    Change-Id: Ic21a13acf9c7c326a27334e61ce3729f1e3cab42
    Reviewed-on: http://gerrit.cloudera.org:8080/16739
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/util/runtime-profile.cc | 39 ++++++++++++++++++++-------------------
 be/src/util/runtime-profile.h  | 29 ++++++++++++++---------------
 2 files changed, 34 insertions(+), 34 deletions(-)

diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 3d0e1f7..32e2e75 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -162,8 +162,20 @@ const char* ProfileEntryPrototype::SignificanceDescription(
   }
 }
 
-RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name)
-  : pool_(pool), name_(name) {}
+RuntimeProfileBase::RuntimeProfileBase(ObjectPool* pool, const string& name,
+    Counter* total_time_counter, Counter* inactive_timer)
+  : pool_(pool),
+    name_(name),
+    total_time_counter_(total_time_counter),
+    inactive_timer_(inactive_timer) {
+  DCHECK(total_time_counter != nullptr);
+  DCHECK(inactive_timer != nullptr);
+  set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
+  counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
+  root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
+  counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
+  root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
+}
 
 RuntimeProfileBase::~RuntimeProfileBase() {}
 
@@ -172,13 +184,8 @@ RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name) {
 }
 
 RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name)
-  : RuntimeProfileBase(pool, name) {
-  set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
-  counter_map_[TOTAL_TIME_COUNTER_NAME] = &counter_total_time_;
-  root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
-  counter_map_[INACTIVE_TIME_COUNTER_NAME] = &inactive_timer_;
-  root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
-}
+  : RuntimeProfileBase(
+        pool, name, &builtin_counter_total_time_, &builtin_inactive_timer_) {}
 
 RuntimeProfile::~RuntimeProfile() {
   DCHECK(!has_active_periodic_counters_);
@@ -2151,18 +2158,12 @@ void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) {
 
 AggregatedRuntimeProfile::AggregatedRuntimeProfile(
     ObjectPool* pool, const string& name, int num_input_profiles, bool is_root)
-  : RuntimeProfileBase(pool, name), num_input_profiles_(num_input_profiles) {
+  : RuntimeProfileBase(pool, name,
+        pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles)),
+        pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles))),
+    num_input_profiles_(num_input_profiles) {
   DCHECK_GE(num_input_profiles, 0);
   if (is_root) input_profile_names_.resize(num_input_profiles);
-  set<string>& root_counters = child_counter_map_[ROOT_COUNTER];
-  Counter* total_time_counter =
-      pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
-  Counter* inactive_timer =
-      pool->Add(new AveragedCounter(TUnit::TIME_NS, num_input_profiles));
-  counter_map_[TOTAL_TIME_COUNTER_NAME] = total_time_counter;
-  root_counters.emplace(TOTAL_TIME_COUNTER_NAME);
-  counter_map_[INACTIVE_TIME_COUNTER_NAME] = inactive_timer;
-  root_counters.emplace(INACTIVE_TIME_COUNTER_NAME);
 }
 
 AggregatedRuntimeProfile* AggregatedRuntimeProfile::Create(
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 905c1ce..6c061b6 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -176,19 +176,9 @@ class RuntimeProfileBase {
   const TRuntimeProfileNodeMetadata& metadata() const { return metadata_; }
 
   /// Returns the counter for the total elapsed time.
-  Counter* total_time_counter() const {
-    auto it = counter_map_.find(TOTAL_TIME_COUNTER_NAME);
-    DCHECK(it != counter_map_.end());
-    return it->second;
-  }
-
+  Counter* total_time_counter() const { return total_time_counter_; }
   /// Returns the counter for the inactive time.
-  Counter* inactive_timer() const {
-    auto it = counter_map_.find(INACTIVE_TIME_COUNTER_NAME);
-    DCHECK(it != counter_map_.end());
-    return it->second;
-  }
-
+  Counter* inactive_timer() const { return inactive_timer_; }
   int64_t local_time() const { return local_time_ns_.Load(); }
   int64_t total_time() const { return total_time_ns_.Load(); }
 
@@ -241,6 +231,14 @@ class RuntimeProfileBase {
   /// RuntimeProfile::has_active_periodic_counters_.
   mutable SpinLock counter_map_lock_;
 
+  /// Reference to counter_map_[TOTAL_TIME_COUNTER_NAME] for thread-safe access without
+  /// acquiring 'counter_map_lock_'.
+  Counter* const total_time_counter_;
+
+  /// Reference to counter_map_[INACTIVE_TIME_COUNTER_NAME] for thread-safe access without
+  /// acquiring 'counter_map_lock_'.
+  Counter* const inactive_timer_;
+
   /// TODO: IMPALA-9382: info strings can be moved to RuntimeProfile once we remove
   /// callsites for this function on AggregatedRuntimeProfile.
   typedef std::map<std::string, std::string> InfoStrings;
@@ -283,7 +281,8 @@ class RuntimeProfileBase {
   /// ComputeTimeInProfile() executing.
   AtomicInt64 total_time_ns_{0};
 
-  RuntimeProfileBase(ObjectPool* pool, const std::string& name);
+  RuntimeProfileBase(ObjectPool* pool, const std::string& name,
+      Counter* total_time_counter, Counter* inactive_timer);
 
   ///  Inserts 'child' before the iterator 'insert_pos' in 'children_'.
   /// 'children_lock_' must be held by the caller.
@@ -702,12 +701,12 @@ class RuntimeProfile : public RuntimeProfileBase {
   /// Protects summary_stats_map_.
   mutable SpinLock summary_stats_map_lock_;
 
-  Counter counter_total_time_{TUnit::TIME_NS};
+  Counter builtin_counter_total_time_{TUnit::TIME_NS};
 
   /// Total time spent waiting (on non-children) that should not be counted when
   /// computing local_time_frac_. This is updated for example in the exchange
   /// node when waiting on the sender from another fragment.
-  Counter inactive_timer_{TUnit::TIME_NS};
+  Counter builtin_inactive_timer_{TUnit::TIME_NS};
 
   /// The Exec Summary
   TExecSummary t_exec_summary_;

Reply via email to