Riza Suminto has uploaded this change for review. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
......................................................................

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes
in an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated
impalad backends before the final aggregation and publishing in the
coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this
feature. Given N as the number of backend executors excluding the
coordinator, the selected number of intermediate aggregators is
M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting
MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator
feature.

In the backend scheduler, M impalads are selected randomly as the
intermediate aggregators for that runtime filter. Information about
these M selected impalads is then passed from the scheduler to the
coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then
converts the RuntimeFilterAggregatorInfoPB into filter routing
information, TRuntimeFilterAggDesc, that is piggybacked on
TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges filter
updates into 'pending_remote_filter'.
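
As an illustration only (not code from this patch), the aggregator count
rule and the random selection described above could be sketched roughly as
follows. The function names are hypothetical, and the sketch assumes that
N / 2 is an integer (floor) division; how the remaining executors are
routed to a chosen aggregator is not modeled here.

```python
import random


def num_intermediate_aggregators(max_num_filter_aggregator, num_executors):
    """Return M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2).

    num_executors is N, the number of backend executors excluding the
    coordinator. A non-positive option value disables the feature, which
    this sketch represents as M = 0. Floor division is an assumption.
    """
    if max_num_filter_aggregator <= 0:
        return 0
    return min(max_num_filter_aggregator, num_executors // 2)


def pick_aggregators(executors, max_num_filter_aggregator, rng=random):
    """Randomly pick M distinct executors to act as intermediate aggregators."""
    m = num_intermediate_aggregators(max_num_filter_aggregator, len(executors))
    return rng.sample(list(executors), m)
```

With 700 executors and MAX_NUM_FILTER_AGGREGATOR=5, this sketch yields 5
aggregators; with only 6 executors the N / 2 term caps it at 3.
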
The intermediate aggregator will then combine 'pending_remote_filter'
with 'pending_merge_filter' (from local aggregation) into
'result_filter', which is then sent to the coordinator. Note that due
to the random selection of M impalads, an additional memory reservation
for 'pending_remote_filter' is added on all impalads, even though only
M impalads do the intermediate aggregation per runtime filter.

This patch currently targets only the bloom filter produced by a
partitioned join build. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while the bloom filter
from a broadcast join requires only 1 valid filter update for
publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension and the test matrix constraints, and to reduce pytest.skip()
calls in each test. runtime_filters.test is also changed to use counter
aggregation and to assert on the ExecSummary table so that the tests
stay valid irrespective of the number of fragment instances.

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

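For readers unfamiliar with the two-stage merge described in the message
above, here is a minimal sketch, not taken from the patch. It assumes a
bloom filter can be modeled as a fixed-size byte string whose merge is a
bitwise OR, which is the usual union operation for bloom filters. The
class and method names are hypothetical; only the three filter names come
from the commit message. Tracking how many updates are expected before
the result is sent is deliberately left out.

```python
def merge_bloom(a: bytes, b: bytes) -> bytes:
    """Union two equally sized bloom filter bitmaps with a bitwise OR."""
    if len(a) != len(b):
        raise ValueError("bloom filters must have the same size")
    return bytes(x | y for x, y in zip(a, b))


class IntermediateAggregator:
    """Hypothetical model of one designated aggregator for one filter."""

    def __init__(self, size: int):
        # Filled by local fragment instances (local aggregation).
        self.pending_merge_filter = bytes(size)
        # Filled by updates received from other executors.
        self.pending_remote_filter = bytes(size)

    def update_local(self, update: bytes) -> None:
        self.pending_merge_filter = merge_bloom(self.pending_merge_filter, update)

    def update_filter_from_remote(self, update: bytes) -> None:
        # Stands in for the UpdateFilterFromRemote RPC handler.
        self.pending_remote_filter = merge_bloom(self.pending_remote_filter, update)

    def result_filter(self) -> bytes:
        # Combined filter that would be sent on to the coordinator.
        return merge_bloom(self.pending_remote_filter, self.pending_merge_filter)
```

In this model the coordinator only has to OR together one 'result_filter'
per aggregator instead of one update per executor.
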
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
M tests/common/impala_test_suite.py
M tests/common/test_dimensions.py
A tests/custom_cluster/test_runtime_filter_aggregation.py
M tests/query_test/test_runtime_filters.py
32 files changed, 1,030 insertions(+), 188 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/12/20612/1
--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto <riza.sumi...@cloudera.com>