Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................


Patch Set 5:

(18 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43
PS3, Line 43: This patch currently targets the bloom filter produced by partitioned
> bloom filter, not boom filter
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488: const RuntimeFilterAggregatorInfoPB& agg_info =
> aggregator_idx_to_report producing an agg_idx is confusing me. There's some
This is the translation from RuntimeFilterAggregatorInfoPB (FragmentExecParamsPB.filter_agg_info) to TRuntimeFilterAggDesc (TRuntimeFilterSource.aggregator_desc).
aggregator_idx_to_report is a list belonging to RuntimeFilterAggregatorInfoPB. agg_idx is an index into aggregator_idx_to_report.
I have rewritten this code to clarify.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71
PS3, Line 71: bool need_subaggregation = false;
> nit: should be need_subaggregation
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227
PS3, Line 227: /// Pointer to runtime filter that hold the merge result of all remote updates.
> nit: result
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235
PS3, Line 235: /// Return number of remaining filter producers, both remote and local.
> These should all probably have function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196: {
> This feels weird to me because you've already guaranteed that it's false at
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217
PS3, Line 217: return;
> It'd be helpful to VLOG_RPC that the message was ignored.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252
PS3, Line 252: << " This filter is now disabled.";
> You use VLOG(3) enough it might warrant its own define in logging.h to name
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256
PS3, Line 256: VLOG_FILTER << "filter_id=" << params.filter_id()
> Wouldn't this + line 286 cause pending_remotes to be negative?
Changed the assignment to 1.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281: DCHECK(!pending_merge_filter->AlwaysFalse());
> Is there a case where this produces a filter that's always true and we woul
That would require checking the underlying buffer to see whether all bits have become 1, and there seems to be no fast way to do so. Hence, there is a separate always_true() flag for this purpose, which is checked in the previous branch.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690: if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
> Can cancelled_ be updated externally? That seems like a problem, because I
cancelled_ can turn from False to True only once, through CancelLocked(). Once it is True, it stays True. This method returns early and stops sending the remaining incomplete filters if the query is cancelled midway.

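To illustrate the cancelled_ behavior described above, here is a minimal sketch, assuming a mutex-guarded boolean that only ever moves from false to true. FilterBankSketch, Cancel(), cancelled(), and SendIncompleteFilters() are hypothetical names for illustration, not the actual code in runtime-filter-bank.cc; only cancelled_ and CancelLocked() come from the discussion.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the filter bank (assumption, not Impala code).
class FilterBankSketch {
 public:
  // One-way transition: false -> true. There is no way to reset the flag.
  void Cancel() {
    std::lock_guard<std::mutex> l(lock_);
    CancelLocked();
  }

  bool cancelled() {
    std::lock_guard<std::mutex> l(lock_);
    return cancelled_;
  }

  // Sends filters one at a time, re-checking the flag before each send.
  // The lock is not held while 'send' runs, so a concurrent Cancel() can
  // take effect between two sends. Returns the number of filters sent.
  template <typename SendFn>
  std::size_t SendIncompleteFilters(const std::vector<int>& filter_ids, SendFn send) {
    std::size_t sent = 0;
    for (int id : filter_ids) {
      if (cancelled()) break;  // return early; remaining filters are skipped
      send(id);
      ++sent;
    }
    assert(sent <= filter_ids.size());
    return sent;
  }

 private:
  // Caller must hold lock_.
  void CancelLocked() { cancelled_ = true; }

  std::mutex lock_;
  bool cancelled_ = false;
};
```

Because the flag never goes back to false, a stale read can at worst cause one extra filter to be sent before the loop notices the cancellation; it cannot cause a cancelled query to resume sending.
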

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h
File be/src/runtime/runtime-filter.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151
PS3, Line 151:
> These would benefit from function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288
PS3, Line 288:
> Why 2?
Typo, it is removed.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291
PS3, Line 291: // src_state->instance_states organize fragment instances as one dimension vector
> I'm not entirely clear what these pairs represent.
Replaced with a typedef and added comments.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298
PS3, Line 298: for (int i = 0; i < src_state->instance_states.size(); ++i) {
> Please add a comment describing what this is preventing.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139
PS3, Line 139: // query_id has complete their execution.
> When would it happen. Should this be a warning?
Done. Also added a comment.


http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@472
PS3, Line 472: consumedGlobalRuntimeFiltersMemReservationBytes_ += f.getFilterSize();
> This is probably more than needed, but seems a lot more complicated to have
An intermediate aggregator will have N + 1 filter buffers, where N is the number of local filter producers.

 std::unique_ptr<RuntimeFilter> pending_merge_filter; // pointer to pending local filter for host level aggregation.
 std::unique_ptr<RuntimeFilter> pending_remote_filter; // pointer to buffer for remote update aggregation.

The pending_merge_filter is just a pointer to one of the local producer buffers, whichever is behind during local aggregation. The pending_remote_filter buffer is not associated with any local producer and needs to be preallocated to ensure that a remote filter update can always be merged. The extra memory overhead is for pending_remote_filter (the 1 in N + 1 above).

For one filter id, the intermediate aggregator merges 1 incoming filter at a time into its respective pending_remote_filter, first obtaining PerFilterState.lock. The RPC thread is the one that initiates the filter merge, so there can be at most 1 remote filter merge per unique filter id at a time, and at most datastream_service_num_svc_threads concurrent remote filter merges on distinct filter ids.

It is difficult to make this more fine-grained in the Frontend, since cluster membership might change between planning and scheduling, and the random selection happens in the backend scheduler.

I thought about using pending_merge_filter exclusively as the aggregation destination (so that no extra memory is needed). But that quickly ran into an RPC deadlock caused by a circular dependency: at least 1 local producer must complete and initialize pending_merge_filter, but none of them can complete because the RPC queue is full of UpdateFilterFromRemote payloads that cannot merge until pending_merge_filter is initialized by a local producer.


http://gerrit.cloudera.org:8080/#/c/20612/3/tests/query_test/test_runtime_filters.py
File tests/query_test/test_runtime_filters.py:

http://gerrit.cloudera.org:8080/#/c/20612/3/tests/query_test/test_runtime_filters.py@60
PS3, Line 60: extra_exec_options={
> I'm surprised there wasn't already a way to do this.
Until now, extra exec options were usually specified via cls.ImpalaTestMatrix.add_dimension(), and the test method then needed to deepcopy the test vector and append them as needed.
I added this change to declare them more fluently and to reduce the need to deepcopy and append in individual test methods.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Michael Smith <michael.sm...@cloudera.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Comment-Date: Wed, 25 Oct 2023 03:14:00 +0000
Gerrit-HasComments: Yes