[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways. It aggregates runtime filters locally before sending filter updates to the coordinator, and it shares a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch reduces coordinator load and speeds up runtime filter aggregation. It does intermediate aggregation in a few designated Impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of intermediate aggregators selected is then M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for a given runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, a TRuntimeFilterAggDesc, that is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'.
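The aggregator-count rule above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the scheduler's code:

- `num_intermediate_aggregators`, `pick_aggregators` and `route_updates` are hypothetical names.
- The round-robin routing of non-aggregator executors is an assumption. The commit message only specifies M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and random selection.

```python
import random
from collections import Counter


def num_intermediate_aggregators(num_executors, max_per_host):
    """M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST); 0 means disabled."""
    if max_per_host <= 1 or num_executors <= 0:
        # A value <= 1 disables intermediate aggregation entirely.
        return 0
    # Integer ceiling division, avoiding float rounding for large N.
    return (num_executors + max_per_host - 1) // max_per_host


def pick_aggregators(executors, max_per_host, rng):
    """Randomly choose M distinct executors to aggregate one runtime filter."""
    m = num_intermediate_aggregators(len(executors), max_per_host)
    return rng.sample(executors, m)


def route_updates(executors, aggregators):
    """Hypothetical round-robin routing of each executor's filter update."""
    if not aggregators:
        return {e: "coordinator" for e in executors}
    routing = {a: a for a in aggregators}  # aggregators merge their own update
    others = [e for e in executors if e not in routing]
    for i, e in enumerate(others):
        routing[e] = aggregators[i % len(aggregators)]
    return routing


executors = ["impalad-%d" % i for i in range(20)]
aggregators = pick_aggregators(executors, 5, random.Random(42))
load = Counter(route_updates(executors, aggregators).values())
print(len(aggregators), max(load.values()))  # 4 5
```

Under the round-robin assumption, no aggregator receives more than MAX_NUM_FILTERS_AGGREGATED_PER_HOST updates (counting its own). This holds because N <= M * MAX_NUM_FILTERS_AGGREGATED_PER_HOST by construction.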
The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. The RuntimeFilterBank of the intermediate aggregator waits for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone. The bloom filter from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and the test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table. This keeps the tests valid irrespective of the number of fragment instances.

We benchmarked the time to aggregate a 1 MB runtime filter on a cluster of 20 executor nodes with MT_DOP=36. The cluster was instrumented to disable local aggregation, simulating 720 runtime filter updates. Aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| Num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
|                    0 |                  1296 |
|                    1 |                  1229 |
|                    2 |                   608 |
|                    4 |                   329 |
|                    8 |                   205 |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py and query-options-test.cc.
- Add TestRuntimeFiltersLateRemoteUpdate.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive tests.
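The figures in the benchmark table can be sanity-checked with some quick arithmetic. The sketch below only restates the reported numbers: 20 executors, MT_DOP=36, and the measured times. The speedup ratios are derived here and are not part of the original report.

```python
# Values copied from the benchmark description and table.
MT_DOP = 36
NUM_EXECUTORS = 20
TIMES_MS = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}

# With local aggregation disabled, every fragment instance sends its own
# update: 20 executors x MT_DOP 36 = 720 filter updates.
num_updates = NUM_EXECUTORS * MT_DOP


def speedups(times_ms):
    """Speedup of each configuration relative to 0 intermediate aggregators."""
    baseline = times_ms[0]
    return {m: round(baseline / t, 2) for m, t in sorted(times_ms.items())}


print(num_updates)  # 720
for m, s in speedups(TIMES_MS).items():
    print("%d aggregators: %.2fx" % (m, s))
```

By this measure, 8 intermediate aggregators cut the time to publish the complete filter by roughly 6.3x compared with aggregating everything in the coordinator.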
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Reviewed-on: http://gerrit.cloudera.org:8080/20612
Reviewed-by: Impala Public Jenkins
Tested-by: Impala Public Jenkins
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 20: Verified+1 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 20 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 12:29:53 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 20: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10084/ DRY_RUN=false -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 20 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 08:00:18 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 20: Code-Review+2 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 20 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 08:00:17 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 19: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14800/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 19 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 07:58:59 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 19: Code-Review+2 (3 comments) Thank you Csaba and Michael for your review! Carry +2. http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py File tests/query_test/test_runtime_filters.py: http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@486 PS18, Line 486: work > nit: works Done http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@503 PS18, Line 503: > nit: +2 indentation Done http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@517 PS18, Line 517: assert result.data[0] > is this supposed to be an assert? Yes! Thanks for catching this. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 19 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 07:33:06 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#19). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: Code-Review+2 (3 comments) just some nits for the tests, feel free to merge after resolving them http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py File tests/query_test/test_runtime_filters.py: http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@486 PS18, Line 486: work nit: works http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@503 PS18, Line 503: ' nit: +2 indentation http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@517 PS18, Line 517: result.data[0] = '620' is this supposed to be an assert? -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 20 Dec 2023 07:12:34 + Gerrit-HasComments: Yes
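Csaba's last nit is worth illustrating. In Python, `result.data[0] = '620'` is an assignment, so the line overwrites the value instead of checking it, and the test can never fail on it. The sketch below is minimal and rests on assumptions:

- `result.data` is modeled as a plain list. In the real test it comes from the query result object.
- Both helper functions are hypothetical.

```python
def check_first_row_buggy(data):
    # Assignment, not a comparison: overwrites data[0] and can never fail.
    data[0] = '620'


def check_first_row_fixed(data):
    # The intended check: fails if the first row is not the expected value.
    assert data[0] == '620', "unexpected first row: %r" % (data[0],)
```

The fixed form is what the follow-up patch set switched to ("Yes! Thanks for catching this.").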
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: Code-Review+1 (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849 PS18, Line 849: // (no change). Defaults to 0.5. > This is carried from parent commit b37a35aa (IMPALA-12018). Oh right. Makes a lot more sense if I look at the whole diff. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 22:27:48 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: (2 comments) Sorry, I should have mentioned that ps18 was rebased on top of the recent asf-master HEAD (commit b37a35aa, IMPALA-12018). IMPALA-12018 adds the query option RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE. That is why MAX_NUM_FILTERS_AGGREGATED_PER_HOST is shifted after it in ps18. http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849 PS18, Line 849: // (no change). Defaults to 0.5. > Why update this text, but not RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE or This is carried over from the parent commit b37a35aa (IMPALA-12018). http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift File common/thrift/Query.thrift: http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift@692 PS18, Line 692: 172: optional double runtime_filter_cardinality_reduction_scale = 1.0 > Why are we renumbering these rather than moving runtime_filter_cardinality_ This is carried over from the parent commit b37a35aa (IMPALA-12018). -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 22:17:04 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift File common/thrift/ImpalaService.thrift: http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849 PS18, Line 849: // (no change). Defaults to 0.5. Why update this text, but not RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE or MAX_NUM_FILTERS_AGGREGATED_PER_HOST? http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift File common/thrift/Query.thrift: http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift@692 PS18, Line 692: 172: optional double runtime_filter_cardinality_reduction_scale = 1.0 Why are we renumbering these rather than moving runtime_filter_cardinality_reduction_scale to the end? I don't think it matters in this case, but seems like a good habit for the times it does. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 22:09:57 + Gerrit-HasComments: Yes
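Michael's habit of appending new fields instead of renumbering matters because Thrift identifies fields on the wire only by their numeric id. The toy model below does not use the Thrift library; it only mimics that property. The field ids and option names are hypothetical. Readers skip unknown ids, as Thrift readers do.

```python
# Toy model: like Thrift, the wire format identifies fields by numeric id only.
# Field ids and option names below are hypothetical.
OLD_IDS = {1: "option_a", 2: "option_b"}
RENUMBERED_IDS = {1: "option_new", 2: "option_a", 3: "option_b"}  # inserted
APPENDED_IDS = {1: "option_a", 2: "option_b", 3: "option_new"}    # appended


def encode(ids, values):
    """Serialize {name: value} into {field_id: value} using a schema."""
    id_of = {name: fid for fid, name in ids.items()}
    return {id_of[name]: value for name, value in values.items()}


def decode(ids, wire):
    """Deserialize {field_id: value}, skipping ids the reader does not know."""
    return {ids[fid]: value for fid, value in wire.items() if fid in ids}


values = {"option_a": 0.5, "option_b": 4, "option_new": 1.0}
# An old reader silently misreads data from a writer that renumbered fields...
print(decode(OLD_IDS, encode(RENUMBERED_IDS, values)))
# ...but reads data from a writer that appended the new field correctly.
print(decode(OLD_IDS, encode(APPENDED_IDS, values)))
```

As the thread notes, this may not matter when both sides always run the same build. Appending keeps old and new readers compatible when they do not.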
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14797/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 21:35:23 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 18: (3 comments) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: t entry_limit = query_stat > Runtime filter is most needed in big scan fragment, which most likely distr Added TestRuntimeFiltersLateRemoteUpdate in ps18 to exercise this code path. http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc@753 PS17, Line 753: k> l(entr > Just found out that SetFilter can not be called multiple times. I'll submit Replaced this with DisableBloomFilter in ps18. http://gerrit.cloudera.org:8080/#/c/20612/18/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/18/be/src/runtime/runtime-filter-bank.cc@412 PS18, Line 412: CombinePeerAndLocalUpdates CombinePeerAndLocalUpdates is refactored to cover all possible states that can occur during the merge of local_filter and remote_filter. Local aggregation must be complete before calling CombinePeerAndLocalUpdates.
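The kind of state combination CombinePeerAndLocalUpdates has to cover can be sketched in Python. This is a hypothetical model of the combining rules, not a transcription of the C++ function, and it makes three assumptions:

- A bloom filter update is either ALWAYS_TRUE (passes every row) or a bit set.
- An all-zero bit set models an empty build side, which rejects every row.
- Same-size bloom filters merge with bitwise OR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FilterUpdate:
    """Hypothetical model of one bloom filter update.

    always_true: the filter must pass every row (e.g. it was disabled).
    bits: the filter's bit set; bits == 0 with always_true False models an
    empty build side, i.e. a filter that rejects every row.
    """
    always_true: bool = False
    bits: int = 0


ALWAYS_TRUE = FilterUpdate(always_true=True)
ALWAYS_FALSE = FilterUpdate()


def combine_peer_and_local(local, remote, local_complete):
    """Merge the local and remote aggregates into one result filter."""
    if not local_complete:
        # Mirrors the stated precondition: local aggregation must finish first.
        raise ValueError("local aggregation is not complete")
    if local.always_true or remote.always_true:
        # A union with a pass-everything filter passes everything.
        return ALWAYS_TRUE
    # Bitwise OR is the union of two same-size bloom filters; OR with an
    # empty (all-zero) filter leaves the other side unchanged.
    return FilterUpdate(bits=local.bits | remote.bits)
```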
-- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 18 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 21:15:43 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#18). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc@753 PS17, Line 753: SetFilter Just found out that SetFilter can not be called multiple times. I'll submit another patch set to fix this logic. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 17 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Tue, 19 Dec 2023 05:36:56 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: Code-Review+1 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 17 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Mon, 18 Dec 2023 20:11:10 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: > My assumption is A. I believe it is rare but not impossible. That is why I Runtime filters are most needed in big scan fragments, which are most likely distributed across all executor nodes, including the nodes selected as intermediate aggregators. If an executor still has a running scan, its RuntimeFilterBank stays open. If an executor no longer has any fragment instance running, its RuntimeFilterBank may be closing. If this is true for all executors, a pending runtime filter no longer matters, since the query is completing anyway. I can think of a corner case where an intermediate aggregator also works on the big scan fragment but is scheduled to handle fewer scan ranges than other nodes, so it completes earlier than the other executor nodes. In any case, a missing filter should not affect query correctness, only overall performance. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 17 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Mon, 18 Dec 2023 19:22:42 + Gerrit-HasComments: Yes
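The point that a missing (ALWAYS_TRUE) filter costs performance but not correctness can be shown with a toy model. None of this is Impala code. The model makes four assumptions:

- Rows are dicts.
- The hypothetical `make_filter` builds a bloom-like filter by hashing keys into 16 buckets. Like a real bloom filter, it can return false positives but never false negatives.
- `always_true` stands in for an ALWAYS_TRUE filter.
- The join is a plain in-memory hash join.

```python
NUM_BUCKETS = 16  # hypothetical, deliberately tiny filter size


def make_filter(build_keys):
    """Bloom-like filter: false positives possible, false negatives not."""
    buckets = {k % NUM_BUCKETS for k in build_keys}  # toy hash: key mod 16
    return lambda key: key % NUM_BUCKETS in buckets


def always_true(_key):
    """An ALWAYS_TRUE filter lets every row through."""
    return True


def scan(probe_rows, runtime_filter):
    """Probe-side scan that drops rows rejected by the runtime filter."""
    return [r for r in probe_rows if runtime_filter(r["key"])]


def hash_join(build_rows, probe_rows):
    """Return ids of probe rows whose key appears on the build side."""
    build_keys = {r["key"] for r in build_rows}
    return sorted(r["id"] for r in probe_rows if r["key"] in build_keys)


build_rows = [{"key": k} for k in (3, 7, 19)]
probe_rows = [{"id": i, "key": i} for i in range(40)]
filtered = scan(probe_rows, make_filter(r["key"] for r in build_rows))
unfiltered = scan(probe_rows, always_true)
print(len(filtered), len(unfiltered))  # 6 40
print(hash_join(build_rows, filtered) == hash_join(build_rows, unfiltered))  # True
```

The join result is identical in both cases. Only the number of probe rows reaching the join grows when the filter degrades to ALWAYS_TRUE.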
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14783/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Comment-Date: Mon, 18 Dec 2023 19:03:00 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: > > It is closing only if query is completed, or canceled My assumption is A. I believe it is rare but not impossible. That is why I added logic here to wait for runtime filter arrival. -- Gerrit-Comment-Date: Mon, 18 Dec 2023 18:51:24 +
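An intermediate aggregator waiting for runtime filters to arrive can be pictured as a bounded wait on a counter of outstanding remote updates. The sketch below is hypothetical: `PendingRemoteUpdates` is not an Impala class. It uses `std::condition_variable::wait_for` with a predicate. In the patch, the bound would presumably come from RUNTIME_FILTER_WAIT_TIME_MS.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical tracker of remote filter updates that an intermediate
// aggregator still expects. Names are illustrative, not Impala's API.
class PendingRemoteUpdates {
 public:
  explicit PendingRemoteUpdates(int expected_remotes)
      : pending_remotes_(expected_remotes) {}

  // Called (e.g. from an RPC handler) after a remote update has been merged.
  void OnRemoteUpdateMerged() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (pending_remotes_ > 0) --pending_remotes_;
    }
    arrived_.notify_all();
  }

  // Blocks until every expected remote update has arrived or 'timeout'
  // expires. Returns true only if all updates arrived in time.
  bool WaitForAll(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    return arrived_.wait_for(lock, timeout,
                             [this] { return pending_remotes_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable arrived_;
  int pending_remotes_;
};
```

If `WaitForAll()` returns false, the caller would fall back to sending an ALWAYS_TRUE filter, which is how the commit message describes handling incomplete filters.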
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 17: (3 comments) http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@241 PS16, Line 241: // but recover from it in RELEASE build by disabling filter : // (setting to ALWAYS_TRUE_FILTER). : DCHECK(false) << "Initial buffer for pending_re > Is my understanding right that we should fail the query in this case as thi Yes. Added DCHECK. http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@291 PS16, Line 291: > nit: turned Done http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@293 PS16, Line 293: --produced_filter.pending_remotes; > Should we also set pending_producers to 0? It would be make sense for produ Done -- Gerrit-Comment-Date: Mon, 18 Dec 2023 18:30:03 +
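Csaba suggested zeroing pending_producers along with pending_remotes, so that IsComplete() returns true, and tracing the event. That could look roughly like the sketch below. `ProducedFilterSketch` and `GiveUpAndDisable()` are hypothetical names. `std::clog` stands in for Impala's VLOG(2).

```cpp
#include <cassert>
#include <iostream>

// Hypothetical per-filter bookkeeping on an intermediate aggregator. The
// counter names mirror those discussed in review; the struct is illustrative.
struct ProducedFilterSketch {
  int filter_id = 0;
  int pending_producers = 0;  // local fragment instances yet to report
  int pending_remotes = 0;    // remote executors yet to report
  bool always_true = false;   // true once the filter has been disabled

  bool IsComplete() const {
    return pending_producers == 0 && pending_remotes == 0;
  }

  // Gives up on outstanding updates: disables the filter and zeroes both
  // counters so that IsComplete() holds and the result can be sent onward.
  void GiveUpAndDisable() {
    // Stand-in for the VLOG(2) trace requested in review.
    std::clog << "Filter " << filter_id << " disabled with "
              << pending_producers << " local and " << pending_remotes
              << " remote updates outstanding\n";
    always_true = true;
    pending_producers = 0;
    pending_remotes = 0;
  }
};
```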
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#17). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before sending filter updates to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated Impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of selected intermediate aggregators is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information (TRuntimeFilterAggDesc) that is piggybacked in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is sent to the coordinator. The RuntimeFilterBank of the intermediate aggregator waits for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only bloom filters produced by partitioned join builds. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, and a bloom filter from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table, so that the tests stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36. The cluster was instrumented to disable local aggregation, simulating 720 (20 x 36) runtime filter updates. Aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+-----------------------+-----------------------+
| Num. aggregator nodes | Aggregation time (ms) |
+-----------------------+-----------------------+
|                     0 |                  1296 |
|                     1 |                  1229 |
|                     2 |                   608 |
|                     4 |                   329 |
|                     8 |                   205 |
+-----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M
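The commit message defines the aggregator count as M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and selects M executors at random. Both steps can be sketched as below. The function names are hypothetical. `std::sample` is just one way to pick distinct hosts uniformly at random; the scheduler's real selection code is not shown in this thread.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <random>
#include <string>
#include <vector>

// M = ceil(N / K): N is the number of executors excluding the coordinator,
// K is MAX_NUM_FILTERS_AGGREGATED_PER_HOST. K <= 1 disables the feature.
int64_t NumIntermediateAggregators(int64_t num_executors, int64_t max_per_host) {
  if (max_per_host <= 1 || num_executors <= 0) return 0;
  return (num_executors + max_per_host - 1) / max_per_host;  // integer ceil
}

// Hypothetical per-filter selection: picks M distinct executors uniformly at
// random from 'executors'.
std::vector<std::string> SelectAggregators(
    const std::vector<std::string>& executors, int64_t max_per_host,
    std::mt19937& rng) {
  int64_t m = NumIntermediateAggregators(
      static_cast<int64_t>(executors.size()), max_per_host);
  std::vector<std::string> chosen;
  std::sample(executors.begin(), executors.end(), std::back_inserter(chosen),
              m, rng);
  return chosen;
}
```

For example, N = 20 executors with MAX_NUM_FILTERS_AGGREGATED_PER_HOST = 4 gives M = 5. A value of 1 gives M = 0, disabling the feature.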
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 16: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: HECK_EQ(0, produced_filter > It is closing only if query is completed, or canceled It is not clear to me what the exact scenario is when QueryState::Close is called: A. it is enough for all fragment instances on the given impalad to finish execution, or B. QueryState is kept alive as long as the query is running, even if this process has no tasks anymore. In case A, the content of the filters can still matter to fragment instances running in other processes. In case B, it seems wasteful to bother with sending filters or waiting for remote filters that have not yet arrived. -- Gerrit-Comment-Date: Mon, 18 Dec 2023 18:17:15 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 16: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14782/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Comment-Date: Mon, 18 Dec 2023 17:48:56 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 16: Code-Review+1 (4 comments) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139 PS10, Line 139: esc.filter_id); > I think what you mean is, it is ok to allocate later as long as the whole t Yes, this is the way I meant it, thanks. http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@241 PS16, Line 241: LOG(ERROR) << "Cannot allocate scratch bloom filter for pending_remote_filter " :<< "of filter " << params.filter_id(); : bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER; Is my understanding right that we should fail the query in this case as this means a program error? If failing the query from here is tricky, a DCHECK could be hit in DEBUG builds while RELEASE builds could rely on setting it to ALWAYS_TRUE_FILTER. My concern with the graceful handling is that an error like underestimating memory may remain undiscovered in tests. http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@291 PS16, Line 291: turn nit: turned http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@293 PS16, Line 293: produced_filter.pending_remotes = 0; Should we also set pending_producers to 0? It would make sense for produced_filter.IsComplete() to return true below. Or could it be a problem if we don't wait for local filters? Also, can you add a VLOG(2) line here to be able to trace this scenario?
-- Gerrit-Comment-Date: Mon, 18 Dec 2023 17:36:49 +
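This exchange settles two points. First, the whole bloom filter memory budget is claimed up front, while individual filters may be allocated later. Second, a later allocation failure is a program error: DCHECK in debug builds, ALWAYS_TRUE_FILTER in release builds. The sketch below combines both under hypothetical names (`ClaimedReservation`, `AllocateScratchFilter`). Standard `assert`, disabled by `NDEBUG`, plays the role of DCHECK.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical budget for bloom filter memory. The whole amount a query needs
// is claimed up front; individual filters are carved out of it later, e.g.
// only when the first remote update for a filter arrives.
class ClaimedReservation {
 public:
  explicit ClaimedReservation(int64_t claimed_bytes)
      : remaining_(claimed_bytes) {}

  // Takes 'bytes' from the claimed budget; returns false if it is exhausted.
  bool TryConsume(int64_t bytes) {
    if (bytes > remaining_) return false;
    remaining_ -= bytes;
    return true;
  }

  int64_t remaining() const { return remaining_; }

 private:
  int64_t remaining_;
};

// Returns a zeroed filter buffer, or nullptr, which the caller would treat as
// an ALWAYS_TRUE filter. Failing here means the up-front estimate was wrong,
// i.e. a program error: debug builds stop at the assert (the patch uses
// DCHECK), while release builds (NDEBUG) degrade gracefully.
std::unique_ptr<std::vector<uint8_t>> AllocateScratchFilter(
    ClaimedReservation& reservation, int64_t bytes) {
  if (!reservation.TryConsume(bytes)) {
    assert(false && "bloom filter memory was underestimated");
    return nullptr;
  }
  return std::make_unique<std::vector<uint8_t>>(static_cast<std::size_t>(bytes),
                                                uint8_t{0});
}
```

The assert makes a memory underestimate fail loudly in debug test runs, which addresses the concern that graceful handling could hide the bug.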
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#16). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
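The commit message limits intermediate aggregation to partitioned-join bloom filters. The reason is that a broadcast-join filter needs only one valid update before it can be published, because every broadcast build instance sees the same build rows. A hypothetical coordinator-side readiness check expressing that rule could look like this:

```cpp
#include <cassert>

// Hypothetical distribution modes of the join that produces a filter.
enum class JoinDistribution { kBroadcast, kPartitioned };

// Returns true once the coordinator could publish the filter. A broadcast
// join's build instances all hold the same rows, so one update suffices; a
// partitioned join's filter must include every producer's partition.
bool ReadyToPublish(JoinDistribution dist, int updates_received,
                    int expected_updates) {
  if (dist == JoinDistribution::kBroadcast) return updates_received >= 1;
  return updates_received == expected_updates;
}
```

Only the partitioned case forces the coordinator to merge all updates, so only that case benefits from spreading the merging work across intermediate aggregators.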
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 16: ps16 is a rebase. -- Gerrit-Comment-Date: Mon, 18 Dec 2023 16:54:32 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 15: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14712/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Comment-Date: Wed, 13 Dec 2023 22:49:16 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 15: Code-Review+1 -- Gerrit-Comment-Date: Wed, 13 Dec 2023 22:28:20 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 15: (5 comments) Thank you, Michael. http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc File be/src/runtime/query-state.cc: http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc@271 PS14, Line 271: // Making a copy of the "filepath to hosts" mapping into std library types. > This comment doesn't really explain why this is necessary. I'm not sure either. This was added by IMPALA-12308 (https://gerrit.cloudera.org/c/20548/) and is not part of this patch. http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@107 PS14, Line 107: /// selected as intermediate filter aggregator to help coordinator. Besides doing > nit: remove "of", so it says "Besides doing" Done http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@108 PS14, Line 108: /// local aggregation, each intermediate aggregator will also listen and aggregate > grammar: "each intermediate aggregator will" Done http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@109 PS14, Line 109: /// filter updates from at most MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors.
> "filter updates from" Done http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@110 PS14, Line 110: /// Intermediate aggregator then sends the aggregated filter update to coordinator for > "then sends the" Done -- Gerrit-Comment-Date: Wed, 13 Dec 2023 22:21:44 +
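The revised RuntimeFilterBank comment says each intermediate aggregator receives filter updates from at most MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors. Write K for that option. Then M = ceil(N / K) implies N <= M*K, so the N - M non-aggregator executors satisfy N - M <= M*(K - 1). Spreading them round-robin over the M aggregators therefore gives each aggregator at most K - 1 senders. The routing function below is a hypothetical illustration of that bound, not Impala's actual assignment logic. The thread does not say how executors are mapped to aggregators.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical routing: each non-aggregator executor is assigned round-robin
// to one of the aggregators. Returns executor -> aggregator it sends to.
std::unordered_map<std::string, std::string> RouteToAggregators(
    const std::vector<std::string>& executors,
    const std::vector<std::string>& aggregators) {
  std::unordered_map<std::string, std::string> route;
  // No aggregators: feature disabled, everyone reports to the coordinator.
  if (aggregators.empty()) return route;
  std::unordered_set<std::string> is_agg(aggregators.begin(), aggregators.end());
  std::size_t next = 0;
  for (const std::string& host : executors) {
    if (is_agg.count(host) > 0) continue;  // aggregators report to coordinator
    route[host] = aggregators[next];
    next = (next + 1) % aggregators.size();
  }
  return route;
}
```

With N = 20 and K = 4, five aggregators each receive updates from exactly three other executors, which meets the K - 1 bound.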
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#15). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 14: (5 comments) http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc File be/src/runtime/query-state.cc: http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc@271 PS14, Line 271: // Making a copy of the "filepath to hosts" mapping into std library types. This comment doesn't really explain why this is necessary. http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@107 PS14, Line 107: /// selected as intermediate filter aggregator to help coordinator. Besides of doing nit: remove "of", so it says "Besides doing" http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@108 PS14, Line 108: /// local aggregation, each intermediate aggregators will also listen and aggregate grammar: "each intermediate aggregator will" http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@109 PS14, Line 109: /// filter update from at most MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors.
"filter updates from" http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@110 PS14, Line 110: /// Intermediate aggregator then send the aggregated filter update to coordinator for "then sends the" -- Gerrit-Comment-Date: Wed, 13 Dec 2023 21:49:58 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 14: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14709/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-Comment-Date: Wed, 13 Dec 2023 20:55:19 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 14: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139 PS10, Line 139: esc.filter_id); > If the reservation is claimed, then it is considered a fatal error if alloc I think what you mean is that it is OK to allocate later as long as the whole total_bloom_filter_mem_required_ is already claimed. Is that correct? PS14 moves the initialization to UpdateFilterFromRemote(). http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: HECK_EQ(0, produced_filter > Few things about this: If it is any reassurance, note that SendIncompleteFilters is only called when RuntimeFilterBank is closing. The RuntimeFilterBank lifetime is equal to the query lifetime on that executor node. It closes only if the query is completed or canceled. In both cases, the plan root sink is basically done, and the runtime filter value does not matter anymore. The coordinator can just drop the runtime filter update by then. CombinePeerAndLocalUpdates() is done here for correctness. It cleans up 'pending_merge_filter' and 'pending_remote_filter' of 'produced_filter'. This feature should be exercised in TestRuntimeFilters, TestBloomFilters, TestBloomFiltersOnParquet, and TestRuntimeRowFilters. And test_wait_time_cancellation is within TestRuntimeFilters.
-- Gerrit-PatchSet: 14 Gerrit-Comment-Date: Wed, 13 Dec 2023 20:31:34 +
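The merge-then-forward flow on an intermediate aggregator can be sketched with a toy model. This is a hypothetical illustration, not code from runtime-filter-bank.cc: `ToyBloom`, `AggregatorFilterState`, and their methods are invented names, and a bloom filter is reduced to a plain bit array. The behavior it models comes from the change: remote updates are merged into a pending remote filter, combined with the locally aggregated filter, and the result is marked ALWAYS_TRUE if the bank closes before every remote update has arrived. Merging by bitwise OR is the standard union of two bloom filters built with the same size and hash functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy stand-in for a bloom filter: a fixed-size bit array. Filters built with
// the same size and hash functions are merged (unioned) by bitwise OR.
struct ToyBloom {
  std::vector<std::uint64_t> words;
  bool always_true = false;  // ALWAYS_TRUE: the filter rejects no rows

  void Or(const ToyBloom& other) {
    if (always_true) return;
    if (other.always_true) {
      always_true = true;
      words.clear();
      return;
    }
    assert(words.size() == other.words.size());
    for (std::size_t i = 0; i < words.size(); ++i) words[i] |= other.words[i];
  }
};

// Hypothetical per-filter state on an intermediate aggregator. The member
// names only loosely mirror 'pending_merge_filter' / 'pending_remote_filter'.
struct AggregatorFilterState {
  ToyBloom pending_merge;   // result of local aggregation on this backend
  ToyBloom pending_remote;  // union of updates received from other executors
  int remote_remaining;     // remote updates still expected

  // Handles one update from another executor (cf. UpdateFilterFromRemote).
  void UpdateFromRemote(const ToyBloom& update) {
    pending_remote.Or(update);
    --remote_remaining;
  }

  bool Complete() const { return remote_remaining == 0; }

  // Called when all remote updates have arrived, or when the bank closes
  // after the wait time. Pending buffers are released either way; an
  // incomplete result is sent as ALWAYS_TRUE so no rows are wrongly dropped.
  ToyBloom Finish() {
    ToyBloom result = pending_merge;
    result.Or(pending_remote);
    pending_merge = ToyBloom{};
    pending_remote = ToyBloom{};
    if (!Complete()) {
      result.always_true = true;
      result.words.clear();
    }
    return result;
  }
};
```

Combining before checking completeness keeps a single code path, at the cost of one wasted merge when the result ends up ALWAYS_TRUE.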
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#14). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before sending a filter update to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances in a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated Impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of selected intermediate aggregators is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as intermediate aggregators for a given runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. The RuntimeFilterBank of the intermediate aggregator waits for all remote filter updates for at least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only one valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36, instrumented to disable local aggregation, simulating 720 runtime filter updates. Aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| Num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
|                    0 |                  1296 |
|                    1 |                  1229 |
|                    2 |                   608 |
|                    4 |                   329 |
|                    8 |                   205 |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M
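A minimal sketch of the aggregator-count rule and one possible way to group executors, assuming a fixed list of executor host names. Only M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST), the rule that values <= 1 disable the feature, the random selection, and the "at most MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors" bound quoted in review come from the change. The function names, the `AggGroup` struct, and the shuffle-then-chunk routing are hypothetical and are not the scheduler's actual code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <random>
#include <string>
#include <vector>

// Hypothetical helper: number of intermediate aggregators M for N executors
// (coordinator excluded), M = ceil(N / max_per_host). Returns 0 when the
// feature is disabled (max_per_host <= 1) or there are no executors.
int NumIntermediateAggregators(int num_executors, int max_per_host) {
  if (max_per_host <= 1 || num_executors <= 0) return 0;
  return (num_executors + max_per_host - 1) / max_per_host;  // integer ceil
}

// One aggregation group: 'aggregator' merges its own update with the updates
// from 'senders' and forwards a single update to the coordinator.
struct AggGroup {
  std::string aggregator;
  std::vector<std::string> senders;
};

// Hypothetical routing: shuffle the executors, then cut them into chunks of
// max_per_host hosts. The first host of each chunk is the aggregator, so there
// are exactly M chunks and each aggregator hears from at most
// max_per_host - 1 other executors.
std::vector<AggGroup> BuildAggGroups(std::vector<std::string> executors,
                                     int max_per_host, std::uint32_t seed) {
  std::vector<AggGroup> groups;
  int m = NumIntermediateAggregators(static_cast<int>(executors.size()),
                                     max_per_host);
  if (m == 0) return groups;  // every executor reports to the coordinator
  std::mt19937 rng(seed);
  std::shuffle(executors.begin(), executors.end(), rng);
  const std::size_t chunk = static_cast<std::size_t>(max_per_host);
  for (std::size_t i = 0; i < executors.size(); ++i) {
    if (i % chunk == 0) {
      groups.push_back(AggGroup{executors[i], {}});
    } else {
      groups.back().senders.push_back(executors[i]);
    }
  }
  assert(groups.size() == static_cast<std::size_t>(m));
  return groups;
}
```

With 20 executors and the option set to 8, the sketch yields three groups of 8, 8, and 4 hosts, so no aggregator receives more than 7 remote updates.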
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 13: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139 PS10, Line 139: AllocateScratchBloomFilter > Filter aggregation can happen sometime later after all fragment start. I'm If the reservation is claimed, then it is considered a fatal error if allocating memory (up to the reservation) fails. https://github.com/apache/impala/blob/aeb9a8206028b68833ce6e49421990854f0c8ba4/be/src/runtime/bufferpool/buffer-pool.h#L77 Other queries should only be able to use the memory for buffers that can be dropped (e.g. spilled). http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: result_filter->SetFilter(B > Agree. Done. Few things about this: - it is not clear to me whether this was a real bug, i.e. whether not setting it could lead to dropping rows incorrectly - it would be nice to have a test for this scenario. Adding a delay/jitter debug action + low runtime_filter_wait_time_ms should be able to lead to the situation where all/some remote filters do not arrive in time. I looked around for tests like this and couldn't find any that intentionally create late filters. A cancellation-related test seems to do something like this: https://github.com/apache/impala/blob/b03e8ef95c856f499d17ea7815831e30e2e9f467/tests/query_test/test_runtime_filters.py#L110 I am OK with moving test creation to a follow-up Jira if it seems complicated. - do we actually need to run CombinePeerAndLocalUpdates?
it seems unnecessary as the filter is all true - I don't think that we should put effort into optimizing this case, but a comment could mention that this is done for simplicity -- Gerrit-PatchSet: 13 Gerrit-Comment-Date: Wed, 13 Dec 2023 10:25:23 +
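The reservation argument can be illustrated with a toy model of reserve-then-allocate. This is not Impala's BufferPool API: `ToyPool` and `ToyClient` are hypothetical, and the model captures only the property stated in the comment above. Once a client has claimed a reservation, other clients cannot take that memory, so allocating within the reservation can be deferred until the buffer is first needed.

```cpp
#include <cassert>
#include <cstdint>

// Toy memory pool (not Impala's BufferPool): tracks how much of a fixed
// capacity has been claimed as reservations by all clients.
class ToyPool {
 public:
  explicit ToyPool(std::int64_t capacity) : capacity_(capacity) {}

  // Claims 'bytes' for one client; fails if not enough unreserved memory.
  bool Claim(std::int64_t bytes) {
    if (reserved_ + bytes > capacity_) return false;
    reserved_ += bytes;
    return true;
  }

 private:
  std::int64_t capacity_;
  std::int64_t reserved_ = 0;
};

// Toy client, e.g. a filter bank that needs scratch bloom filters later.
class ToyClient {
 public:
  // Claims the whole reservation up front.
  bool ClaimReservation(ToyPool* pool, std::int64_t bytes) {
    if (!pool->Claim(bytes)) return false;
    reservation_ += bytes;
    return true;
  }

  // Allocating within this client's own reservation depends only on the
  // client's own usage, so it may be deferred until first needed.
  bool Allocate(std::int64_t bytes) {
    if (used_ + bytes > reservation_) return false;  // beyond own reservation
    used_ += bytes;
    return true;
  }

 private:
  std::int64_t reservation_ = 0;
  std::int64_t used_ = 0;
};
```

In this model, a greedy client that grows its reservation later can only take unreserved memory, which is why claiming the whole filter memory up front and allocating lazily are compatible.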
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 13: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14689/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-PatchSet: 13 Gerrit-Comment-Date: Wed, 13 Dec 2023 01:54:17 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 12: Build Failed https://jenkins.impala.io/job/gerrit-code-review-checks/14688/ : Initial code review checks failed. See linked job for details on the failure. -- Gerrit-PatchSet: 12 Gerrit-Comment-Date: Wed, 13 Dec 2023 01:42:49 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 13: (15 comments) http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc@478 PS10, Line 478: int pending_count = > It looks a bit strange that pending count is set at line 432, then we set i Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc File be/src/runtime/query-state.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc@438 PS10, Line 438: his filter regist > The new address compare function could be used here. Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@100 PS10, Line 100: /// Filters are aggregated, first locally in this RuntimeFilterBank, if there are multiple > Can you mention the new aggregation mechanism in the class comment? Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@196 PS10, Line 196: > nit: has completed? Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@197 PS10, Line 197: _t, std::unique_ > nit: that has not completed? Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@198 PS10, Line 198: const boost::unordered_map< > Can you mention RUNTIME_FILTER_WAIT_TIME_MS handling in the commit message? Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@199 PS10, Line 199: ; > nit: doesn't happen? 
Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@227 PS10, Line 227: xpec > nit: holds Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@246 PS10, Line 246: > nit: number of? Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139 PS10, Line 139: AllocateScratchBloomFilter > Would it be hard to allocate the filter only when we first need it? Filter aggregation can happen some time after all fragments start. I'm worried that if memory is not preallocated from the start, other fragments can get greedy in scaling their memory reservation beyond the initial reservation, and the remaining memory will not be sufficient to allocate an intermediate bloom filter then. This is why I preallocated them here. http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: result_filter->SetFilter(B > Shouldn't we mark the filter as always true? Agree. Done. http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@756 PS10, Line 756: > nit: cancelled Done http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h File be/src/util/network-util.h: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h@107 PS10, Line 107: KrpcAddressEqual > naming: I think that KrpcAddressEquals would be clearer Done http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift File common/thrift/ImpalaInternalService.thrift: http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54 PS10, Line 54: desi > nit: "that" not needed Done http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54 PS10, Line 54: gator. > nit: aggregator?
Done -- Gerrit-PatchSet: 13 Gerrit-Comment-Date: Wed, 13 Dec 2023 01:17:34 +
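As an illustration of how an address-equality helper like KrpcAddressEquals can decide where a filter update goes, here is a hypothetical sketch. `ToyAddress`, `ToyAddressEquals`, and `RouteUpdate` are invented names; the real helper in network-util.h may compare different fields. The sketch assumes a backend simply compares its own address with the aggregator address carried in the filter's routing info.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a KRPC backend address.
struct ToyAddress {
  std::string hostname;
  int port;
};

// Equality in the spirit of the KrpcAddressEquals helper named in review
// (hypothetical: the real helper may compare different fields).
inline bool ToyAddressEquals(const ToyAddress& a, const ToyAddress& b) {
  return a.hostname == b.hostname && a.port == b.port;
}

enum class UpdateRoute { kAggregateHere, kSendToAggregator };

// A backend compares its own address with the aggregator address from the
// filter's routing info to decide where its filter update goes.
UpdateRoute RouteUpdate(const ToyAddress& self, const ToyAddress& aggregator) {
  return ToyAddressEquals(self, aggregator) ? UpdateRoute::kAggregateHere
                                            : UpdateRoute::kSendToAggregator;
}
```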
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#13). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#12). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 11: PS11 is a rebase. -- Gerrit-PatchSet: 11 Gerrit-Comment-Date: Tue, 12 Dec 2023 22:15:21 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 10: (16 comments) looks great, mostly nits http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc@478 PS10, Line 478: f->set_pending_count(num_agg); It looks a bit strange that pending count is set at line 432, then we set it here again. Can you move that logic here, e.g. in an else block? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc File be/src/runtime/query-state.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc@438 PS10, Line 438: this_krpc_address The new address compare function could be used here. http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@100 PS10, Line 100: /// Filters are aggregated, first locally in this RuntimeFilterBank, if there are multiple Can you mention the new aggregation mechanism in the class comment? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@196 PS10, Line 196: has complete nit: has completed? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@197 PS10, Line 197: has not complete nit: that has not completed? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@198 PS10, Line 198: RUNTIME_FILTER_WAIT_TIME_MS Can you mention RUNTIME_FILTER_WAIT_TIME_MS handling in the commit message? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@199 PS10, Line 199: not nit: doesn't happen? 
http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@227 PS10, Line 227: hold nit: holds http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@246 PS10, Line 246: number nit: number of? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139 PS10, Line 139: AllocateScratchBloomFilter Would it be hard to allocate the filter only when we first need it? It looks strange that the filters are both initialized and allocated in ClaimBufferReservation. http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722 PS10, Line 722: CombinePeerAndLocalUpdates Shouldn't we mark the filter as always true? http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@756 PS10, Line 756: canceled nit: cancelled http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317 PS7, Line 317: > I think it still make sense. I see, I agree that this makes sense if there are a lot of runtime filters. Sending the runtime filters to each executor will still be a lot of work for the coordinator, though, so in the long run it would be better to move that work to the aggregating executors too.
http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h File be/src/util/network-util.h: http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h@107 PS10, Line 107: KrpcAddressMatch naming: I think that KrpcAddressEquals would be clearer http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift File common/thrift/ImpalaInternalService.thrift: http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54 PS10, Line 54: aggregation nit: aggregator? http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54 PS10, Line 54: that nit: "that" not needed -- Gerrit-PatchSet: 10 Gerrit-Comment-Date: Mon, 11 Dec 2023 16:47:13 +
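The restructuring suggested for coordinator.cc (set the pending count once, using an else branch) might look roughly like the sketch below. `ToyFilterState` and `InitPendingCount` are hypothetical. The fallback of one pending update per producing backend when no intermediate aggregators are used is an assumption, not taken from the patch.

```cpp
#include <cassert>

// Hypothetical stand-in for the coordinator's per-filter bookkeeping.
struct ToyFilterState {
  int pending_count = 0;
  void set_pending_count(int n) { pending_count = n; }
};

// Sets the number of updates the coordinator waits for in one place.
// Assumption: with intermediate aggregation, one update is expected per
// aggregator; without it, one per producing backend.
void InitPendingCount(ToyFilterState* f, int num_aggregators,
                      int num_producers) {
  if (num_aggregators > 0) {
    f->set_pending_count(num_aggregators);
  } else {
    f->set_pending_count(num_producers);
  }
}
```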
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 10: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14301/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- Gerrit-PatchSet: 10 Gerrit-Comment-Date: Wed, 01 Nov 2023 21:59:06 +
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 9: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14300/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 9 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 21:51:57 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 10: Code-Review+1 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 10 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 21:32:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 10: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc@312 PS8, Line 312: > This calculation is confusing me a little bit. I would expect it to read "t Done http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test File testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test: http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test@184 PS8, Line 184: # or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST equals to num executors > The minicluster has 3 nodes, but this works because we exclude the coordina That is correct. Rephrased the comment. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 10 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 21:31:20 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#10). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. IMPALA-3825: Delegate runtime filter aggregation to some executors IMPALA-4400 improved the runtime filter by aggregating runtime filters locally before sending filter updates to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances in a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second. This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated impalad backends before the final aggregation and publishing in the coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of intermediate aggregators selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource. A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad.
This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is added on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter. This patch currently targets only bloom filters produced by partitioned join builds. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update before it can be published. test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid regardless of the number of fragment instances. We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36, instrumented to disable local aggregation so as to simulate 720 (20 x 36) runtime filter updates. Aggregation time is approximated as the duration between the earliest filter update and the moment the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
|                    0 |                  1296 |
|                    1 |                  1229 |
|                    2 |                   608 |
|                    4 |                   329 |
|                    8 |                   205 |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py
  and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/common/logging.h M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/network-util.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M
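The scheduler step described in the commit message picks M intermediate aggregators at random for a runtime filter. The other executors' updates then have to be routed to those aggregators. The sketch below illustrates one way to do that and is not the patch's code. Several parts are assumptions: the function name AssignAggregators, the use of executor indices instead of KRPC addresses, the -1 marker for "send directly to the coordinator", and the round-robin assignment of non-aggregators to aggregators.

```cpp
#include <algorithm>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical routing plan for one runtime filter: agg_of[i] is the index
// of the executor that merges executor i's filter update. An aggregator
// routes to itself; -1 means the update goes straight to the coordinator.
std::vector<int> AssignAggregators(int num_executors, int num_agg, unsigned seed) {
  if (num_executors <= 0) return {};
  std::vector<int> agg_of(num_executors, -1);
  if (num_agg <= 0) return agg_of;  // Feature disabled.
  num_agg = std::min(num_agg, num_executors);

  // Randomly permute executor indices; the first num_agg become aggregators.
  std::vector<int> order(num_executors);
  std::iota(order.begin(), order.end(), 0);
  std::mt19937 rng(seed);
  std::shuffle(order.begin(), order.end(), rng);

  // Round-robin over the permuted order: position p is routed to the
  // aggregator at position p % num_agg. For p < num_agg that is itself.
  for (int p = 0; p < num_executors; ++p) {
    agg_of[order[p]] = order[p % num_agg];
  }
  return agg_of;
}
```

With this even spread, each aggregator merges at most ceil(N / M) updates, counting its own. When M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST), that bound never exceeds MAX_NUM_FILTERS_AGGREGATED_PER_HOST.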
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 9: Code-Review+1 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 9 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 21:25:48 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#9). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. IMPALA-3825: Delegate runtime filter aggregation to some executors IMPALA-4400 improved the runtime filter by aggregating runtime filters locally before sending filter updates to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances in a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second. This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated impalad backends before the final aggregation and publishing in the coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of intermediate aggregators selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource. A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad.
This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is added on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter. This patch currently targets only bloom filters produced by partitioned join builds. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update before it can be published. test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid regardless of the number of fragment instances. We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36, instrumented to disable local aggregation so as to simulate 720 (20 x 36) runtime filter updates. Aggregation time is approximated as the duration between the earliest filter update and the moment the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
|                    0 |                  1296 |
|                    1 |                  1229 |
|                    2 |                   608 |
|                    4 |                   329 |
|                    8 |                   205 |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py
  and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/common/logging.h M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/network-util.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M
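The intermediate aggregator folds 'pending_remote_filter' and 'pending_merge_filter' into 'result_filter'. For bloom filters built with the same size and hash functions, that merge is a bitwise OR of their bit arrays. The sketch below shows the idea. It uses a plain std::vector of 64-bit words as a hypothetical stand-in for a bloom filter's bit array. It is not Impala's BloomFilter class and does not reproduce that class's block layout or API.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a bloom filter's bit array.
using BloomBits = std::vector<std::uint64_t>;

// ORs 'src' into 'dst'. Both filters must share size and hash functions,
// otherwise the union is meaningless, so a size mismatch is rejected.
void OrInto(const BloomBits& src, BloomBits* dst) {
  if (src.size() != dst->size()) {
    throw std::invalid_argument("bloom filter size mismatch");
  }
  for (std::size_t i = 0; i < src.size(); ++i) (*dst)[i] |= src[i];
}

// Combines remote and local partial filters into the filter that the
// intermediate aggregator would forward to the coordinator.
BloomBits CombineForCoordinator(const BloomBits& pending_remote_filter,
                                const BloomBits& pending_merge_filter) {
  BloomBits result_filter = pending_merge_filter;
  OrInto(pending_remote_filter, &result_filter);
  return result_filter;
}
```

OR is associative and commutative. As a result, the two-level scheme (executors to aggregator to coordinator) produces the same final filter as merging every update directly in the coordinator, whichever order the updates arrive in.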
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 9: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/9/be/src/service/query-options.h File be/src/service/query-options.h: http://gerrit.cloudera.org:8080/#/c/20612/9/be/src/service/query-options.h@314 PS9, Line 314: QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \ line too long (91 > 90) -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 9 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 21:24:03 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 8: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc@312 PS8, Line 312: int num_agg = (int)ceil((double)instance_groups.size() / num_filters_per_host); This calculation is confusing me a little bit. I would expect it to read "total_number_of_filters / num_filters_per_host", with a cap of instance_groups.size(). I guess it works because the number of filters is really the number of filter sources (which would be the number of instances participating in the query, e.g. instance_groups.size()). Might be worth a comment reminding readers of that. http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test File testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test: http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test@184 PS8, Line 184: # or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST=2. The minicluster has 3 nodes, but this works because we exclude the coordinator. Right? -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 8 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 20:44:57 + Gerrit-HasComments: Yes
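Michael's reading of the num_agg line can be made concrete. Each instance group contributes one locally aggregated update per runtime filter, so instance_groups.size() doubles as the number of filter sources. Dividing it by MAX_NUM_FILTERS_AGGREGATED_PER_HOST and rounding up gives the aggregator count. The sketch below is a hypothetical helper, NumFilterAggregators, not the patch's code. It uses integer ceiling division instead of the double-based ceil in the quoted line. Following the commit message, it treats a setting of 1 or less as "disabled". The checks include the minicluster case from the test discussion: 3 nodes minus the coordinator gives N = 2.

```cpp
// Hypothetical helper: number of intermediate aggregators for one runtime
// filter, given the number of executors (filter sources, coordinator
// excluded) and MAX_NUM_FILTERS_AGGREGATED_PER_HOST. Returns 0 when the
// feature is disabled.
int NumFilterAggregators(int num_executors, int max_filters_per_host) {
  if (max_filters_per_host <= 1 || num_executors <= 0) return 0;
  // Integer ceiling division: ceil(N / max) for positive ints, with no
  // round trip through double.
  return (num_executors + max_filters_per_host - 1) / max_filters_per_host;
}
```

At a setting of 10, this yields 2 aggregators for 20 executors and 10 for 100, matching the per-host behavior proposed during review.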
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 8: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14299/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 8 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 20:41:10 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 8: (3 comments) http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG@20 PS7, Line 20: Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added : to control this feature. Given N as the number of backend executors : excluding the coordinator, the selected number of intermediate : aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting : MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable > MAX_NUM_FILTER_AGGREGATOR seems ok for me in the short term to test this fe Done. Changed MAX_NUM_FILTER_AGGREGATOR to MAX_NUM_FILTERS_AGGREGATED_PER_HOST. http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@282 PS7, Line 282: typedef vector> InstanceToAggPairs; : : v > Can you move this to network-util.h? Done http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317 PS7, Line 317: // Put coordinator grou > Does num_agg = 1 make sense? I think it still makes sense. Say a query has 50 bloom runtime filters and runs on a 100-executor cluster. num_agg=1 means the coordinator handles only 50 finalized filters instead of 5000 filter updates. In this case, the coordinator is only responsible for publishing the finalized filters to all executors, while the aggregation happens on other nodes. Note that only the coordinator has the full KRPC addresses of cluster members, via Coordinator::BackendState.
-- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 8 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 20:19:56 + Gerrit-HasComments: Yes
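The arithmetic in the num_agg = 1 answer can be written out as a simple model. The coordinator merges one update per filter from each sender. Without intermediate aggregation every executor is a sender, and with it only the aggregators are. The function name is hypothetical. The model also assumes one locally aggregated update per executor per filter, and ignores broadcast-join filters and the coordinator's own fragments.

```cpp
#include <cstdint>

// Hypothetical model: filter updates the coordinator must merge for one
// query. num_agg == 0 means no intermediate aggregation, so every
// executor sends its (locally aggregated) update straight to the coordinator.
std::int64_t CoordinatorUpdates(std::int64_t num_filters,
                                std::int64_t num_executors,
                                std::int64_t num_agg) {
  std::int64_t senders = num_agg > 0 ? num_agg : num_executors;
  return num_filters * senders;
}
```

For the example of 50 filters on 100 executors, this gives 5000 updates without intermediate aggregation and 50 with a single aggregator.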
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 8: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/service/query-options.h File be/src/service/query-options.h: http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/service/query-options.h@314 PS8, Line 314: QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \ line too long (91 > 90) -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 8 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 01 Nov 2023 20:14:17 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#8). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. IMPALA-3825: Delegate runtime filter aggregation to some executors IMPALA-4400 improved the runtime filter by aggregating runtime filters locally before sending filter updates to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances in a query. However, local filter aggregation is still insufficient when the number of nodes in an Impala cluster is large. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second. This patch aims to reduce coordinator load and speed up runtime filter aggregation by doing intermediate aggregation in a few designated impalad backends before the final aggregation and publishing in the coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of intermediate aggregators selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate aggregator feature. In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource. A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad.
This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is added on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter. This patch currently targets only bloom filters produced by partitioned join builds. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update before it can be published. test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid regardless of the number of fragment instances. We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36, instrumented to disable local aggregation so as to simulate 720 (20 x 36) runtime filter updates. Aggregation time is approximated as the duration between the earliest filter update and the moment the coordinator publishes the complete filter. The results are as follows:

+----------------------+-----------------------+
| num aggregator nodes | Aggregation time (ms) |
+----------------------+-----------------------+
|                    0 |                  1296 |
|                    1 |                  1229 |
|                    2 |                   608 |
|                    4 |                   329 |
|                    8 |                   205 |
+----------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in test_runtime_filters.py
  and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/common/logging.h M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/network-util.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Csaba Ringhofer has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 7: (3 comments) The implementation looks great in general, just a few comments http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG@20 PS7, Line 20: Query option MAX_NUM_FILTER_AGGREGATOR is added to control : this feature. Given N as the number of backend executors excluding the : coordinator, the selected number of intermediate aggregators M = : min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR : <= 0 will disable the intermediate aggregator feature. MAX_NUM_FILTER_AGGREGATOR seems OK to me in the short term for testing this feature, but I think there should be a more sophisticated way to tune this in the long term. My concern is that, depending on the number of hosts used in the query, a static default for MAX_NUM_FILTER_AGGREGATOR can easily be too low or too high. Ideally we should have an option like MAX_NUM_FILTERS_AGGREGATED_PER_HOST, so if this value is 10, then a 20-host query will have 2 pre-aggregators, while a 100-host query will have 10. http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@282 PS7, Line 282: inline bool KrpcAddressMatch(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) { : return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port(); : } Can you move this to network-util.h? I think there should be a single function to compare addresses, as the comparison is not completely self-evident (due to the uds_address). http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317 PS7, Line 317: if (num_agg <= 0) return; Does num_agg = 1 make sense?
-- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 7 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Mon, 30 Oct 2023 14:37:24 + Gerrit-HasComments: Yes
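Csaba's concern about a static default can be checked with the PS7 rule, M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). The sketch below computes how many executors' updates each aggregator must merge as the cluster grows. The function names are hypothetical. N / 2 is assumed to be integer division, and the cap value 4 in the checks is chosen only for illustration. Under that cap, the per-aggregator load grows from 5 updates at 20 executors to 175 at 700. A 4-executor query, by contrast, gets 2 aggregators for just 2 updates each.

```cpp
#include <algorithm>

// PS7 rule: M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2); values <= 0
// disable the feature. N / 2 is assumed to be integer division.
int AggregatorsStaticCap(int num_executors, int max_num_filter_aggregator) {
  if (max_num_filter_aggregator <= 0) return 0;
  return std::min(max_num_filter_aggregator, num_executors / 2);
}

// Worst-case number of executors whose updates one aggregator merges when
// updates are spread as evenly as possible (ceiling division). With no
// aggregator, all updates land on the coordinator.
int UpdatesPerAggregator(int num_executors, int num_agg) {
  if (num_agg <= 0) return num_executors;
  return (num_executors + num_agg - 1) / num_agg;
}
```

A per-host setting inverts this relationship. It fixes the load per aggregator and lets the number of aggregators grow with N.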
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 7: Code-Review+1 -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 7 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Fri, 27 Oct 2023 21:03:49 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 7: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14272/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 7 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Fri, 27 Oct 2023 05:32:46 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 7: patch set 7 is a rebase -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 7 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Fri, 27 Oct 2023 05:07:26 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#7). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before sending a filter update to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when the Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch reduces coordinator load and speeds up runtime filter aggregation by performing intermediate aggregation in a few designated impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of selected intermediate aggregators is M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is made on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and the test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36, instrumented to disable local aggregation, simulating 720 runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M
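The aggregator selection described in this patch set can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in be/src/scheduling/scheduler.cc: NumAggregators(), BuildRoutes(), the round-robin assignment of executors to aggregators, and the seeded std::mt19937 are all hypothetical choices made for the example. It encodes only the formula as written in patch sets 4 to 7, M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2); the merged change describes a different option and formula.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical: number of intermediate aggregators as written in patch sets 4-7,
// M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Values <= 0 disable the feature.
int NumAggregators(int max_num_filter_aggregator, int num_executors) {
  if (max_num_filter_aggregator <= 0) return 0;
  return std::min(max_num_filter_aggregator, num_executors / 2);
}

// Hypothetical routing table for one runtime filter. route[i] is the executor
// index that executor i sends its filter update to; an aggregator routes to
// itself (it merges its own local update). -1 means "send directly to the
// coordinator", which applies to every executor when the feature is disabled.
std::vector<int> BuildRoutes(int num_executors, int max_num_filter_aggregator,
                             uint32_t seed) {
  std::vector<int> route(num_executors, -1);
  const int m = NumAggregators(max_num_filter_aggregator, num_executors);
  if (m == 0) return route;
  assert(m <= num_executors);

  // Random selection: shuffle executor indices; the first m are aggregators.
  std::vector<int> idx(num_executors);
  std::iota(idx.begin(), idx.end(), 0);
  std::mt19937 rng(seed);
  std::shuffle(idx.begin(), idx.end(), rng);

  // Round-robin assignment (an assumption of this sketch): the j-th shuffled
  // executor reports to aggregator idx[j % m].
  for (int j = 0; j < num_executors; ++j) {
    route[idx[j]] = idx[j % m];
  }
  return route;
}
```

With N = 20 executors, as in the benchmark above, any MAX_NUM_FILTER_AGGREGATOR value up to 10 yields that many aggregators. For example, BuildRoutes(20, 4, seed) picks 4 aggregators, each receiving updates from 5 executors (itself included), so the coordinator receives 4 updates instead of 20.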
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: Code-Review+1 (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@243 PS5, Line 243: /// Return true if all filter updates have been received. > nit: updates have been Done http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@246 PS5, Line 246: /// Return number filter updates that have been received. > nit: updates that have been Done -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 20:22:32 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py File tests/common/test_vector.py: http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179 PS6, Line 179: s > These changes in test_vector.py are getting complex. It is probably worth Filed a separate patch for this at https://gerrit.cloudera.org/c/20625/ -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 05:17:18 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py File tests/common/test_vector.py: http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179 PS6, Line 179: s > flake8: E501 line too long (92 > 90 characters) These changes in test_vector.py are getting complex. It is probably worth putting them in a separate patch to make backporting easier. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 01:54:08 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14257/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 01:43:26 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: (2 comments) http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690 PS3, Line 690: wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms; > Still seems like that should use an atomic then. Changed to hold the lock before checking cancelled_. This should be safe because cancelled_ can only change while all locks are held. http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py File tests/common/test_dimensions.py: http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py@261 PS5, Line 261: combinations = product(*(exec_option_dimensions[name] for name in keys)) : exec_option_dimension_values = [dict(zip(keys, prod)) for prod in combinations] : > This is probably the reason why extra exec options are always declared with Added ImpalaTestMatrix.add_exec_option_dimension() in patch set 5 to make declaring extra exec options easier without losing the pairwise combination. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 01:22:24 + Gerrit-HasComments: Yes
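The locking argument in the reply above (cancelled_ is only written while all locks are held, so reading it under a lock is safe without std::atomic) can be illustrated with a single-mutex sketch. FilterSender, AddPending() and SendRemaining() are hypothetical names; Impala's RuntimeFilterBank uses more than one lock, and a real implementation would likely avoid holding the lock while issuing RPCs.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical sketch: 'cancelled_' is written only under 'lock_', so a reader
// holding 'lock_' always sees a consistent value without std::atomic.
class FilterSender {
 public:
  void Cancel() {
    std::lock_guard<std::mutex> l(lock_);
    cancelled_ = true;  // false -> true exactly once; never reset
  }

  void AddPending(int filter_id) {
    assert(filter_id >= 0);  // filter ids are non-negative in this sketch
    std::lock_guard<std::mutex> l(lock_);
    pending_.push_back(filter_id);
  }

  // Sends the remaining incomplete filters unless the query was cancelled.
  // Returns how many filters were "sent".
  int SendRemaining() {
    std::lock_guard<std::mutex> l(lock_);
    if (cancelled_) return 0;  // checked under the lock that guards writes
    const int sent = static_cast<int>(pending_.size());
    pending_.clear();  // stand-in for issuing the filter-update RPCs
    return sent;
  }

 private:
  std::mutex lock_;
  bool cancelled_ = false;    // guarded by lock_
  std::vector<int> pending_;  // guarded by lock_
};
```

The reviewer's alternative, a std::atomic<bool>, would allow the check without taking the lock. Holding the lock instead also makes the check and the following work on the pending filters atomic with respect to Cancel(), since Cancel() cannot run in between.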
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#6). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before sending a filter update to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when the Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch reduces coordinator load and speeds up runtime filter aggregation by performing intermediate aggregation in a few designated impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of selected intermediate aggregators is M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is made on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and the test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36, instrumented to disable local aggregation, simulating 720 runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M
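The combine step in this commit message ('pending_remote_filter' with 'pending_merge_filter' into 'result_filter') relies on bloom filters being mergeable by bitwise OR. The sketch below assumes a simplified single-array filter with one bit per hash; SimpleFilter and CombineForCoordinator() are hypothetical, and Impala's real BloomFilter (be/src/util/bloom-filter.h) is block-based with a different API. The explicit always_true flag mirrors the review discussion: detecting that every bit has become 1 would require scanning the whole buffer, so that state is tracked separately.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified bloom-style filter (one hash -> one bit). It only
// illustrates merge semantics; it is not Impala's BloomFilter.
struct SimpleFilter {
  bool always_true = false;  // passes every row (no filtering possible)
  bool always_false = true;  // nothing inserted yet: rejects every row
  std::vector<uint64_t> bits;

  explicit SimpleFilter(size_t words) : bits(words, 0) { assert(words > 0); }

  void SetAlwaysTrue() {
    always_true = true;
    always_false = false;
  }

  void Insert(uint64_t hash) {
    const size_t bit = hash % (bits.size() * 64);
    bits[bit / 64] |= uint64_t{1} << (bit % 64);
    always_false = false;
  }

  bool MayContain(uint64_t hash) const {
    if (always_true) return true;
    if (always_false) return false;
    const size_t bit = hash % (bits.size() * 64);
    return (bits[bit / 64] >> (bit % 64)) & 1;
  }

  // Union of two filters built with the same size and hash function. The
  // always_true flag avoids scanning the buffer for an all-ones state.
  void Or(const SimpleFilter& other) {
    if (always_true) return;                             // already passes all
    if (other.always_true) { SetAlwaysTrue(); return; }  // union passes all
    if (other.always_false) return;                      // nothing to add
    assert(bits.size() == other.bits.size());
    for (size_t i = 0; i < bits.size(); ++i) bits[i] |= other.bits[i];
    always_false = false;
  }
};

// Hypothetical intermediate-aggregator step: merge the locally aggregated
// filter with the one accumulated from remote updates.
SimpleFilter CombineForCoordinator(const SimpleFilter& pending_merge_filter,
                                   const SimpleFilter& pending_remote_filter) {
  SimpleFilter result_filter = pending_merge_filter;
  result_filter.Or(pending_remote_filter);
  return result_filter;
}
```

Because OR is associative and commutative, the coordinator can OR the M intermediate results and obtain the same filter it would have built from all updates directly; only the number of merges it performs changes.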
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 6: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py File tests/common/test_vector.py: http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179 PS6, Line 179: s flake8: E501 line too long (92 > 90 characters) -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 6 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Thu, 26 Oct 2023 01:16:11 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 5: (6 comments) http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488 PS3, Line 488: const RuntimeFilterAggregatorInfoPB& agg_info = > This is the translation from RuntimeFilterAggregatorInfoPB (FragmentExecPar Oh, I think part of my confusion is that "to report" sounds like a mapping, but it's actually an action. The comment helps. http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@243 PS5, Line 243: /// Return true if all filter updates has been received. nit: updates have been http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@246 PS5, Line 246: /// Return number filter updates that has been received. nit: updates that have been http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281 PS3, Line 281: DCHECK(!pending_merge_filter->AlwaysFalse()); > That will require checking the underlying buffer if all the bits have becom Ack http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690 PS3, Line 690: if (query_state_->query_options().runtime_filter_wait_time_ms > 0) { > cancelled_ can turn from False to True once through CancelLocked(). Once it Still seems like that should use an atomic then. 
http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139 PS3, Line 139: // query_id has complete their execution. > Done. Also added comment. Based on the comment, it probably doesn't need to be a warning. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 5 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 25 Oct 2023 21:08:17 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 5: (1 comment) http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py File tests/common/test_dimensions.py: http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py@261 PS5, Line 261: : TODO: In the future we could generate these values using pairwise to reduce total : execution time. This is probably the reason why extra exec options are always declared with a separate cls.ImpalaTestMatrix.add_dimension(). It ensures that the pairwise combination is done properly. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 5 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 25 Oct 2023 16:12:38 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 5: (18 comments) http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG Commit Message: http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43 PS3, Line 43: This patch currently targets the bloom filter produced by partitioned > bloom filter, not boom filter Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488 PS3, Line 488: const RuntimeFilterAggregatorInfoPB& agg_info = > aggregator_idx_to_report producing an agg_idx is confusing me. There's some This is the translation from RuntimeFilterAggregatorInfoPB (FragmentExecParamsPB.filter_agg_info) to TRuntimeFilterAggDesc (TRuntimeFilterSource.aggregator_desc). aggregator_idx_to_report is a list belonging to RuntimeFilterAggregatorInfoPB. agg_idx is an index into aggregator_idx_to_report. Rewrote this code to clarify. http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h File be/src/runtime/runtime-filter-bank.h: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71 PS3, Line 71: bool need_subaggregation = false; > nit: should be need_subaggregation Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227 PS3, Line 227: /// Pointer to runtime filter that hold the merge result of all remote updates. > nit: result Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235 PS3, Line 235: /// Return number of remaining filter producers, both remote and local. > These should all probably have function comments.
Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196 PS3, Line 196: { > This feels weird to me because you've already guaranteed that it's false at Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217 PS3, Line 217: return; > It'd be helpful to VLOG_RPC that the message was ignored. Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252 PS3, Line 252: << " This filter is now disabled."; > You use VLOG(3) enough it might warrant its own define in logging.h to name Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256 PS3, Line 256: VLOG_FILTER << "filter_id=" << params.filter_id() > Wouldn't this + line 286 cause pending_remotes to be negative? Changed assignment to 1. http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281 PS3, Line 281: DCHECK(!pending_merge_filter->AlwaysFalse()); > Is there a case where this produces a filter that's always true and we woul That will require checking the underlying buffer if all the bits have become 1, and there seems to be no fast way to do so. Hence, there is a separate always_true() flag for this purpose, which is checked in the previous branch. http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690 PS3, Line 690: if (query_state_->query_options().runtime_filter_wait_time_ms > 0) { > Can cancelled_ be updated externally? That seems like a problem, because I cancelled_ can turn from False to True once through CancelLocked(). Once it is True, it stays True. This method returns early and stops sending the remaining incomplete filters if the query is cancelled midway.
http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h File be/src/runtime/runtime-filter.h: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151 PS3, Line 151: > These would benefit from function comments. Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc File be/src/scheduling/scheduler.cc: http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288 PS3, Line 288: > Why 2? Typo; it has been removed. http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291 PS3, Line 291: // src_state->instance_states organize fragment instances as one dimension vector > I'm not entirely clear what these pairs represent. Replaced with a typedef and added comments. http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298 PS3, Line 298: for (int i = 0; i < src_state->instance_states.size(); ++i) { > Please add a comment describing what this is preventing. Done http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc:
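The bookkeeping behind "Return number of remaining filter producers, both remote and local" and the question about pending_remotes going negative can be sketched as two counters that refuse to drop below zero. ProducerCounter and its methods are hypothetical names, not Impala's fields. The false return value marks where an ignored update could be logged, as suggested in the VLOG_RPC comment above.

```cpp
#include <cassert>

// Hypothetical per-filter bookkeeping on an intermediate aggregator.
class ProducerCounter {
 public:
  ProducerCounter(int num_local, int num_remote)
      : pending_local_(num_local), pending_remote_(num_remote) {
    assert(num_local >= 0 && num_remote >= 0);
  }

  // Each returns false (update ignored) if no producer of that kind is still
  // outstanding, so an unexpected extra update cannot push a counter below 0.
  bool OnLocalUpdate() { return Decrement(&pending_local_); }
  bool OnRemoteUpdate() { return Decrement(&pending_remote_); }

  // Number of remaining filter producers, both remote and local.
  int NumRemaining() const { return pending_local_ + pending_remote_; }
  bool AllReceived() const { return NumRemaining() == 0; }

 private:
  static bool Decrement(int* counter) {
    if (*counter <= 0) return false;
    --*counter;
    return true;
  }

  int pending_local_;
  int pending_remote_;
};
```

In this sketch, local updates are assumed to arrive already merged (one per impalad, as with IMPALA-4400's shared RuntimeFilterBank), so an aggregator would start with one pending local producer plus one pending remote producer per executor routed to it.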
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 ) Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors .. Patch Set 4: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/14239/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/20612 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 Gerrit-Change-Number: 20612 Gerrit-PatchSet: 4 Gerrit-Owner: Riza Suminto Gerrit-Reviewer: Abhishek Rawat Gerrit-Reviewer: Csaba Ringhofer Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Kurt Deschler Gerrit-Reviewer: Michael Smith Gerrit-Reviewer: Riza Suminto Gerrit-Comment-Date: Wed, 25 Oct 2023 03:18:44 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#5). Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors ..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before sending a filter update to the coordinator, and by sharing a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when the Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch reduces coordinator load and speeds up runtime filter aggregation by performing intermediate aggregation in a few designated impala backends before the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Given N as the number of backend executors excluding the coordinator, the number of selected intermediate aggregators is M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto to handle filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Note that because the M impalads are selected randomly, an additional memory reservation for 'pending_remote_filter' is made on every impalad, even though only M impalads perform intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, while a bloom filter from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options dimension and the test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table so that the tests stay valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a 20-executor-node cluster with MT_DOP=36, instrumented to disable local aggregation, simulating 720 runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala Public Jenkins,

I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#4).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways: it aggregates them locally before sending filter updates to the coordinator, and it shares a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when an Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation. It does intermediate aggregation in a few designated Impala backends, then does the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of selected intermediate aggregators is then M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, a TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad.
This RPC merges filter updates into 'pending_remote_filter'. The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Because the M impalads are selected randomly, every impalad gets an additional memory reservation for 'pending_remote_filter', even though only M impalads do the intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, and a bloom filter from a broadcast join needs only 1 valid filter update before it can be published.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table, so the tests stay valid irrespective of the number of fragment instances.

We benchmarked the time to aggregate a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36. The cluster was instrumented to disable local aggregation, which simulates 720 (20 x 36) runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/common/logging.h M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M common/protobuf/data_stream_service.proto M
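Aggregator selection in this patch set can be sketched in Python as below. Several assumptions here are hypothetical and do not come from the patch:
- N / 2 is integer division.
- Aggregators are drawn with random.sample.
- The remaining executors are routed to aggregators round-robin. The real routing is carried in TRuntimeFilterAggDesc and may differ.

For comparison, num_aggregators_per_host() follows the rule in the merged commit message: M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST), with values <= 1 disabling the feature.

```python
import random


def num_aggregators(max_num_filter_aggregator, num_executors):
    """Patch set 4 rule: M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2); <= 0 disables."""
    if max_num_filter_aggregator <= 0:
        return 0
    return min(max_num_filter_aggregator, num_executors // 2)


def num_aggregators_per_host(max_filters_per_host, num_executors):
    """Merged rule: M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST); <= 1 disables."""
    if max_filters_per_host <= 1:
        return 0
    # Integer ceiling division.
    return (num_executors + max_filters_per_host - 1) // max_filters_per_host


def assign_aggregators(executors, m, rng=random):
    """Pick m random executors as intermediate aggregators and map every
    executor to the destination of its filter update. Aggregators report to
    the coordinator; the others are spread round-robin (an assumption)."""
    if m <= 0:
        return {e: "coordinator" for e in executors}
    aggregators = rng.sample(executors, m)
    routes = {a: "coordinator" for a in aggregators}
    others = [e for e in executors if e not in routes]
    for i, e in enumerate(others):
        routes[e] = aggregators[i % m]
    return routes
```

Usage: with 20 executors and MAX_NUM_FILTER_AGGREGATOR=4, num_aggregators(4, 20) is 4. assign_aggregators(executors, 4, random.Random(42)) then routes the other 16 executors to those 4. Any executor may be picked, so every impalad must be prepared to act as an aggregator. This is why the message above adds the 'pending_remote_filter' reservation on all impalads.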
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Michael Smith has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

Patch Set 3:

(18 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43
PS3, Line 43: This patch currently targets the boom filter produced by partitioned
bloom filter, not boom filter

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488: int agg_idx = agg_info.aggregator_idx_to_report(i);
aggregator_idx_to_report producing an agg_idx is confusing me. There's something about the names I'm missing.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71
PS3, Line 71: bool need_subaggregegation = false;
nit: should be need_subaggregation

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227
PS3, Line 227: // Pointer to runtime filter that hold the merge resut of all remote updates.
nit: result

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235
PS3, Line 235: inline int AllRemainingProducers() { return pending_remotes + pending_producers; }
These should all probably have function comments.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196: DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
This feels weird to me because you've already guaranteed that it's false at line 188. Maybe simplify with

  if (res->status().status_code() != TErrorCode::OK) {
    // ... never set an error status
    DCHECK(is_remote_update);
    ...
  }

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217
PS3, Line 217: // Late RPC might come while filter bank is closing.
It'd be helpful to VLOG_RPC that the message was ignored.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252
PS3, Line 252: VLOG(3) << "filter_id=" << params.filter_id()
You use VLOG(3) often enough that it might warrant its own define in logging.h to name this logging scenario. VLOG_FILTER?

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256
PS3, Line 256: produced_filter.pending_remotes = 0;
Wouldn't this + line 286 cause pending_remotes to be negative?

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281: target->Or(params.bloom_filter(), sidecar_slice);
Is there a case where this produces a filter that's always true and we would want to stop waiting for other remotes? If FalsePositiveProb == 1.0, then presumably we should just use an ALWAYS_TRUE_FILTER.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690: bool try_wait_aggregation = !cancelled_;
Can cancelled_ be updated externally? That seems like a problem, because I don't see a lock or atomic.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h
File be/src/runtime/runtime-filter.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151
PS3, Line 151: bool IsReportToPeerRpc() const {
These would benefit from function comments.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288
PS3, Line 288: // Walk the instances and pick two random krpc backend for intermediate runtime
Why 2?

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291
PS3, Line 291: vector>> instance_groups;
I'm not entirely clear what these pairs represent.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298
PS3, Line 298: if (i == 0
Please add a comment describing what this is preventing.

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139
PS3, Line 139: LOG(INFO) << err_msg;
When would this happen? Should this be a warning?

http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:
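The review question on line 281 of runtime-filter-bank.cc concerns OR-merging. Merging is a bitwise OR, so a merged filter only ever gains bits. Once every bit is set, every probe matches and the filter can no longer reject any row. The toy model below shows that saturated state. It is a hypothetical classic bloom filter, not Impala's BloomFilter class. It also uses a common estimate of the false positive probability for a classic bloom filter: (fraction of set bits)^k for k hash functions. The sketch does not settle whether Impala should then switch to ALWAYS_TRUE_FILTER and stop waiting for the remaining remotes; that remains the reviewer's open question.

```python
import hashlib


class ToyBloomFilter:
    """Hypothetical classic bloom filter: m bits, k hash positions per key."""

    def __init__(self, m_bits, k, bits=0):
        assert 1 <= k <= 8, "this toy derives at most 8 positions from SHA-256"
        self.m, self.k, self.bits = m_bits, k, bits

    def _positions(self, key):
        # Slice the 32-byte SHA-256 digest into k 4-byte integers.
        digest = hashlib.sha256(str(key).encode()).digest()
        return [int.from_bytes(digest[4 * i:4 * i + 4], "little") % self.m
                for i in range(self.k)]

    def insert(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def may_contain(self, key):
        return all((self.bits >> p) & 1 for p in self._positions(key))

    def or_with(self, other):
        # Merging filter updates: same geometry required, then bitwise OR.
        assert (self.m, self.k) == (other.m, other.k)
        self.bits |= other.bits

    def estimated_fpp(self):
        fill = bin(self.bits).count("1") / self.m
        return fill ** self.k

    def is_saturated(self):
        # All bits set: every probe matches, i.e. effectively always true.
        return self.bits == (1 << self.m) - 1
```

After a merge, a check such as is_saturated() (or estimated_fpp() reaching 1.0) is one possible signal that further remote updates can no longer change the result.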
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

Patch Set 3:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14230/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto
Gerrit-Reviewer: Abhishek Rawat
Gerrit-Reviewer: Csaba Ringhofer
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Kurt Deschler
Gerrit-Reviewer: Riza Suminto
Gerrit-Comment-Date: Tue, 24 Oct 2023 01:36:06 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

Patch Set 3:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py
File tests/custom_cluster/test_runtime_filter_aggregation.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@45
PS1, Line 45:
> flake8: F821 undefined name 'add_exec_option_dimension'

Done

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@49
PS1, Line 49:
> flake8: E231 missing whitespace after ','

Done

--
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto
Gerrit-Reviewer: Abhishek Rawat
Gerrit-Reviewer: Csaba Ringhofer
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Kurt Deschler
Gerrit-Reviewer: Riza Suminto
Gerrit-Comment-Date: Tue, 24 Oct 2023 01:07:36 +
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#3).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways: it aggregates them locally before sending filter updates to the coordinator, and it shares a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when an Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation. It does intermediate aggregation in a few designated Impala backends, then does the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of selected intermediate aggregators is then M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, a TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'.
The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Because the M impalads are selected randomly, every impalad gets an additional memory reservation for 'pending_remote_filter', even though only M impalads do the intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, and a bloom filter from a broadcast join needs only 1 valid filter update before it can be published.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table, so the tests stay valid irrespective of the number of fragment instances.

We benchmarked the time to aggregate a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36. The cluster was instrumented to disable local aggregation, which simulates 720 (20 x 36) runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift M common/thrift/ImpalaService.thrift M
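The benchmark numbers above can be related to the fan-in at each aggregation level with a deliberately simplified model. The model is an assumption, not a measurement:
- The 720 updates (20 executors x MT_DOP=36) are split evenly among M aggregators.
- Each aggregator merges its share.
- The coordinator merges the M results.
- With M = 0, the coordinator merges all 720 updates.

With N = 20, M = min(MAX_NUM_FILTER_AGGREGATOR, 10), so M equals the option value in every row of the table. The measured times are copied from the table. The model ignores network, serialization, and the cost of the coordinator's final merge.

```python
import math

# Measured aggregation times in ms, keyed by MAX_NUM_FILTER_AGGREGATOR (= M here).
MEASURED_MS = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}
TOTAL_UPDATES = 20 * 36  # 20 executors x MT_DOP=36, local aggregation disabled


def fan_in(m, total_updates=TOTAL_UPDATES):
    """Simplified model: (updates merged by the busiest aggregator,
    updates merged by the coordinator) for m intermediate aggregators."""
    if m == 0:
        return 0, total_updates
    return math.ceil(total_updates / m), m


def speedup(m):
    """Measured speedup relative to coordinator-only aggregation (m = 0)."""
    return MEASURED_MS[0] / MEASURED_MS[m]


for m in sorted(MEASURED_MS):
    busiest, coordinator = fan_in(m)
    print(f"M={m}: busiest aggregator merges {busiest}, "
          f"coordinator merges {coordinator}, speedup {speedup(m):.2f}x")
```

Under this model, doubling M halves the busiest aggregator's fan-in. The measured time roughly halves from M = 1 to M = 2 and again from M = 2 to M = 4, with a smaller gain at M = 8. M = 1 barely helps, because a single aggregator still merges all 720 updates.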
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

Patch Set 1:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14229/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Comment-Date: Tue, 24 Oct 2023 00:58:55 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/20612 to look at the new patch set (#2).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways: it aggregates them locally before sending filter updates to the coordinator, and it shares a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when an Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation. It does intermediate aggregation in a few designated Impala backends, then does the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of selected intermediate aggregators is then M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, a TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'.
The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Because the M impalads are selected randomly, every impalad gets an additional memory reservation for 'pending_remote_filter', even though only M impalads do the intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, and a bloom filter from a broadcast join needs only 1 valid filter update before it can be published.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table, so the tests stay valid irrespective of the number of fragment instances.

We benchmarked the time to aggregate a 1 MB runtime filter on a 20-executor cluster with MT_DOP=36. The cluster was instrumented to disable local aggregation, which simulates 720 (20 x 36) runtime filter updates. The aggregation time is approximated as the duration between the earliest time a filter update is made and the time the coordinator publishes the complete filter. The results are as follows:

+---------------------------+-----------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation time (ms) |
+---------------------------+-----------------------+
| 0                         | 1296                  |
| 1                         | 1229                  |
| 2                         | 608                   |
| 4                         | 329                   |
| 8                         | 205                   |
+---------------------------+-----------------------+

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift M common/thrift/ImpalaService.thrift M
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Riza Suminto has uploaded this change for review. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters in two ways: it aggregates them locally before sending filter updates to the coordinator, and it shares a single RuntimeFilterBank among all fragment instances of a query. However, local filter aggregation is still insufficient when an Impala cluster has many nodes. For example, in a cluster of around 700 impalad backends, aggregating 1 MB bloom filter updates in the coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter aggregation. It does intermediate aggregation in a few designated Impala backends, then does the final aggregation and publishing in the coordinator.

Query option MAX_NUM_FILTER_AGGREGATOR is added to control this feature. Let N be the number of backend executors, excluding the coordinator. The number of selected intermediate aggregators is then M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR <= 0 disables the intermediate aggregator feature.

In the backend scheduler, M impalads are selected randomly as the intermediate aggregators for each runtime filter. Information about these M selected impalads is then passed from the scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator converts the RuntimeFilterAggregatorInfoPB into filter routing information, a TRuntimeFilterAggDesc, which is piggy-backed in TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in data_stream_service.proto. It handles filter updates sent from fellow impalad executors to the designated aggregator impalad. This RPC merges filter updates into 'pending_remote_filter'.
The intermediate aggregator then combines 'pending_remote_filter' with 'pending_merge_filter' (from local aggregation) into 'result_filter', which is then sent to the coordinator. Because the M impalads are selected randomly, every impalad gets an additional memory reservation for 'pending_remote_filter', even though only M impalads do the intermediate aggregation for each runtime filter.

This patch currently targets only the bloom filter produced by a partitioned join build. Other kinds of runtime filters are still efficient to aggregate in the coordinator alone, and a bloom filter from a broadcast join needs only 1 valid filter update before it can be published.

test_runtime_filters.py is modified to clarify the exec_options dimension and test matrix constraints, and to reduce pytest.skip() calls in each test. runtime_filters.test is also changed to use counter aggregation and to assert on the ExecSummary table, so the tests stay valid irrespective of the number of fragment instances.

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and query-options-test.cc.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.
Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0 --- M be/src/runtime/coordinator.cc M be/src/runtime/data-stream-test.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.cc M be/src/runtime/runtime-filter.h M be/src/scheduling/scheduler.cc M be/src/scheduling/scheduler.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/query-options-test.cc M be/src/service/query-options.cc M be/src/service/query-options.h M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/runtime-profile-counters.h M common/protobuf/admission_control_service.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift M common/thrift/ImpalaService.thrift M common/thrift/Query.thrift M fe/src/main/java/org/apache/impala/planner/PlanFragment.java M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test M tests/common/impala_test_suite.py M tests/common/test_dimensions.py A tests/custom_cluster/test_runtime_filter_aggregation.py M tests/query_test/test_runtime_filters.py 32 files changed, 1,030 insertions(+), 188 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/12/20612/1 -- Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id:
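The memory trade-off noted above can be quantified with simple arithmetic: every impalad reserves room for 'pending_remote_filter', although only M of them aggregate each filter. For illustration only, the sketch assumes two things: the extra reservation per impalad equals the total size of the filters eligible for intermediate aggregation, and only executors are counted. The function name is hypothetical.

```python
MiB = 1 << 20


def extra_filter_reservation(num_executors, num_aggregators, filter_sizes):
    """Return (bytes reserved cluster-wide, bytes that can actually be used).

    Assumption: each executor reserves one extra buffer per aggregated filter
    for 'pending_remote_filter', but only num_aggregators executors per filter
    ever fill it.
    """
    per_impalad = sum(filter_sizes)
    reserved = num_executors * per_impalad
    used = num_aggregators * per_impalad
    return reserved, used


# 20 executors, M = min(2, 20 // 2) = 2, one 1 MiB partitioned-join bloom filter.
reserved, used = extra_filter_reservation(20, 2, [MiB])
print(f"reserved {reserved // MiB} MiB cluster-wide, used {used // MiB} MiB")
```

Under these assumptions, the unused part of the reservation grows with N - M. That is the cost the patch accepts in exchange for not having to know the aggregators before reservations are made.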
[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some executors

Patch Set 1:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/common/impala_test_suite.py
File tests/common/impala_test_suite.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/common/impala_test_suite.py@778
PS1, Line 778: n
flake8: E501 line too long (95 > 90 characters)

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py
File tests/custom_cluster/test_runtime_filter_aggregation.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@45
PS1, Line 45: a
flake8: F821 undefined name 'add_exec_option_dimension'

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@49
PS1, Line 49: ,
flake8: E231 missing whitespace after ','

--
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Comment-Date: Tue, 24 Oct 2023 00:27:10 +
Gerrit-HasComments: Yes