Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................


Patch Set 5:

(18 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43
PS3, Line 43: This patch currently targets the bloom filter produced by partitioned
> bloom filter, not boom filter
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488:       const RuntimeFilterAggregatorInfoPB& agg_info =
> aggregator_idx_to_report producing an agg_idx is confusing me. There's some
This is the translation from RuntimeFilterAggregatorInfoPB 
(FragmentExecParamsPB.filter_agg_info) to TRuntimeFilterAggDesc 
(TRuntimeFilterSource.aggregator_desc).

aggregator_idx_to_report is a list belonging to RuntimeFilterAggregatorInfoPB.
agg_idx is an index into aggregator_idx_to_report.

Rewrote this code to clarify it.
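A minimal C++ sketch of the index-versus-value distinction described above.
AggregatorInfo and AggDesc are hypothetical, simplified stand-ins for
RuntimeFilterAggregatorInfoPB and TRuntimeFilterAggDesc, and the assumption that
each entry of aggregator_idx_to_report stores an aggregator index is for
illustration only. The point is that agg_idx is a position in the list, not a
value read from it.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for RuntimeFilterAggregatorInfoPB.
struct AggregatorInfo {
  // Assumption: each entry holds the index of an aggregator to report to.
  std::vector<int> aggregator_idx_to_report;
};

// Hypothetical, simplified stand-in for TRuntimeFilterAggDesc.
struct AggDesc {
  int agg_idx;         // position within aggregator_idx_to_report
  int aggregator_idx;  // value stored at that position
};

// Translate every entry of aggregator_idx_to_report into a descriptor,
// keeping the position (agg_idx) distinct from the stored value.
std::vector<AggDesc> TranslateAggInfo(const AggregatorInfo& info) {
  std::vector<AggDesc> descs;
  descs.reserve(info.aggregator_idx_to_report.size());
  for (std::size_t agg_idx = 0; agg_idx < info.aggregator_idx_to_report.size();
       ++agg_idx) {
    descs.push_back(AggDesc{static_cast<int>(agg_idx),
                            info.aggregator_idx_to_report[agg_idx]});
  }
  return descs;
}
```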


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71
PS3, Line 71:   bool need_subaggregation = false;
> nit: should be need_subaggregation
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227
PS3, Line 227:     /// Pointer to runtime filter that hold the merge result of all remote updates.
> nit: result
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235
PS3, Line 235:     /// Return number of remaining filter producers, both remote and local.
> These should all probably have function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196:   {
> This feels weird to me because you've already guaranteed that it's false at
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217
PS3, Line 217:     return;
> It'd be helpful to VLOG_RPC that the message was ignored.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252
PS3, Line 252:                   << " This filter is now disabled.";
> You use VLOG(3) enough it might warrant its own define in logging.h to name
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256
PS3, Line 256:       VLOG_FILTER << "filter_id=" << params.filter_id()
> Wouldn't this + line 286 cause pending_remotes to be negative?
Changed the assignment to 1.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281:         DCHECK(!pending_merge_filter->AlwaysFalse());
> Is there a case where this produces a filter that's always true and we woul
That would require checking whether all bits in the underlying buffer have
become 1, and there seems to be no fast way to do so.
Hence, there is a separate always_true() flag for this purpose, which is checked
in the previous branch.
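To illustrate the trade-off, here is a hypothetical C++ sketch
(SketchBloomFilter is not Impala's BloomFilter class). Detecting that merges
have saturated every bit needs an O(size) scan like AllBitsSet(), whereas a
separately maintained flag such as always_true() is an O(1) check. The flag is
only set explicitly, so a merge that happens to produce all 1s does not set it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified bloom filter used only to contrast the two checks.
class SketchBloomFilter {
 public:
  explicit SketchBloomFilter(std::size_t num_words) : words_(num_words, 0) {}

  // OR another filter of the same size into this one (a bloom filter merge).
  void Or(const SketchBloomFilter& other) {
    for (std::size_t i = 0; i < words_.size() && i < other.words_.size(); ++i) {
      words_[i] |= other.words_[i];
    }
  }

  void SetWord(std::size_t i, uint64_t value) { words_[i] = value; }

  // Slow path: O(size) scan to detect that every bit has become 1.
  bool AllBitsSet() const {
    for (uint64_t w : words_) {
      if (w != ~uint64_t{0}) return false;
    }
    return true;
  }

  // Fast path: O(1) flag, set explicitly by the caller and checked first.
  void SetAlwaysTrue() { always_true_ = true; }
  bool always_true() const { return always_true_; }

 private:
  std::vector<uint64_t> words_;
  bool always_true_ = false;
};
```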


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690:   if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
> Can cancelled_ be updated externally? That seems like a problem, because I
cancelled_ can change from false to true only once, through CancelLocked().
Once it is true, it stays true.
This method returns early and stops sending the remaining incomplete filters if
the query is cancelled midway.
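A minimal C++ sketch of the one-way cancellation check, assuming a hypothetical
SketchFilterSender class and using std::atomic<bool> for brevity (the real code
updates cancelled_ under a lock in CancelLocked(), and its synchronization may
differ). Because the flag never goes back to false, checking it before each send
is enough to stop early; a cancellation that races with a send can at worst let
that one filter through.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for the filter bank's cancellation handling.
class SketchFilterSender {
 public:
  // One-way transition, mirroring CancelLocked(): false -> true, never back.
  void Cancel() { cancelled_.store(true, std::memory_order_release); }
  bool cancelled() const { return cancelled_.load(std::memory_order_acquire); }

  // Publishes each incomplete filter via `send`, returning early once
  // cancellation is observed. Returns how many filters were sent.
  int SendRemaining(const std::vector<int>& filter_ids,
                    const std::function<void(int)>& send) {
    int sent = 0;
    for (int id : filter_ids) {
      if (cancelled()) return sent;  // query cancelled midway: stop here
      send(id);
      ++sent;
    }
    return sent;
  }

 private:
  std::atomic<bool> cancelled_{false};
};
```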


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h
File be/src/runtime/runtime-filter.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151
PS3, Line 151:
> These would benefit from function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288
PS3, Line 288:
> Why 2?
That was a typo; it has been removed.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291
PS3, Line 291:   // src_state->instance_states organize fragment instances as one dimension vector
> I'm not entirely clear what these pairs represent.
Replaced them with a typedef and added comments.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298
PS3, Line 298:   for (int i = 0; i < src_state->instance_states.size(); ++i) {
> Please add a comment describing what this is preventing.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139
PS3, Line 139:     // query_id has complete their execution.
> When would it happen. Should this be a warning?
Done. Also added a comment.


http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@472
PS3, Line 472:         consumedGlobalRuntimeFiltersMemReservationBytes_ += f.getFilterSize();
> This is probably more than needed, but seems a lot more complicated to have
An intermediate aggregator will have N + 1 filter buffers, where N is the number
of local filter producers.

std::unique_ptr<RuntimeFilter> pending_merge_filter;  // pointer to pending local filter for host level aggregation.
std::unique_ptr<RuntimeFilter> pending_remote_filter;  // pointer to buffer for remote update aggregation.

The pending_merge_filter is just a pointer to one of the local producers'
buffers, whichever is behind during local aggregation. The pending_remote_filter
buffer is not associated with any local producer and needs to be preallocated to
ensure that remote filter updates can always be merged. The extra memory
overhead is for pending_remote_filter (the 1 in N + 1 above).
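The N + 1 accounting above can be written as a small calculation. These are
hypothetical helpers, not Impala's frontend code, and they assume every buffer
for a given filter id has the same size. For example, with N = 4 local producers
and a 1 MiB filter, an intermediate aggregator needs 5 MiB for that filter id,
of which 1 MiB is the pending_remote_filter overhead.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: memory needed by one intermediate aggregator for a
// single filter id, assuming all buffers for that filter have the same size.
//   N local producer buffers + 1 preallocated pending_remote_filter buffer.
int64_t IntermediateAggregatorFilterBytes(int num_local_producers,
                                          int64_t filter_size_bytes) {
  return (static_cast<int64_t>(num_local_producers) + 1) * filter_size_bytes;
}

// Hypothetical helper: extra memory relative to keeping only the N local
// producer buffers, i.e. the single pending_remote_filter buffer.
int64_t AggregatorOverheadBytes(int num_local_producers,
                                int64_t filter_size_bytes) {
  return IntermediateAggregatorFilterBytes(num_local_producers,
                                           filter_size_bytes) -
         static_cast<int64_t>(num_local_producers) * filter_size_bytes;
}
```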

For a given filter id, the intermediate aggregator merges one incoming filter at
a time into its respective pending_remote_filter, after first obtaining
PerFilterState.lock. The RPC thread is the one that initiates the filter merge,
so there can be at most one remote filter merge per unique filter id at a time,
and at most datastream_service_num_svc_threads concurrent remote filter merges
across distinct filter ids.
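A minimal C++ sketch of the per-filter locking described above.
SketchPerFilterState and SketchAggregator are hypothetical simplifications (the
real PerFilterState holds more fields, and the real merge is a bloom filter
merge rather than this raw word-wise OR). Taking the per-filter mutex serializes
merges for one filter id, while RPC threads working on different filter ids do
not contend with each other.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical per-filter state.
struct SketchPerFilterState {
  std::mutex lock;                              // serializes merges for this id
  std::vector<uint64_t> pending_remote_filter;  // preallocated merge target
};

class SketchAggregator {
 public:
  SketchAggregator(const std::vector<int>& filter_ids, std::size_t num_words) {
    for (int id : filter_ids) {
      states_[id].pending_remote_filter.assign(num_words, 0);
    }
  }

  // Called from an RPC service thread. At most one merge runs per filter id;
  // merges for different ids can proceed in parallel. Returns false if the
  // incoming filter has an unexpected size.
  bool UpdateFilterFromRemote(int filter_id, const std::vector<uint64_t>& bits) {
    SketchPerFilterState& state = states_.at(filter_id);
    std::lock_guard<std::mutex> guard(state.lock);
    if (bits.size() != state.pending_remote_filter.size()) return false;
    for (std::size_t i = 0; i < bits.size(); ++i) {
      state.pending_remote_filter[i] |= bits[i];
    }
    return true;
  }

  std::vector<uint64_t> Snapshot(int filter_id) {
    SketchPerFilterState& state = states_.at(filter_id);
    std::lock_guard<std::mutex> guard(state.lock);
    return state.pending_remote_filter;
  }

 private:
  // Built once in the constructor and never resized afterwards, so concurrent
  // lookups with at() need no map-level lock.
  std::map<int, SketchPerFilterState> states_;
};
```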

It is difficult to make this more fine-grained in the frontend, since cluster
membership might change between planning and scheduling, and the random
selection happens in the backend scheduler.

I considered using pending_merge_filter exclusively as the aggregation
destination (so no extra memory would be needed). But that quickly ran into an
RPC deadlock caused by a circular dependency: at least one local producer must
complete and initialize pending_merge_filter, but none of them can complete
because the RPC queue is full of UpdateFilterFromRemote payloads that cannot be
merged until pending_merge_filter is initialized by a local producer.
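The difference between the rejected and the adopted designs can be sketched in
C++ for a single filter id. SketchHostAggregator is hypothetical: it copies
buffers instead of pointing at a local producer's buffer, and it ignores the
count of pending remote producers. CanMergeRemoteWithoutExtraBuffer() models the
rejected design, where a remote update cannot proceed until a local producer has
initialized pending_merge_filter; UpdateFilterFromRemote() models the adopted
design, where the preallocated pending_remote_filter accepts updates at any
time, so RPC threads never wait on local producers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-filter aggregator contrasting the two designs.
class SketchHostAggregator {
 public:
  SketchHostAggregator(int num_local_producers, std::size_t num_words)
      : remaining_local_(num_local_producers),
        pending_remote_filter_(num_words, 0) {}  // preallocated up front

  // Rejected design: a remote update needs pending_merge_filter, which only
  // exists after a local producer finishes. false == the RPC would wait.
  bool CanMergeRemoteWithoutExtraBuffer() const { return has_merge_filter_; }

  // Adopted design: always succeeds, independent of local progress.
  void UpdateFilterFromRemote(const std::vector<uint64_t>& bits) {
    OrInto(pending_remote_filter_, bits);
  }

  // The first finished local producer initializes pending_merge_filter;
  // later ones are ORed into it.
  void LocalProducerDone(const std::vector<uint64_t>& bits) {
    assert(remaining_local_ > 0);
    if (!has_merge_filter_) {
      pending_merge_filter_ = bits;
      has_merge_filter_ = true;
    } else {
      OrInto(pending_merge_filter_, bits);
    }
    --remaining_local_;
  }

  // Once all local producers are done, combine local and remote results.
  bool Final(std::vector<uint64_t>* out) const {
    if (remaining_local_ > 0 || !has_merge_filter_) return false;
    *out = pending_merge_filter_;
    OrInto(*out, pending_remote_filter_);
    return true;
  }

 private:
  static void OrInto(std::vector<uint64_t>& dst,
                     const std::vector<uint64_t>& src) {
    for (std::size_t i = 0; i < dst.size() && i < src.size(); ++i) {
      dst[i] |= src[i];
    }
  }

  int remaining_local_;
  bool has_merge_filter_ = false;
  std::vector<uint64_t> pending_merge_filter_;
  std::vector<uint64_t> pending_remote_filter_;
};
```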


http://gerrit.cloudera.org:8080/#/c/20612/3/tests/query_test/test_runtime_filters.py
File tests/query_test/test_runtime_filters.py:

http://gerrit.cloudera.org:8080/#/c/20612/3/tests/query_test/test_runtime_filters.py@60
PS3, Line 60:       extra_exec_options={
> I'm surprised there wasn't already a way to do this.
Until now, extra exec options were usually specified via
cls.ImpalaTestMatrix.add_dimension(), and then the test method needed to
deepcopy the test vector and append them as needed.

I added this change so they can be declared more fluently, reducing the need to
deepcopy and append in individual test methods.



--
To view, visit http://gerrit.cloudera.org:8080/20612

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Michael Smith <michael.sm...@cloudera.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Comment-Date: Wed, 25 Oct 2023 03:14:00 +0000
Gerrit-HasComments: Yes
