Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#6).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances of a query. However,
local filter aggregation is still insufficient when an Impala cluster
has many nodes. For example, in a cluster of around 700 impalad
backends, aggregating 1 MB bloom filter updates in the coordinator can
take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before the final aggregation and publishing in the coordinator.
A new query option, MAX_NUM_FILTER_AGGREGATOR, controls this feature.
Given N as the number of backend executors excluding the coordinator,
the number of intermediate aggregators selected is
M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting
MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator
feature. The backend scheduler randomly selects M impalads as
intermediate aggregators for each runtime filter. Information about the
M selected impalads is then passed from the scheduler to the coordinator
as a RuntimeFilterAggregatorInfoPB. The coordinator converts the
RuntimeFilterAggregatorInfoPB into filter routing information,
TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.
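
As an illustration only, the selection rule can be sketched in C++.
NumFilterAggregators and SelectAggregators are hypothetical names, not
the scheduler's actual functions. The sketch assumes that N / 2 is
integer division and that aggregators are drawn uniformly at random from
executor indices 0..N-1, with the coordinator excluded.

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical helper: number of intermediate aggregators M for one
// runtime filter, M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2).
// A non-positive query option disables the feature (M = 0).
int NumFilterAggregators(int max_num_filter_aggregator, int num_executors) {
  assert(num_executors >= 0);
  if (max_num_filter_aggregator <= 0) return 0;
  return std::min(max_num_filter_aggregator, num_executors / 2);
}

// Hypothetical helper: randomly pick M distinct executor indices out of
// N executors (indices 0..N-1, coordinator not included).
std::vector<int> SelectAggregators(int max_num_filter_aggregator,
                                   int num_executors, std::mt19937& rng) {
  int m = NumFilterAggregators(max_num_filter_aggregator, num_executors);
  std::vector<int> executors(num_executors);
  std::iota(executors.begin(), executors.end(), 0);  // 0, 1, ..., N-1
  std::shuffle(executors.begin(), executors.end(), rng);
  executors.resize(m);  // keep the first M after shuffling
  return executors;
}
```

With the 20-executor benchmark setup described later in this message,
MAX_NUM_FILTER_AGGREGATOR=8 gives M = min(8, 20 / 2) = 8.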

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with 'pending_merge_filter' (from
local aggregation) into 'result_filter', which is then sent to the
coordinator. Note that because the M aggregators are selected randomly,
an additional memory reservation for 'pending_remote_filter' is added to
every impalad, even though only M impalads do intermediate aggregation
for each runtime filter.
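
A minimal sketch of the two-stage merge on an intermediate aggregator
follows. It assumes a bloom filter is represented as an array of 64-bit
words, so merging two filters of equal size is a bitwise OR. The
IntermediateAggregator struct and its methods are hypothetical. Only the
names 'pending_merge_filter', 'pending_remote_filter', and
'result_filter' come from this change.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a bloom filter's bit array; Impala's real
// BloomFilter class has a different layout and API.
using BloomBits = std::vector<uint64_t>;

// The union of two same-sized bloom filters is a bitwise OR.
void OrInto(BloomBits& dst, const BloomBits& src) {
  assert(dst.size() == src.size());
  for (std::size_t i = 0; i < dst.size(); ++i) dst[i] |= src[i];
}

// Hypothetical per-filter state kept by an intermediate aggregator.
struct IntermediateAggregator {
  explicit IntermediateAggregator(std::size_t num_words)
      : pending_merge_filter(num_words, 0),
        pending_remote_filter(num_words, 0) {}

  // Update from a local fragment instance (local aggregation).
  void UpdateFromLocal(const BloomBits& update) {
    OrInto(pending_merge_filter, update);
  }

  // Update from another impalad, e.g. via UpdateFilterFromRemote.
  void UpdateFromRemote(const BloomBits& update) {
    OrInto(pending_remote_filter, update);
  }

  // Combine both partial results into the filter sent to the coordinator.
  BloomBits ResultFilter() const {
    BloomBits result_filter = pending_merge_filter;
    OrInto(result_filter, pending_remote_filter);
    return result_filter;
  }

  BloomBits pending_merge_filter;
  BloomBits pending_remote_filter;
};
```

Because OR is associative and commutative, the coordinator receives the
same final filter whether updates are merged directly or in two stages.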

This patch currently targets only bloom filters produced by partitioned
join builds. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a
broadcast join requires only one valid filter update before it can be
published.
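
The scope restriction above amounts to a simple predicate. This is a
hypothetical sketch. The enums and UsesIntermediateAggregation are
illustrative names, not Impala's actual types.

```cpp
#include <cassert>

// Hypothetical enums for illustration; Impala's real types differ.
enum class FilterType { kBloom, kMinMax, kInList };
enum class JoinDistribution { kPartitioned, kBroadcast };

// Intermediate aggregation applies only to bloom filters built by a
// partitioned join, and only when at least one aggregator was selected.
bool UsesIntermediateAggregation(FilterType type, JoinDistribution dist,
                                 int num_aggregators) {
  assert(num_aggregators >= 0);
  return type == FilterType::kBloom &&
         dist == JoinDistribution::kPartitioned && num_aggregators > 0;
}
```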

test_runtime_filters.py is modified to clarify the exec_options
dimension and test matrix constraints, and to reduce pytest.skip() calls
in each test. runtime_filters.test is also changed to use counter
aggregation and to assert on the ExecSummary table so that its
assertions stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a cluster of
20 executor nodes with MT_DOP=36, instrumented to disable local
aggregation, which simulates 720 (20 x 36) runtime filter updates.
Aggregation time is approximated as the duration between the earliest
filter update and the moment the coordinator publishes the complete
filter. The results are as follows:

+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms)  |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+
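
To make the table easier to read, the following sketch derives each
setting's speedup over the disabled baseline
(MAX_NUM_FILTER_AGGREGATOR=0). It uses only the measured times from the
table. SpeedupsOverBaseline is a hypothetical helper.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Measurements from the benchmark table: {MAX_NUM_FILTER_AGGREGATOR, ms}.
const std::vector<std::pair<int, double>> kResults = {
    {0, 1296.0}, {1, 1229.0}, {2, 608.0}, {4, 329.0}, {8, 205.0}};

// Speedup of each setting relative to the first entry, which must be the
// baseline with intermediate aggregation disabled.
std::vector<double> SpeedupsOverBaseline(
    const std::vector<std::pair<int, double>>& results) {
  assert(!results.empty() && results.front().first == 0);
  const double baseline_ms = results.front().second;
  std::vector<double> speedups;
  for (const auto& r : results) speedups.push_back(baseline_ms / r.second);
  return speedups;
}
```

For 8 aggregators this gives 1296 / 205, or roughly 6.3x. A single
aggregator gives only about 1.05x.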

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
M tests/common/impala_test_suite.py
M tests/common/test_vector.py
A tests/custom_cluster/test_runtime_filter_aggregation.py
M tests/query_test/test_runtime_filters.py
33 files changed, 1,082 insertions(+), 194 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/12/20612/6
--
To view, visit http://gerrit.cloudera.org:8080/20612

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Michael Smith <michael.sm...@cloudera.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
