Joe McDonnell has posted comments on this change. ( http://gerrit.cloudera.org:8080/16057 )

Change subject: IMPALA-9382: part 2/3: aggregate profiles sent to coordinator
......................................................................


Patch Set 12:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/runtime/coordinator-backend-state.cc@423
PS12, Line 423:   // 'thrift_profiles' and 'instance_exec_status' vectors have one-to-one correspondance.
This comment is now out of date


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/runtime/query-state.cc@591
PS12, Line 591:       LOG(INFO) << it->second->min_per_fragment_instance_idx();
Nit: Leftover logging?


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile-counters.h
File be/src/util/runtime-profile-counters.h:

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile-counters.h@405
PS12, Line 405: No locks are obtained within this method because UpdateCounter() is called from
              :   /// Update()
Nit: Now that AveragedCounter has its own Update() method, this comment is 
ambiguous: the Update() it refers to is AggregatedRuntimeProfile::Update().


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.h
File be/src/util/runtime-profile.h:

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.h@781
PS12, Line 781:   void Update(const TRuntimeProfileTree& src, int start_idx);
Nit: At first glance, it is easy to confuse these two signatures of Update(). 
They are almost identical and look very similar at call sites. Once you know 
what is happening it's clear which is which, but distinct names might be 
useful.


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.h@833
PS12, Line 833:   struct AggEventSequence {
              :     // Unique labels in the event sequence. Values are [0, labels.size() - 1]
              :     std::map<std::string, int32_t> labels;
              :
              :     // One entry per instance. Each entry contains the label indices for that instance's
              :     // event sequence.
              :     std::vector<std::vector<int32_t>> label_idxs;
              :
              :     // One entry per instance. Each entry contains the timestamps for that instance's
              :     // event sequence.
              :     std::vector<std::vector<int64_t>> timestamps;
              :   };
              :
              :   std::map<std::string, AggEventSequence> event_sequence_map_;
              :
              :   /// Protects event_sequence_map_ and event_sequence_labels_.
              :   mutable SpinLock event_sequence_lock_;
              :
              :   /// Time series counters. Protected by 'counter_map_lock_'.
              :   std::map<std::string, TAggTimeSeriesCounter> time_series_counter_map_;
We need to consider how to test the aggregated profile for correctness. We 
have a few backend tests for counters; maybe we could extend them to cover 
event sequences and time series.


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.cc
File be/src/util/runtime-profile.cc:

http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.cc@394
PS12, Line 394:   {
              :     lock_guard<SpinLock> l(agg_info_strings_lock_);
              :     lock_guard<SpinLock> m(other->info_strings_lock_);
              :     for (const auto& entry : other->info_strings_) {
              :       vector<string>& values = agg_info_strings_[entry.first];
              :       if (values.empty()) values.resize(num_input_profiles_);
              :       if (values[idx] != entry.second) values[idx] = entry.second;
              :     }
              :   }
A pattern in this file is functions made up of many of these subblocks, each 
with its own lock and its own logic. This is true for the update functions, 
but also for things like PrettyPrintSubclassCounters(). This is mainly a style 
thing and it predates the aggregated profile, but I think each of these blocks 
is a candidate to become its own function.

In this case, I think it would simplify this function to hide the 
implementations of each merge operation. The same is true for the equivalent 
operations in AggregatedRuntimeProfile::UpdateFromThrift().

We have two recursive update functions, and they have steps that are very 
similar. In many ways, the update from thrift is really just a bulk version of 
this with minor type differences. Structuring the code to emphasize the 
symmetry might be nice. If each section here is its own function and we do 
something equivalent for UpdateFromThrift(), then you can have similar naming 
for this update from a real RuntimeProfile vs the same update from thrift.

If you wanted to heighten the analogy between the UpdateRecursive for a normal 
RuntimeProfile vs Thrift, you could create a normal RuntimeProfile equivalent 
of AggregatedRuntimeProfile::UpdateFromThrift().
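
To illustrate the kind of split I mean, here is a minimal sketch. It is not 
the patch's code: AggProfileSketch, SourceProfile, UpdateFromInstance() and 
MergeInfoStrings() are hypothetical names, and std::mutex stands in for 
SpinLock so that the snippet compiles on its own.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the per-instance profile being merged in.
struct SourceProfile {
  std::mutex info_strings_lock;
  std::map<std::string, std::string> info_strings;
};

// Hypothetical stand-in for the aggregated profile.
class AggProfileSketch {
 public:
  explicit AggProfileSketch(std::size_t num_input_profiles)
    : num_input_profiles_(num_input_profiles) {}

  // The top-level update becomes a list of named steps.
  void UpdateFromInstance(SourceProfile* other, std::size_t idx) {
    assert(idx < num_input_profiles_);
    MergeInfoStrings(other, idx);
    // Further steps (event sequences, time series, ...) would follow here,
    // each in its own helper that takes its own lock.
  }

  // Returns a copy of the per-instance values for 'key' (empty if unknown).
  std::vector<std::string> InfoStringValues(const std::string& key) {
    std::lock_guard<std::mutex> l(agg_info_strings_lock_);
    auto it = agg_info_strings_.find(key);
    if (it == agg_info_strings_.end()) return {};
    return it->second;
  }

 private:
  // One merge step, with its lock scope hidden inside the helper.
  void MergeInfoStrings(SourceProfile* other, std::size_t idx) {
    std::lock_guard<std::mutex> l(agg_info_strings_lock_);
    std::lock_guard<std::mutex> m(other->info_strings_lock);
    for (const auto& entry : other->info_strings) {
      std::vector<std::string>& values = agg_info_strings_[entry.first];
      if (values.empty()) values.resize(num_input_profiles_);
      values[idx] = entry.second;
    }
  }

  const std::size_t num_input_profiles_;
  std::mutex agg_info_strings_lock_;
  std::map<std::string, std::vector<std::string>> agg_info_strings_;
};
```

A thrift-side helper with a matching name could then sit next to 
MergeInfoStrings(), which would make the symmetry between the two update 
paths visible from the function names alone.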


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.cc@633
PS12, Line 633:           // Decode label and assign a new index based on the unique labels in this
              :           // profile.
              :           int32_t src_label_idx = tseq.label_idxs[i][j];
              :           DCHECK_GE(src_label_idx, 0);
              :           DCHECK_LT(src_label_idx, tseq.unique_labels.size());
              :           const string& label = tseq.unique_labels[src_label_idx];
              :           auto it = seq.labels.find(label);
              :           int32_t label_idx;
              :           if (it == seq.labels.end()) {
              :             label_idx = seq.labels.size();
              :             seq.labels.emplace(label, label_idx);
              :           } else {
              :             label_idx = it->second;
              :           }
As noted in the thrift file, the individual steps here are naturally described 
as translating from one dictionary to another, so that might be a good way to 
explain them.
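
A rough sketch of the dictionary translation, to show how the naming could 
read. LookupOrAddLabel() and TranslateLabelIdxs() are hypothetical helpers, 
not functions in the patch, and plain assert() stands in for DCHECK so the 
snippet is self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper: returns the index of 'label' in the unified
// dictionary, adding it with the next free index if it is not present yet.
inline int32_t LookupOrAddLabel(
    std::map<std::string, int32_t>* dst_dict, const std::string& label) {
  auto it = dst_dict->find(label);
  if (it != dst_dict->end()) return it->second;
  int32_t idx = static_cast<int32_t>(dst_dict->size());
  dst_dict->emplace(label, idx);
  return idx;
}

// Hypothetical helper: translates indices encoded against the source
// dictionary 'src_dict' into indices in the unified dictionary 'dst_dict'.
inline std::vector<int32_t> TranslateLabelIdxs(
    const std::vector<std::string>& src_dict,
    const std::vector<int32_t>& src_idxs,
    std::map<std::string, int32_t>* dst_dict) {
  std::vector<int32_t> result;
  result.reserve(src_idxs.size());
  for (int32_t src_idx : src_idxs) {
    assert(src_idx >= 0);
    assert(static_cast<std::size_t>(src_idx) < src_dict.size());
    result.push_back(LookupOrAddLabel(dst_dict, src_dict[src_idx]));
  }
  return result;
}
```

With helpers along these lines, the inner loop of the merge would shrink to 
one translation call per instance plus a copy of the timestamps.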


http://gerrit.cloudera.org:8080/#/c/16057/12/be/src/util/runtime-profile.cc@609
PS12, Line 609:   {
              :     // Merge event sequence.
              :     lock_guard<SpinLock> l(event_sequence_lock_);
              :     for (const TAggEventSequence& tseq : agg_node.event_sequences) {
              :       DCHECK_GT(num_input_profiles_, 0);
              :       AggEventSequence& seq = event_sequence_map_[tseq.name];
              :       if (seq.label_idxs.empty()) {
              :         seq.label_idxs.resize(num_input_profiles_);
              :         seq.timestamps.resize(num_input_profiles_);
              :       } else {
              :         DCHECK_EQ(num_input_profiles_, seq.label_idxs.size());
              :         DCHECK_EQ(num_input_profiles_, seq.timestamps.size());
              :       }
              :
              :       DCHECK_LE(start_idx + tseq.timestamps.size(), num_input_profiles_);
              :       DCHECK_EQ(tseq.timestamps.size(), tseq.label_idxs.size());
              :       for (int i = 0; i < tseq.timestamps.size(); ++i) {
              :         int idx = start_idx + i;
              :         std::vector<int32_t>& label_idxs = seq.label_idxs[idx];
              :         std::vector<int64_t>& timestamps = seq.timestamps[idx];
              :         label_idxs.clear();
              :         timestamps.clear();
              :         DCHECK_EQ(tseq.timestamps[i].size(), tseq.label_idxs[i].size());
              :         for (int j = 0; j < tseq.timestamps[i].size(); ++j) {
              :           // Decode label and assign a new index based on the unique labels in this
              :           // profile.
              :           int32_t src_label_idx = tseq.label_idxs[i][j];
              :           DCHECK_GE(src_label_idx, 0);
              :           DCHECK_LT(src_label_idx, tseq.unique_labels.size());
              :           const string& label = tseq.unique_labels[src_label_idx];
              :           auto it = seq.labels.find(label);
              :           int32_t label_idx;
              :           if (it == seq.labels.end()) {
              :             label_idx = seq.labels.size();
              :             seq.labels.emplace(label, label_idx);
              :           } else {
              :             label_idx = it->second;
              :           }
              :           label_idxs.push_back(label_idx);
              :           timestamps.push_back(tseq.timestamps[i][j]);
              :         }
              :       }
              :     }
              :   }
As noted in AggregatedRuntimeProfile::UpdateRecursive(), I think each of these 
blocks is a good candidate to become its own function.


http://gerrit.cloudera.org:8080/#/c/16057/12/common/thrift/RuntimeProfile.thrift
File common/thrift/RuntimeProfile.thrift:

http://gerrit.cloudera.org:8080/#/c/16057/12/common/thrift/RuntimeProfile.thrift@80
PS12, Line 80:   // One entry per unique label.
             :   2: required list<string> unique_labels
This is using dictionary encoding, so we could name our variables using that 
language (e.g. label_dictionary). This is a concept we all know, and it might 
make the thrift merge process easier to explain (we are translating from one 
dictionary to the unified dictionary).



--
To view, visit http://gerrit.cloudera.org:8080/16057
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Ic680cbfe94c939c2a8fad9d0943034ed058c6bca
Gerrit-Change-Number: 16057
Gerrit-PatchSet: 12
Gerrit-Owner: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Joe McDonnell <joemcdonn...@cloudera.com>
Gerrit-Reviewer: Tim Armstrong <tarmstr...@cloudera.com>
Gerrit-Comment-Date: Fri, 30 Oct 2020 20:14:24 +0000
Gerrit-HasComments: Yes
