Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins,
I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#13).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways. Filter updates are aggregated locally before they are sent to the coordinator, and a single RuntimeFilterBank is shared among all fragment instances of a query. Local aggregation is still insufficient when an Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch reduces coordinator load and speeds up runtime filter aggregation. A few designated impala backends perform intermediate aggregation, and the coordinator then performs the final aggregation and publishing.

The query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of intermediate aggregators selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for a given runtime filter. The scheduler passes information about these M impalads to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggybacked on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad.
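The selection rule for M can be sketched in Python as below. This is a hypothetical model, not the scheduler.cc code. num_intermediate_aggregators() and assign_aggregators() are illustrative names. The round-robin split of the remaining executors among the M randomly chosen aggregators is an assumption, because the commit message does not say how each executor is routed to a particular aggregator.

```python
import random
from typing import Dict, List


def num_intermediate_aggregators(num_executors: int, max_per_host: int) -> int:
    """M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST); 0 means the feature is off."""
    if max_per_host <= 1 or num_executors <= 0:
        return 0
    # Integer ceiling division avoids floating-point rounding.
    return (num_executors + max_per_host - 1) // max_per_host


def assign_aggregators(executors: List[str], max_per_host: int,
                       rng: random.Random) -> Dict[str, List[str]]:
    """Map each randomly chosen aggregator to the executors whose updates it merges.

    'executors' excludes the coordinator. Aggregators are sampled at random, and
    the remaining executors are spread round-robin over them (hypothetical routing).
    An empty result means every executor reports directly to the coordinator.
    """
    m = num_intermediate_aggregators(len(executors), max_per_host)
    if m == 0:
        return {}
    aggregators = rng.sample(executors, m)
    # An aggregator also merges its own locally aggregated update.
    groups = {agg: [agg] for agg in aggregators}
    others = [host for host in executors if host not in groups]
    for i, host in enumerate(others):
        groups[aggregators[i % m]].append(host)
    return groups
```

For example, with 700 executors and MAX_NUM_FILTERS_AGGREGATED_PER_HOST=64, M = ceil(700 / 64) = 11. The round-robin split then keeps every group at 64 hosts or fewer. Each non-aggregator executor sends its filter update to its aggregator through the UpdateFilterFromRemote RPC.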
The UpdateFilterFromRemote RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter' and sends it to the coordinator. The RuntimeFilterBank of the intermediate aggregator waits for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone. A bloom filter from a broadcast join needs only one valid filter update before it can be published.

test_runtime_filters.py is modified to clarify the exec_options dimension and the test matrix constraints. It also reduces the number of pytest.skip() calls in each test. runtime_filters.test is changed to use counter aggregation and to assert on the ExecSummary table, so the tests stay valid regardless of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36. The cluster was instrumented to disable local aggregation, which simulates 720 runtime filter updates. Aggregation time is approximated as the duration between the earliest filter update and the moment the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
| 0                    | 1296                  |
| 1                    | 1229                  |
| 2                    | 608                   |
| 4                    | 329                   |
| 8                    | 205                   |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py
  and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
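The intermediate aggregation step can be sketched in Python. The step covers three actions: merging remote updates into 'pending_remote_filter', combining that result with 'pending_merge_filter', and falling back to ALWAYS_TRUE after RUNTIME_FILTER_WAIT_TIME_MS on close. This is a simplified model and not RuntimeFilterBank. Bloom filters are represented as equal-width integer bitmasks, so a merge is a bitwise OR, which is how same-sized bloom filters combine. ALWAYS_TRUE is modeled as -1 (all bits set, so every probe passes). The IntermediateAggregator class and its methods are hypothetical names. Only the two pending_* field names come from the description.

```python
from typing import Optional

# Model of ALWAYS_TRUE: with all bits set, every probe of the filter passes.
ALWAYS_TRUE = -1


class IntermediateAggregator:
    """Hypothetical model of one intermediate aggregator for one bloom filter.

    Filters are equal-width int bitmasks, so merging two filters is a bitwise OR.
    """

    def __init__(self, num_remote_expected: int, wait_time_ms: int, start_ms: float):
        self.num_remote_expected = num_remote_expected
        self.wait_time_ms = wait_time_ms  # stands in for RUNTIME_FILTER_WAIT_TIME_MS
        self.start_ms = start_ms
        self.pending_remote_filter = 0  # OR of updates from fellow executors
        self.pending_merge_filter = 0  # result of this host's local aggregation
        self.num_remote_received = 0
        self.local_done = False

    def update_from_remote(self, bits: int) -> None:
        """Models one UpdateFilterFromRemote call from a fellow executor."""
        self.pending_remote_filter |= bits
        self.num_remote_received += 1

    def update_local(self, bits: int) -> None:
        """Models the locally aggregated filter of this host's fragment instances."""
        self.pending_merge_filter |= bits
        self.local_done = True

    def try_result(self, now_ms: float, closing: bool) -> Optional[int]:
        """Return the filter to send to the coordinator, or None to keep waiting."""
        if self.local_done and self.num_remote_received >= self.num_remote_expected:
            # result_filter = pending_remote_filter OR pending_merge_filter
            return self.pending_remote_filter | self.pending_merge_filter
        if closing and now_ms - self.start_ms >= self.wait_time_ms:
            # Still incomplete after the wait: send ALWAYS_TRUE so no row is wrongly dropped.
            return ALWAYS_TRUE
        return None
```

In this model, ALWAYS_TRUE is all bits set. Under OR, an always-true update from any executor therefore absorbs the others. This reflects the fact that a merged filter cannot reject rows when one of its parts cannot.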
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
M tests/common/impala_test_suite.py
M tests/common/test_vector.py
A tests/custom_cluster/test_runtime_filter_aggregation.py
M tests/query_test/test_runtime_filters.py
34 files changed, 1,087 insertions(+), 193 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/12/20612/13
--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 13
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Abhishek Rawat <ara...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kdesc...@cloudera.com>
Gerrit-Reviewer: Michael Smith <michael.sm...@cloudera.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>