Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filtering by aggregating runtime filters
locally before sending filter updates to the coordinator and by sharing
a single RuntimeFilterBank among all fragment instances in a query.
However, local filter aggregation is still insufficient if the number of
nodes in an Impala cluster is large. For example, in a cluster of around
700 impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated Impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the number of intermediate aggregators
selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate
aggregator feature. In the backend scheduler, M impalads are selected
randomly as the intermediate aggregators for that runtime filter.
Information about these M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.
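
The aggregator count and random selection described above can be
illustrated with a minimal Python sketch. This is not the scheduler code
from the patch: the function names, the host strings, and the use of
random.sample are assumptions made only for illustration.

```python
import random


def num_intermediate_aggregators(num_executors, max_filters_per_host):
    """M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST); 0 when disabled.

    num_executors is N, the backend executors excluding the coordinator.
    A setting <= 1 disables the intermediate aggregator feature.
    """
    if max_filters_per_host <= 1 or num_executors <= 0:
        return 0
    # Integer ceiling division, avoids floating point rounding.
    return (num_executors + max_filters_per_host - 1) // max_filters_per_host


def pick_aggregators(executors, max_filters_per_host, rng=None):
    """Randomly pick M distinct executors as intermediate aggregators.

    Hypothetical helper: the real scheduler may select hosts differently.
    """
    rng = rng or random.Random()
    m = num_intermediate_aggregators(len(executors), max_filters_per_host)
    return rng.sample(executors, m)


# Example: 700 executors, at most 100 filter updates per aggregator.
hosts = ["impalad-%d" % i for i in range(700)]
chosen = pick_aggregators(hosts, 100, random.Random(0))
```

With these inputs the sketch selects 7 distinct hosts, and any setting of
1 or lower selects none.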

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges filter
updates into 'pending_remote_filter'. The intermediate aggregator then
combines 'pending_remote_filter' with 'pending_merge_filter' (from local
aggregation) into 'result_filter', which is then sent to the
coordinator. The RuntimeFilterBank of the intermediate aggregator waits
for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS.
If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has
passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the
coordinator.
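
The merge flow on an intermediate aggregator can be sketched as follows.
This is a simplified Python model, not the C++ RuntimeFilterBank: the
class, its methods, the byte-string bitmaps, and the ALWAYS_TRUE sentinel
are hypothetical. It assumes only that bloom filters of equal size are
merged with a bitwise OR, and it ignores completeness of the local
aggregation for brevity.

```python
# Hypothetical sentinel for a filter that no longer filters anything out.
ALWAYS_TRUE = "ALWAYS_TRUE"


def or_merge(a, b):
    """Union of two equally sized bloom filter bitmaps (bitwise OR)."""
    if len(a) != len(b):
        raise ValueError("bloom filters must have the same size")
    return bytes(x | y for x, y in zip(a, b))


class IntermediateAggregator:
    """Toy model of one filter on a designated aggregator impalad."""

    def __init__(self, filter_size, expected_remote_updates):
        self.pending_remote_filter = bytes(filter_size)
        self.pending_merge_filter = bytes(filter_size)
        self.remaining_remote_updates = expected_remote_updates

    def update_filter_from_remote(self, update):
        # Models the UpdateFilterFromRemote RPC handler.
        self.pending_remote_filter = or_merge(self.pending_remote_filter, update)
        self.remaining_remote_updates -= 1

    def update_local(self, update):
        # Models local aggregation across fragment instances.
        self.pending_merge_filter = or_merge(self.pending_merge_filter, update)

    def result_filter(self, closing_after_wait_time=False):
        """Return the filter to send to the coordinator, or None to keep waiting."""
        if self.remaining_remote_updates > 0:
            # Incomplete: give up only when closing after the wait time.
            return ALWAYS_TRUE if closing_after_wait_time else None
        return or_merge(self.pending_remote_filter, self.pending_merge_filter)


agg = IntermediateAggregator(filter_size=2, expected_remote_updates=2)
agg.update_local(b"\x10\x00")
agg.update_filter_from_remote(b"\x01\x00")
waiting = agg.result_filter()          # one remote update still missing
agg.update_filter_from_remote(b"\x00\x02")
complete = agg.result_filter()         # all remote updates merged
```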

This patch currently targets only bloom filters produced by partitioned
join builds. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a
broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension and test matrix constraints, and to reduce pytest.skip() calls
in each test. runtime_filters.test is also changed to use counter
aggregation and assert on the ExecSummary table so that the tests stay
valid irrespective of the number of fragment instances.

We benchmarked the aggregation speed of 1 MB runtime filters on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, which simulates 720 runtime filter updates. The speed
is approximated as the duration between the earliest time a filter
update is made and the time that the coordinator publishes the complete
filter. The results are as follows:

+---------------------+------------------------+
| Aggregator nodes    | Aggregation time (ms)  |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+
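
As a reading aid, the relative speedup over the run with no intermediate
aggregator can be derived from the table with a short Python snippet. The
variable names are illustrative; the numbers are copied from the table
and from the benchmark setup above.

```python
# Measured durations in ms, keyed by number of aggregator nodes.
results_ms = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}

# Baseline: no intermediate aggregator, the coordinator merges everything.
baseline_ms = results_ms[0]

# Speedup relative to the baseline, rounded to two decimals.
speedups = {n: round(baseline_ms / ms, 2) for n, ms in results_ms.items()}

# 20 executors with MT_DOP=36 and local aggregation disabled.
simulated_updates = 20 * 36

for n, s in speedups.items():
    print("%d aggregator node(s): %.2fx" % (n, s))
```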

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add TestRuntimeFiltersLateRemoteUpdate.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Reviewed-on: http://gerrit.cloudera.org:8080/20612
Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
M tests/common/impala_test_suite.py
M tests/common/test_vector.py
A tests/custom_cluster/test_runtime_filter_aggregation.py
M tests/query_test/test_runtime_filters.py
34 files changed, 1,205 insertions(+), 193 deletions(-)

Approvals:
  Impala Public Jenkins: Looks good to me, approved; Verified

--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 21
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Michael Smith <michael.sm...@cloudera.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
