Rahul Shivu Mahadev has posted comments on this change. ( http://gerrit.cloudera.org:8080/11055 )
Change subject: IMPALA-3825: Distribute Runtime Filtering Aggregation ...................................................................... Patch Set 3: (37 comments) http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95 PS3, Line 95: x > naming Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@98 PS3, Line 98: LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address; > remove Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator-backend-state.cc@95 PS3, Line 95: for (auto const& x : filter_routing_table) { : TFilterState tfs; : x.second.ToThrift(&tfs); : LOG(INFO) << "DebuggingPublishFilter AggregatorAddress " << tfs.aggregator_address; : rpc_params->filter_routing_table.insert( : std::pair<int32_t, TFilterState>(x.first, tfs)); : } > Add a comment about what you're doing here. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@260 PS3, Line 260: x > naming. Call it 'filter' or something similar Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@262 PS3, Line 262: num_filters_ > Is this needed as a member variable? Doesn't seem to be used anywhere. do you think I should make it static ? and have better distribution across queries ? http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@690 PS3, Line 690: if (params.__isset.filter_updates_received) { > Add a comment above this line: Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@691 PS3, Line 691: if (backend_state->GetNumReceivedFilters() < params.filter_updates_received) { > Add another comment above this line: Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/coordinator.cc@884 PS3, Line 884: LOG(INFO) : << "DebuggingPublishFilter Coordinator sending filter to fragment with id " : << fragment_idx; > remove Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h File be/src/runtime/filter-state.h: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@141 PS3, Line 141: FilterTarget > const ref Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@150 PS3, Line 150: /// Need to cast the int type of this class to int32_t of thrift > Still needed? Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@151 PS3, Line 151: set > Why not unordered? Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@152 PS3, Line 152: int i > int32_t here and remove the cast in the next line. Also, rename to 'idx'. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@167 PS3, Line 167: TFilterTarget > const ref Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@171 PS3, Line 171: boost > std Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@130 PS3, Line 130: bool disabled() const { : if (is_bloom_filter()) { : return bloom_filter_.always_true; : } else { : DCHECK(is_min_max_filter()); : return min_max_filter_.always_true; : } : } : : void ToThrift(TFilterState* f) const { : std::vector<TFilterTarget> t_targets; : for (FilterTarget filter_target : targets_) { : TFilterTarget thrift_filter_target; : filter_target.ToThrift(&thrift_filter_target); : t_targets.push_back(thrift_filter_target); : } : f->__set_targets(t_targets); : f->__set_desc(desc_); : f->__set_src(src_); : f->__set_pending_count(pending_count_); : /// Need to cast the int type of this class to int32_t of thrift : std::set<int32_t> src_fragment_instance_idxs; : for (int i : src_fragment_instance_idxs_) { : src_fragment_instance_idxs.insert((int32_t)i); : } : f->__set_src_fragment_instance_idxs(src_fragment_instance_idxs); : f->__set_bloom_filter(bloom_filter_); : f->__set_min_max_filter(min_max_filter_); : f->__set_first_arrival_time(first_arrival_time_); : f->__set_completion_time(completion_time_); : f->__set_aggregator_address(aggregator_address_); : } : : static FilterState FromThrift(const TFilterState& f) { : FilterState fs(f.desc, f.src, f.aggregator_address, f.pending_count, : f.first_arrival_time, f.completion_time, f.bloom_filter, f.min_max_filter); : std::vector<FilterTarget> targets; : for (TFilterTarget filter_target : f.targets) { : targets.push_back(FilterTarget::FromThrift(filter_target)); : } : fs.set_targets(targets); : boost::unordered_set<int> src_fragment_instance_idxs; : for (int idx : f.src_fragment_instance_idxs) { : src_fragment_instance_idxs.insert(idx); : } : fs.set_src_fragment_instance_idxs(src_fragment_instance_idxs); : : return fs; : } > These can be moved to the .cc file. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@191 PS3, Line 191: TPlanNodeId src_; : std::vector<FilterTarget> targets_; > Add a comment for these both. It wasn't added to the CoordinatorFilterState These two fields were infact present in coordinator-filter-state. https://github.com/apache/impala/blob/master/be/src/runtime/coordinator-filter-state.h#L106 http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/filter-state.h@198 PS3, Line 198: boost::unordered_set > We can use the std version for this Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc File be/src/runtime/query-exec-mgr.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-exec-mgr.cc@58 PS3, Line 58: if (params.filter_routing_table.size() != 0) { > Add a comment above this: Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h File be/src/runtime/query-state.h: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@181 PS3, Line 181: const std::map<int32_t, TFilterState> > Add the parameter name as well. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.h@206 PS3, Line 206: std::map<int32_t, FilterState> filter_routing_table_; : : SpinLock filter_lock_; : : std::map<TNetworkAddress, std::set<int32_t>> backend_list_; > Add comments for these. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc File be/src/runtime/query-state.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@457 PS3, Line 457: const std::map<int32_t, TFilterState> > Take this as a const ref, so that we avoid copying the map on calling this Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@458 PS3, Line 458: std::pair<int, TFilterState> it > Using 'auto' for iterators is reasonable. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@459 PS3, Line 459: s > Avoid using names that aren't self explanatory. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@466 PS3, Line 466: std::map<int32_t, FilterState>::iterator > auto Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/query-state.cc@551 PS3, Line 551: t > 'backend' Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@134 PS3, Line 134: Couldn't send filter to aggregator > Couldn't open a connection to the aggregator node Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@137 PS3, Line 137: for (int i = 0; i < MAX_FSEND_RETRY; ++i) { > (Just FYI for now) If we decide to make this run asynchronously as part of Ok http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@138 PS3, Line 138: LOG(INFO) << "DebuggingPublishFilter trying to contact aggregator\n"; > This isn't needed. It might make the logs too noisy. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@141 PS3, Line 141: LOG(INFO) << res.status.status_code; > Not needed. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@142 PS3, Line 142: if (res.status.status_code == TErrorCode::OK) { : break; : } > You need to check 'status' too. That is the status of the RPC from the send Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@147 PS3, Line 147: } > Add a newline before this Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@196 PS3, Line 196: LOG(INFO) << "DebuggingPublishFilter Produced filter with filter id" << filter_id; > Not needed. The logs will get too noisy. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/runtime/runtime-filter-bank.cc@203 PS3, Line 203: TODO > super-nit: "TODO:" Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc File be/src/service/impala-server.cc: http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221 PS3, Line 2221: s > naming Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2221 PS3, Line 2221: QueryState* s = : ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(params.query_id); > Move inside the 'if' condition. Done http://gerrit.cloudera.org:8080/#/c/11055/3/be/src/service/impala-server.cc@2230 PS3, Line 2230: result.status.__set_status_code(TErrorCode::OK); : result.__isset.status = true; > Better to set this after the call to UpdateFilter Done http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift File common/thrift/ImpalaInternalService.thrift: http://gerrit.cloudera.org:8080/#/c/11055/3/common/thrift/ImpalaInternalService.thrift@822 PS3, Line 822: > nit:whitespace Done -- To view, visit http://gerrit.cloudera.org:8080/11055 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I94e183a0353fc46f8d3eccae029d2d52c5cdc40c Gerrit-Change-Number: 11055 Gerrit-PatchSet: 3 Gerrit-Owner: Rahul Shivu Mahadev <rahul.maha...@cloudera.com> Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Gerrit-Reviewer: Michael Ho <k...@cloudera.com> Gerrit-Reviewer: Rahul Shivu Mahadev <rahul.maha...@cloudera.com> Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com> Gerrit-Comment-Date: Tue, 31 Jul 2018 20:42:59 +0000 Gerrit-HasComments: Yes