Rahul Shivu Mahadev has posted comments on this change. ( http://gerrit.cloudera.org:8080/11055 )

Change subject: IMPALA-3825: Distribute Runtime Filtering Aggregation
......................................................................


Patch Set 3:

(37 comments)

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95: x
> naming
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@98
PS3, Line 98:     LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
> remove
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95
PS3, Line 95:   for (auto const& x : filter_routing_table) {
            :     TFilterState tfs;
            :     x.second.ToThrift(&tfs);
            :     LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address;
            :     rpc_params->filter_routing_table.insert(
            :         std::pair<int32_t, TFilterState>(x.first, tfs));
            :   }
> Add a comment about what you're doing here.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@260
PS3, Line 260: x
> naming. Call it 'filter' or something similar
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@262
PS3, Line 262: num_filters_
> Is this needed as a member variable? Doesn't seem to be used anywhere.
Do you think I should make it static instead, to get better distribution across queries?


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690
PS3, Line 690:   if (params.__isset.filter_updates_received) {
> Add a comment above this line:
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@691
PS3, Line 691:     if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) {
> Add another comment above this line:
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@884
PS3, Line 884:       LOG(INFO)
             :           << "DebuggingPublishFilter Coordinator sending filter to fragment with id "
             :           << fragment_idx;
> remove
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h
File be/src/runtime/filter-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@141
PS3, Line 141: FilterTarget
> const ref
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@150
PS3, Line 150: /// Need to cast the int type of this class to int32_t of thrift
> Still needed?
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@151
PS3, Line 151: set
> Why not unordered?
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@152
PS3, Line 152: int i
> int32_t here and remove the cast in the next line. Also, rename to 'idx'.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@167
PS3, Line 167: TFilterTarget
> const ref
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@171
PS3, Line 171: boost
> std
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@130
PS3, Line 130:   bool disabled() const {
             :     if (is_bloom_filter()) {
             :       return bloom_filter_.always_true;
             :     } else {
             :       DCHECK(is_min_max_filter());
             :       return min_max_filter_.always_true;
             :     }
             :   }
             :
             :   void ToThrift(TFilterState* f) const {
             :     std::vector<TFilterTarget> t_targets;
             :     for (FilterTarget filter_target : targets_) {
             :       TFilterTarget thrift_filter_target;
             :       filter_target.ToThrift(&thrift_filter_target);
             :       t_targets.push_back(thrift_filter_target);
             :     }
             :     f->__set_targets(t_targets);
             :     f->__set_desc(desc_);
             :     f->__set_src(src_);
             :     f->__set_pending_count(pending_count_);
             :     /// Need to cast the int type of this class to int32_t of thrift
             :     std::set<int32_t> src_fragment_instance_idxs;
             :     for (int i : src_fragment_instance_idxs_) {
             :       src_fragment_instance_idxs.insert((int32_t)i);
             :     }
             :     f->__set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             :     f->__set_bloom_filter(bloom_filter_);
             :     f->__set_min_max_filter(min_max_filter_);
             :     f->__set_first_arrival_time(first_arrival_time_);
             :     f->__set_completion_time(completion_time_);
             :     f->__set_aggregator_address(aggregator_address_);
             :   }
             :
             :   static FilterState FromThrift(const TFilterState& f) {
             :     FilterState fs(f.desc, f.src, f.aggregator_address, f.pending_count,
             :         f.first_arrival_time, f.completion_time, f.bloom_filter, f.min_max_filter);
             :     std::vector<FilterTarget> targets;
             :     for (TFilterTarget filter_target : f.targets) {
             :       targets.push_back(FilterTarget::FromThrift(filter_target));
             :     }
             :     fs.set_targets(targets);
             :     boost::unordered_set<int> src_fragment_instance_idxs;
             :     for (int idx : f.src_fragment_instance_idxs) {
             :       src_fragment_instance_idxs.insert(idx);
             :     }
             :     fs.set_src_fragment_instance_idxs(src_fragment_instance_idxs);
             :
             :     return fs;
             :   }
> These can be moved to the .cc file.
Done
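
For reference, a minimal sketch of how the out-of-line ToThrift() might look once the other comments on this file are applied (const-ref loop variable, int32_t indices, std containers). The TFilterTarget and TFilterState structs below are simplified stand-ins for the Thrift-generated types, and the node_id field is hypothetical; this is not the actual patch.

```cpp
#include <cstdint>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

// Simplified stand-ins for the Thrift-generated types (assumption: the real
// types have more fields and __set_* setters).
struct TFilterTarget { int32_t node_id = 0; };
struct TFilterState {
  std::vector<TFilterTarget> targets;
  std::set<int32_t> src_fragment_instance_idxs;
};

struct FilterTarget {
  int32_t node_id = 0;  // hypothetical field, for illustration only
  void ToThrift(TFilterTarget* t) const { t->node_id = node_id; }
};

class FilterState {
 public:
  FilterState(std::vector<FilterTarget> targets, std::unordered_set<int32_t> idxs)
    : targets_(std::move(targets)), src_fragment_instance_idxs_(std::move(idxs)) {}

  // Only the declaration stays in the header.
  void ToThrift(TFilterState* f) const;

 private:
  std::vector<FilterTarget> targets_;
  // Stored as int32_t so no cast is needed when converting to Thrift.
  std::unordered_set<int32_t> src_fragment_instance_idxs_;
};

// The definition lives in the .cc file.
void FilterState::ToThrift(TFilterState* f) const {
  f->targets.clear();
  f->targets.reserve(targets_.size());
  // Iterate by const reference to avoid copying each target.
  for (const FilterTarget& target : targets_) {
    TFilterTarget t_target;
    target.ToThrift(&t_target);
    f->targets.push_back(t_target);
  }
  f->src_fragment_instance_idxs.clear();
  for (int32_t idx : src_fragment_instance_idxs_) {
    f->src_fragment_instance_idxs.insert(idx);
  }
}
```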


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@191
PS3, Line 191:   TPlanNodeId src_;
             :   std::vector<FilterTarget> targets_;
> Add a comment for these both. It wasn't added to the CoordinatorFilterState
These two fields were in fact already present in coordinator-filter-state.h:
https://github.com/apache/impala/blob/master/be/src/runtime/coordinator-filter-state.h#L106


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@198
PS3, Line 198: boost::unordered_set
> We can use the std version for this
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc
File be/src/runtime/query-exec-mgr.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc@58
PS3, Line 58:   if (params.filter_routing_table.size() != 0) {
> Add a comment above this:
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h
File be/src/runtime/query-state.h:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@181
PS3, Line 181: const std::map<int32_t, TFilterState>
> Add the parameter name as well.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@206
PS3, Line 206:   std::map<int32_t, FilterState> filter_routing_table_;
             :
             :   SpinLock filter_lock_;
             :
             :   std::map<TNetworkAddress, std::set<int32_t>> backend_list_;
> Add comments for these.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@457
PS3, Line 457: const std::map<int32_t, TFilterState>
> Take this as a const ref, so that we avoid copying the map on calling this
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@458
PS3, Line 458: std::pair<int, TFilterState> it
> Using 'auto' for iterators is reasonable.
Done
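
A small sketch of the two suggestions for this function taken together: the map is accepted by const reference and the loop variable is `const auto&`. Note that the map's element type is std::pair<const int32_t, TFilterState>, so spelling the loop variable as std::pair<int, TFilterState> copies every entry. The struct definitions and the BuildRoutingTable name are hypothetical stand-ins, not the code in query-state.cc.

```cpp
#include <cstdint>
#include <map>

// Simplified stand-ins for the real types (assumption: one field is enough
// to show the conversion).
struct TFilterState { int32_t pending_count = 0; };

struct FilterState {
  int32_t pending_count = 0;
  static FilterState FromThrift(const TFilterState& f) {
    FilterState fs;
    fs.pending_count = f.pending_count;
    return fs;
  }
};

// Hypothetical helper: converts the Thrift routing table into the in-memory one.
// Taking the parameter by const reference avoids copying the whole map per call.
std::map<int32_t, FilterState> BuildRoutingTable(
    const std::map<int32_t, TFilterState>& thrift_table) {
  std::map<int32_t, FilterState> routing_table;
  // 'const auto&' binds directly to each element without a copy.
  for (const auto& entry : thrift_table) {
    routing_table.emplace(entry.first, FilterState::FromThrift(entry.second));
  }
  return routing_table;
}
```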


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@459
PS3, Line 459: s
> Avoid using names that aren't self explanatory.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@466
PS3, Line 466: std::map<int32_t, FilterState>::iterator
> auto
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@551
PS3, Line 551: t
> 'backend'
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@134
PS3, Line 134: Couldn't send filter to aggregator
> Couldn't open a connection to the aggregator node
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@137
PS3, Line 137:   for (int i = 0; i < MAX_FSEND_RETRY; ++i) {
> (Just FYI for now) If we decide to make this run asynchronously as part of
Ok


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@138
PS3, Line 138: LOG(INFO) << "DebuggingPublishFilter trying to contact aggregator\n";
> This isn't needed. It might make the logs too noisy.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@141
PS3, Line 141: LOG(INFO) << res.status.status_code;
> Not needed.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@142
PS3, Line 142:     if (res.status.status_code == TErrorCode::OK) {
             :       break;
             :     }
> You need to check 'status' too. That is the status of the RPC from the send
Done
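
A rough sketch of the retry loop with both checks in place, assuming 'status' is the local result of issuing the RPC and 'res.status' is what the remote handler reported (the review comment is truncated in this email, so that reading is an assumption). Status, UpdateFilterResult, SendFn and kMaxSendRetries are hypothetical stand-ins for the real types and for MAX_FSEND_RETRY.

```cpp
#include <functional>

// Hypothetical stand-ins for the real status and RPC result types.
enum class ErrorCode { OK, ERROR };

struct Status {
  ErrorCode code = ErrorCode::OK;
  bool ok() const { return code == ErrorCode::OK; }
};

struct UpdateFilterResult { Status status; };

constexpr int kMaxSendRetries = 3;  // stands in for MAX_FSEND_RETRY

// Issues one RPC attempt: the return value is the local (transport) status,
// and *res carries the status reported by the remote side.
using SendFn = std::function<Status(UpdateFilterResult*)>;

// Returns true once an attempt succeeds on both sides; counts attempts made.
bool SendWithRetry(const SendFn& send, int* attempts) {
  for (int i = 0; i < kMaxSendRetries; ++i) {
    UpdateFilterResult res;
    ++*attempts;
    Status rpc_status = send(&res);
    // Stop retrying only if the RPC itself went through AND the remote
    // handler reported success.
    if (rpc_status.ok() && res.status.ok()) return true;
  }
  return false;
}
```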


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@147
PS3, Line 147: }
> Add a newline before this
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196: LOG(INFO) << "DebuggingPublishFilter Produced filter with filter id" << filter_id;
> Not needed. The logs will get too noisy.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@203
PS3, Line 203: TODO
> super-nit: "TODO:"
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc
File be/src/service/impala-server.cc:

http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: s
> naming
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221
PS3, Line 2221: QueryState* s =
              :       ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(params.query_id);
> Move inside the 'if' condition.
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2230
PS3, Line 2230:       result.status.__set_status_code(TErrorCode::OK);
              :       result.__isset.status = true;
> Better to set this after the call to UpdateFilter
Done


http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift@822
PS3, Line 822:
> nit:whitespace
Done



--
To view, visit http://gerrit.cloudera.org:8080/11055
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I94e183a0353fc46f8d3eccae029d2d52c5cdc40c
Gerrit-Change-Number: 11055
Gerrit-PatchSet: 3
Gerrit-Owner: Rahul Shivu Mahadev <rahul.maha...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Rahul Shivu Mahadev <rahul.maha...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>
Gerrit-Comment-Date: Tue, 31 Jul 2018 20:42:59 +0000
Gerrit-HasComments: Yes
