[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-20 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has submitted this change and it was merged. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved the runtime filter by aggregating runtime filters
locally before sending filter updates to the coordinator and by sharing a
single RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated Impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the number of intermediate aggregators
selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate
aggregator feature. In the backend scheduler, M impalads are selected
randomly as the intermediate aggregators for a given runtime filter.
Information about these M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed on
TRuntimeFilterSource.
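
As a rough illustration of the formula above, the aggregator count and the
random selection could be sketched as follows. This is a minimal Python
sketch, not the scheduler's C++ code: the function names and the executor
list are hypothetical, and only the relation
M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and the "<= 1 disables
the feature" rule come from the description.

```python
import math
import random


def num_intermediate_aggregators(num_executors, max_filters_per_host):
    """Return M = ceil(N / max_filters_per_host), or 0 when the feature is off.

    num_executors is N, the number of backend executors excluding the
    coordinator. A max_filters_per_host value <= 1 disables the feature.
    """
    if max_filters_per_host <= 1 or num_executors <= 0:
        return 0
    return math.ceil(num_executors / max_filters_per_host)


def pick_aggregators(executors, max_filters_per_host, rng=random):
    """Randomly pick M distinct executors to act as intermediate aggregators.

    Hypothetical helper: how the real scheduler picks hosts is not shown here.
    """
    m = num_intermediate_aggregators(len(executors), max_filters_per_host)
    return rng.sample(executors, m)
```

For example, with N = 20 executors and the option set to 5, this sketch
selects 4 aggregators, while a value of 1 or less selects none.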

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. The RuntimeFilterBank of the
intermediate aggregator waits for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is
marked as ALWAYS_TRUE and sent to the coordinator.
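
The flow above can be pictured with a toy model. This is a sketch under
stated assumptions, not the RuntimeFilterBank implementation: bloom filters
are modelled as Python integers merged with bitwise OR, the class and method
names are hypothetical, and only the names 'pending_remote_filter',
'pending_merge_filter', 'result_filter' and ALWAYS_TRUE are taken from the
description.

```python
ALWAYS_TRUE = "ALWAYS_TRUE"  # stand-in for a filter that lets every row through


class IntermediateAggregator:
    """Toy model of one filter entry on an intermediate aggregator."""

    def __init__(self, expected_remote_updates):
        self.pending_remote_filter = 0  # OR of updates received from peers
        self.pending_merge_filter = 0   # OR of local fragment-instance updates
        self.remaining_remote = expected_remote_updates

    def update_local(self, bits):
        """Merge a filter update produced by local aggregation."""
        self.pending_merge_filter |= bits

    def update_filter_from_remote(self, bits):
        """Merge a filter update sent by a fellow executor."""
        self.pending_remote_filter |= bits
        self.remaining_remote -= 1

    def result_filter(self, closing_after_wait=False):
        """Return the combined filter, or None while updates are still pending.

        closing_after_wait models "the bank is closing and the wait time has
        passed": an incomplete filter is then reported as ALWAYS_TRUE.
        """
        if self.remaining_remote > 0:
            return ALWAYS_TRUE if closing_after_wait else None
        return self.pending_remote_filter | self.pending_merge_filter
```

Falling back to ALWAYS_TRUE keeps the query correct, because an always-true
filter rejects no rows; only the pruning benefit is lost.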

This patch currently targets only the bloom filter produced by a
partitioned join build. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while the bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension and test matrix constraints, and to reduce pytest.skip() calls
in each test. runtime_filters.test is also changed to use counter
aggregation and to assert on the ExecSummary table so that the tests stay
valid irrespective of the number of fragment instances.

We benchmarked the aggregation speed of 1 MB runtime filters on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time the coordinator publishes the complete filter.
The results are as follows:

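+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+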
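
To put the table in relative terms, the speedup over the run without
intermediate aggregators can be computed directly from the numbers above.
The snippet below is only a convenience calculation; the dictionary and
function names are made up for illustration.

```python
# Benchmark numbers copied from the table: aggregator count -> duration in ms.
RESULTS_MS = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}

# 20 executor nodes with MT_DOP=36 and local aggregation disabled.
NUM_FILTER_UPDATES = 20 * 36


def speedups(results_ms, baseline_key=0):
    """Return the speedup of each configuration relative to the baseline run."""
    baseline = results_ms[baseline_key]
    return {n: baseline / ms for n, ms in results_ms.items()}


for n, factor in speedups(RESULTS_MS).items():
    print(f"{n} aggregator(s): {RESULTS_MS[n]} ms, {factor:.2f}x vs. none")
```

By these numbers, a single aggregator barely helps (about 1.05x), while 8
aggregators bring the duration down by roughly 6.3x.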

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add TestRuntimeFiltersLateRemoteUpdate.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Reviewed-on: http://gerrit.cloudera.org:8080/20612
Reviewed-by: Impala Public Jenkins 
Tested-by: Impala Public Jenkins 
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-20 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 20: Verified+1


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 20
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 20 Dec 2023 12:29:53 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-20 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 20:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/10084/ 
DRY_RUN=false


--
Gerrit-PatchSet: 20
Gerrit-Comment-Date: Wed, 20 Dec 2023 08:00:18 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-20 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 20: Code-Review+2


--
Gerrit-PatchSet: 20
Gerrit-Comment-Date: Wed, 20 Dec 2023 08:00:17 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 19:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14800/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
Gerrit-PatchSet: 19
Gerrit-Comment-Date: Wed, 20 Dec 2023 07:58:59 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 19: Code-Review+2

(3 comments)

Thank you Csaba and Michael for your review!
Carry +2.

http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py
File tests/query_test/test_runtime_filters.py:

http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@486
PS18, Line 486: work
> nit: works
Done


http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@503
PS18, Line 503:
> nit: +2 indentation
Done


http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@517
PS18, Line 517: assert result.data[0]
> is this supposed to be an assert?
Yes! Thanks for catching this.
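
For readers following along, the difference between the PS18 line and the
fix can be shown with a small stand-alone example. FakeResult and the two
check functions are hypothetical stand-ins, and the comparison against
'620' is assumed from the PS18 line quoted above; the real test code is not
reproduced here.

```python
class FakeResult:
    """Hypothetical stand-in for a query result object with a `data` list."""

    def __init__(self, data):
        self.data = data


def buggy_check(result):
    # Assignment, not a comparison: this overwrites the value and can never fail.
    result.data[0] = '620'
    return True


def fixed_check(result):
    # A real assertion: raises AssertionError when the first row is not '620'.
    assert result.data[0] == '620'
    return True
```

With a result whose first row is '999', buggy_check silently succeeds and
clobbers the data, while fixed_check raises AssertionError as intended.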



--
Gerrit-PatchSet: 19
Gerrit-Comment-Date: Wed, 20 Dec 2023 07:33:06 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#19).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18: Code-Review+2

(3 comments)

just some nits for the tests, feel free to merge after resolving them

http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py
File tests/query_test/test_runtime_filters.py:

http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@486
PS18, Line 486: work
nit: works


http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@503
PS18, Line 503: '
nit: +2 indentation


http://gerrit.cloudera.org:8080/#/c/20612/18/tests/query_test/test_runtime_filters.py@517
PS18, Line 517: result.data[0] = '620'
is this supposed to be an assert?



--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Wed, 20 Dec 2023 07:12:34 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18: Code-Review+1

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849
PS18, Line 849:   // (no change). Defaults to 0.5.
> This is carried from parent commit b37a35aa (IMPALA-12018).
Oh right. Makes a lot more sense if I look at the whole diff.



--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Tue, 19 Dec 2023 22:27:48 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18:

(2 comments)

Sorry, I should have mentioned that ps18 was rebased on top of the recent 
asf-master HEAD (commit b37a35aa, IMPALA-12018).

IMPALA-12018 adds query option RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE. That 
is why MAX_NUM_FILTERS_AGGREGATED_PER_HOST is placed after it in ps18.

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849
PS18, Line 849:   // (no change). Defaults to 0.5.
> Why update this text, but not RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE or
This is carried from parent commit b37a35aa (IMPALA-12018).


http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift@692
PS18, Line 692:   172: optional double 
runtime_filter_cardinality_reduction_scale = 1.0
> Why are we renumbering these rather than moving runtime_filter_cardinality_
This is carried from parent commit b37a35aa (IMPALA-12018).



--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Tue, 19 Dec 2023 22:17:04 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/ImpalaService.thrift@849
PS18, Line 849:   // (no change). Defaults to 0.5.
Why update this text, but not RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE or 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST?


http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/18/common/thrift/Query.thrift@692
PS18, Line 692:   172: optional double 
runtime_filter_cardinality_reduction_scale = 1.0
Why are we renumbering these rather than moving 
runtime_filter_cardinality_reduction_scale to the end?

I don't think it matters in this case, but seems like a good habit for the 
times it does.



--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Tue, 19 Dec 2023 22:09:57 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14797/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Tue, 19 Dec 2023 21:35:23 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 18:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: t entry_limit = query_stat
> Runtime filter is most needed in big scan fragment, which most likely distr
Added TestRuntimeFiltersLateRemoteUpdate in ps18 to exercise this code path.


http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc@753
PS17, Line 753: k> l(entr
> Just found out that SetFilter can not be called multiple times. I'll submit
Replaced this with DisableBloomFilter in ps18.


http://gerrit.cloudera.org:8080/#/c/20612/18/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/18/be/src/runtime/runtime-filter-bank.cc@412
PS18, Line 412: CombinePeerAndLocalUpdates
CombinePeerAndLocalUpdates is refactored to cover all possible states that can 
occur during the merge of local_filter and remote_filter. Local aggregation 
must be complete before calling CombinePeerAndLocalUpdates.



--
Gerrit-PatchSet: 18
Gerrit-Comment-Date: Tue, 19 Dec 2023 21:15:43 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-19 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#18).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/17/be/src/runtime/runtime-filter-bank.cc@753
PS17, Line 753: SetFilter
Just found out that SetFilter cannot be called multiple times. I'll submit 
another patch set to fix this logic.



--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Tue, 19 Dec 2023 05:36:56 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17: Code-Review+1


--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Mon, 18 Dec 2023 20:11:10 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722:
> My assumption is A. I believe it is rare but not impossible. That is why I
Runtime filters are most needed in big scan fragments, which are most likely
distributed across all executor nodes, including the nodes selected as
intermediate aggregators. If an executor still has a running scan, then its
RuntimeFilterBank will stay open.

If an executor no longer has any fragment instance running, then its
RuntimeFilterBank may be closing. If this is true for all executors, then
pending runtime filters do not matter anymore, since the query is completing
anyway.

I can think of a corner case where an intermediate aggregator is also working
on the big scan fragment, but is scheduled to handle fewer scan ranges than
other nodes. Thus, it completes faster than the other executor nodes.

In any case, a missing filter should not impact query correctness, only its
overall performance.
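
To illustrate why a missing filter affects only performance, here is a small
Python sketch. It is not Impala code: scan_and_join, coarse_filter and the
sample data are hypothetical names made up for this example. It assumes only
that a runtime filter may let extra rows through but never drops a row that
would match the join.

```python
def scan_and_join(build_keys, probe_rows, runtime_filter=None):
    """Return (joined rows, number of probe rows that survived the scan)."""
    if runtime_filter is not None:
        probe_rows = [row for row in probe_rows if runtime_filter(row)]
    joined = [row for row in probe_rows if row in build_keys]
    return joined, len(probe_rows)


build_keys = {2, 4, 6}
probe_rows = list(range(10))


def coarse_filter(row):
    # Hypothetical filter: keeps every row whose bucket matches a build key.
    # Extra rows may pass, but no matching row is ever dropped.
    return row % 4 in {key % 4 for key in build_keys}


with_filter = scan_and_join(build_keys, probe_rows, coarse_filter)
without_filter = scan_and_join(build_keys, probe_rows)  # filter never arrived

print(with_filter)
print(without_filter)
```

Both calls return the same joined rows; only the number of rows that reach
the join differs.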



--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Mon, 18 Dec 2023 19:22:42 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14783/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Mon, 18 Dec 2023 19:03:00 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722:
> > It is closing only if query is completed, or canceled
My assumption is A. I believe it is rare but not impossible. That is why I 
added logic to wait for runtime filter arrival here.



--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Mon, 18 Dec 2023 18:51:24 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 17:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@241
PS16, Line 241: // but recover from it in RELEASE build by disabling 
filter
  : // (setting to ALWAYS_TRUE_FILTER).
  : DCHECK(false) << "Initial buffer for pending_re
> Is my understanding right that we should fail the query in this case as thi
Yes. Added DCHECK.


http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@291
PS16, Line 291:
> nit: turned
Done


http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@293
PS16, Line 293: --produced_filter.pending_remotes;
> Should we also set pending_producers to 0? It would be make sense for produ
Done



--
Gerrit-PatchSet: 17
Gerrit-Comment-Date: Mon, 18 Dec 2023 18:30:03 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#17).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated Impala
backends before the final aggregation and publishing in the
coordinator. The query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is
added to control this feature. Given N as the number of backend executors
excluding the coordinator, the number of intermediate aggregators
selected is M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 disables the intermediate
aggregator feature. In the backend scheduler, M impalads are selected
randomly as the intermediate aggregators for that runtime filter.
Information about the M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, a TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.
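
As an illustration of the sizing rule, here is a minimal Python sketch; it is
not the scheduler's C++ code. Only the formula
M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and the "<= 1 disables the
feature" rule come from the description above. The function names, the
host names, and the way executors are grouped under an aggregator are
assumptions made for the example.

```python
import math
import random


def num_intermediate_aggregators(num_executors, max_filters_per_host):
    """Return M = ceil(N / K); 0 means the feature is disabled (K <= 1)."""
    if max_filters_per_host <= 1:
        return 0
    return math.ceil(num_executors / max_filters_per_host)


def assign_aggregators(executors, max_filters_per_host, rng):
    """Hypothetical routing: map each aggregator to the executors reporting to it.

    Executors are shuffled, then cut into groups of at most K hosts. The first
    host of each group acts as the aggregator, so it receives updates from at
    most K - 1 other executors.
    """
    if num_intermediate_aggregators(len(executors), max_filters_per_host) == 0:
        return {}
    shuffled = list(executors)
    rng.shuffle(shuffled)
    k = max_filters_per_host
    groups = [shuffled[i:i + k] for i in range(0, len(shuffled), k)]
    return {group[0]: group[1:] for group in groups}


executors = ["exec-%d" % i for i in range(20)]
routing = assign_aggregators(executors, 5, random.Random(0))
print(len(routing))  # 4 aggregators for N=20, K=5
```

With N = 20 and a limit of 5, the sketch picks 4 aggregators, each receiving
updates from 4 other executors.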

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. The RuntimeFilterBank of the
intermediate aggregator waits for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If the RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter is
marked as ALWAYS_TRUE and sent to the coordinator.
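
The merge-or-give-up behaviour can be sketched roughly as follows. This is a
simplified Python model, not the RuntimeFilterBank implementation: bloom
filters are modeled as integer bitmasks whose union is a bitwise OR, and the
function signature and parameter names (pending_remotes, closing, waited_ms,
wait_time_ms) are assumptions introduced for illustration.

```python
ALWAYS_TRUE = object()  # stand-in for the ALWAYS_TRUE filter


def merge(a, b):
    """Union of two bloom filters modeled as integer bitmasks."""
    if a is ALWAYS_TRUE or b is ALWAYS_TRUE:
        return ALWAYS_TRUE
    return a | b


def result_filter(pending_merge_filter, pending_remote_filter,
                  pending_remotes, closing, waited_ms, wait_time_ms):
    """Return the filter to send to the coordinator, or None to keep waiting."""
    if pending_remotes == 0:
        # Every remote update has arrived: combine local and remote parts.
        return merge(pending_merge_filter, pending_remote_filter)
    if closing and waited_ms >= wait_time_ms:
        # Incomplete at shutdown after the wait time: disable the filter.
        return ALWAYS_TRUE
    return None


print(bin(result_filter(0b0101, 0b0011, 0, False, 0, 1000)))
```

In this model the ALWAYS_TRUE fallback is taken only when both conditions
hold, that is, the bank is closing and the wait time has elapsed.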

This patch currently targets only the bloom filters produced by
partitioned join builds. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter from
a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension and the test matrix constraints, and to reduce pytest.skip()
calls in each test. runtime_filters.test is also changed to use counter
aggregation and to assert on the ExecSummary table so that the tests stay
valid irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a cluster of
20 executor nodes with MT_DOP=36, instrumented to disable local
aggregation and thus simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

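+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+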
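
For convenience, the arithmetic behind these numbers can be checked with a
few lines of Python. The timings are copied from the table; the variable
names and the speedup ratio relative to the 0-aggregator run are additions
made for this illustration and are not part of the original benchmark.

```python
executors, mt_dop = 20, 36
print(executors * mt_dop)  # 720 simulated filter updates

# Aggregation time in ms, keyed by number of aggregator nodes (from the table).
timings_ms = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}
baseline = timings_ms[0]
speedups = {n: round(baseline / ms, 2) for n, ms in timings_ms.items()}
for n, s in speedups.items():
    print("%d aggregators: %.2fx" % (n, s))
```

A single aggregator barely helps (about 1.05x), while 8 aggregators finish
roughly 6.3 times faster than the coordinator-only run.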

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 16:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: HECK_EQ(0, produced_filter
> It is closing only if query is completed, or canceled

It is not clear to me what the exact scenario is when QueryState::Close is 
called:

A. It is enough for all fragment instances on the given impalad to finish 
execution, or
B. QueryState is kept alive as long as the query is running, even if this 
process has no tasks anymore.

In case A, the content of the filters can still matter to fragment instances 
running in other processes. In case B, it seems wasteful to bother with 
sending filters or waiting for remote filters that have not yet arrived.



--
Gerrit-PatchSet: 16
Gerrit-Comment-Date: Mon, 18 Dec 2023 18:17:15 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 16:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14782/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
Gerrit-PatchSet: 16
Gerrit-Comment-Date: Mon, 18 Dec 2023 17:48:56 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 16: Code-Review+1

(4 comments)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139
PS10, Line 139: esc.filter_id);
> I think what you mean is, it is ok to allocate later as long as the whole t
Yes, this is the way I meant it, thanks


http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@241
PS16, Line 241: LOG(ERROR) << "Cannot allocate scratch bloom filter for 
pending_remote_filter "
  :<< "of filter " << params.filter_id();
  : bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
Is my understanding right that we should fail the query in this case as this 
means a program error?

If failing the query from here is tricky, a DCHECK could be hit in DEBUG while 
RELEASE mode could rely on setting to ALWAYS_TRUE_FILTER. My concern with the 
graceful handling is that if there is an error like underestimating memory it 
may remain undiscovered in tests.
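
The suggested split between DEBUG and RELEASE behaviour could look roughly
like the Python sketch below. It only models the idea: the function name,
the allocate callback and the debug_build flag are hypothetical, and the
real code would use DCHECK in C++ rather than raising an exception.

```python
ALWAYS_TRUE_FILTER = "ALWAYS_TRUE_FILTER"  # stand-in for the real constant


def scratch_filter_or_fallback(allocate, debug_build):
    """Return an allocated filter, or handle allocation failure per build type."""
    bloom_filter = allocate()
    if bloom_filter is None:
        # Debug builds fail loudly so that tests catch the program error.
        if debug_build:
            raise AssertionError("cannot allocate scratch bloom filter")
        # Release builds recover by disabling the filter.
        return ALWAYS_TRUE_FILTER
    return bloom_filter


print(scratch_filter_or_fallback(lambda: None, debug_build=False))
```

This keeps production queries running while making an underestimated memory
reservation visible in test runs.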


http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@291
PS16, Line 291: turn
nit: turned


http://gerrit.cloudera.org:8080/#/c/20612/16/be/src/runtime/runtime-filter-bank.cc@293
PS16, Line 293:   produced_filter.pending_remotes = 0;
Should we also set pending_producers to 0? It would make sense for 
produced_filter.IsComplete() to return true below. Or would it be a problem if 
we do not wait for local filters?

Also, can you add a VLOG(2) line here to be able to trace this scenario?
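
To make the question concrete, here is a small Python model of the counters.
It assumes, without confirmation from the patch, that pending_producers
counts local updates still expected, that pending_remotes counts remote
updates still expected, and that IsComplete() is true when both are zero.
The class layout and the disable() method are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ProducedFilter:
    pending_producers: int  # assumed: local fragment instances yet to report
    pending_remotes: int    # assumed: remote executors yet to report
    always_true: bool = False

    def is_complete(self):
        return self.pending_producers == 0 and self.pending_remotes == 0

    def disable(self):
        """Turn the filter into ALWAYS_TRUE and stop waiting for any update."""
        self.always_true = True
        self.pending_producers = 0
        self.pending_remotes = 0


f = ProducedFilter(pending_producers=3, pending_remotes=2)
f.pending_remotes = 0
print(f.is_complete())  # still waiting for local producers
f.disable()
print(f.is_complete())
```

Under these assumptions, zeroing only pending_remotes leaves the filter
incomplete; zeroing both counters makes the completeness check pass.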



--
Gerrit-PatchSet: 16
Gerrit-Comment-Date: Mon, 18 Dec 2023 17:36:49 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#16).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-18 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 16:

ps16 is a rebase.


--
Gerrit-PatchSet: 16
Gerrit-Comment-Date: Mon, 18 Dec 2023 16:54:32 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 15:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14712/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
Gerrit-PatchSet: 15
Gerrit-Comment-Date: Wed, 13 Dec 2023 22:49:16 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 15: Code-Review+1


--
Gerrit-PatchSet: 15
Gerrit-Comment-Date: Wed, 13 Dec 2023 22:28:20 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 15:

(5 comments)

Thank you, Michael.

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc@271
PS14, Line 271:   // Making a copy of the "filepath to hosts" mapping into std 
library types.
> This comment doesn't really explain why this is necessary.
I'm not sure either. This was added by IMPALA-12308 
https://gerrit.cloudera.org/c/20548/
and is not part of this patch.


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@107
PS14, Line 107: /// selected as intermediate filter aggregator to help 
coordinator. Besides doing
> nit: remove "of", so it says "Besides doing"
Done


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@108
PS14, Line 108: /// local aggregation, each intermediate aggregator will also 
listen and aggregate
> grammar: "each intermediate aggregator will"
Done


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@109
PS14, Line 109: /// filter updates from at most 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors.
> "filter updates from"
Done


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@110
PS14, Line 110: /// Intermediate aggregator then sends the aggregated filter 
update to coordinator for
> "then sends the"
Done



--
Gerrit-PatchSet: 15
Gerrit-Comment-Date: Wed, 13 Dec 2023 22:21:44 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#15).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 14:

(5 comments)

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/query-state.cc@271
PS14, Line 271:   // Making a copy of the "filepath to hosts" mapping into std 
library types.
This comment doesn't really explain why this is necessary.


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@107
PS14, Line 107: /// selected as intermediate filter aggregator to help 
coordinator. Besides of doing
nit: remove "of", so it says "Besides doing"


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@108
PS14, Line 108: /// local aggregation, each intermediate aggregators will also 
listen and aggregate
grammar: "each intermediate aggregator will"


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@109
PS14, Line 109: /// filter update from at most 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors.
"filter updates from"


http://gerrit.cloudera.org:8080/#/c/20612/14/be/src/runtime/runtime-filter-bank.h@110
PS14, Line 110: /// Intermediate aggregator then send the aggregated filter 
update to coordinator for
"then sends the"



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 14
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 21:49:58 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 14:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14709/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 14
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 20:55:19 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 14:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139
PS10, Line 139: esc.filter_id);
> If the reservation is claimed, then it is considered a fatal error if alloc
I think what you mean is, it is ok to allocate later as long as the whole 
total_bloom_filter_mem_required_ is already claimed. Is that correct?

PS14 moves the initialization to UpdateFilterFromRemote().


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: HECK_EQ(0, produced_filter
> Few things about this:
If it is any reassurance, note that SendIncompleteFilters is only called 
when RuntimeFilterBank is closing.
The RuntimeFilterBank lifetime equals the query lifetime on that executor node. 
It closes only when the query has completed or been canceled. In both cases, the 
plan root sink is basically done and the runtime filter value no longer matters. 
The coordinator can simply drop the runtime filter update by then.

CombinePeerAndLocalUpdates() is done here for correctness. It cleans up 
'pending_merge_filter' and 'pending_remote_filter' of 'produced_filter'.

This feature should be exercised in TestRuntimeFilters, TestBloomFilters, 
TestBloomFiltersOnParquet, and TestRuntimeRowFilters. And 
test_wait_time_cancellation is within TestRuntimeFilters.



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 14
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 20:31:34 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#14).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally
before sending filter updates to the coordinator and by sharing a
single RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the selected number of intermediate
aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
aggregator feature. In the backend scheduler, M impalads will be selected
randomly as the intermediate aggregators for that runtime filter.
Information about these M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.
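
The selection rule in the paragraph above can be sketched in Python as
follows. This is only an illustration of the formula
M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST) and of a uniform random
pick; the function names and the host strings are hypothetical and do
not come from scheduler.cc.

```python
import math
import random


def num_intermediate_aggregators(num_executors, max_filters_per_host):
    """Return M = ceil(N / max_filters_per_host), or 0 when the feature is off."""
    # A setting <= 1 disables intermediate aggregation, per the commit message.
    if max_filters_per_host <= 1 or num_executors <= 0:
        return 0
    return math.ceil(num_executors / max_filters_per_host)


def pick_aggregators(executors, max_filters_per_host, rng=random):
    """Randomly pick M distinct executors to act as intermediate aggregators."""
    m = num_intermediate_aggregators(len(executors), max_filters_per_host)
    return rng.sample(executors, m)


# Hypothetical executor addresses, excluding the coordinator.
hosts = [f"impalad-{i}:27000" for i in range(20)]
chosen = pick_aggregators(hosts, 5, random.Random(0))
print(len(chosen), "aggregators selected")
```

For example, 20 executors with a limit of 5 give 4 aggregators, and any
limit of 1 or less gives none.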

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executor to the designated aggregator impalad. This RPC will merge
filter updates into 'pending_remote_filter'. The intermediate aggregator
will then combine 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter'
which is then sent to the coordinator. RuntimeFilterBank of the
intermediate aggregator will wait for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter will be
marked as ALWAYS_TRUE and sent to the coordinator.
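
A toy Python model of the merge described above may help. It assumes
that combining two bloom filters means a bitwise OR of equally sized bit
arrays, and that an ALWAYS_TRUE input makes the result ALWAYS_TRUE.
ToyFilter, merge() and build_result_filter() are made-up names for
illustration, not the RuntimeFilterBank API.

```python
from dataclasses import dataclass


@dataclass
class ToyFilter:
    """Stand-in for a bloom filter update (hypothetical, not Impala's type)."""
    bits: bytes = b""
    always_true: bool = False


def merge(a, b):
    """Union of two filters: bitwise OR, or ALWAYS_TRUE if either input is."""
    if a.always_true or b.always_true:
        return ToyFilter(always_true=True)
    if len(a.bits) != len(b.bits):
        raise ValueError("bloom filters must have the same size")
    return ToyFilter(bits=bytes(x | y for x, y in zip(a.bits, b.bits)))


def build_result_filter(pending_merge_filter, pending_remote_filter,
                        all_updates_received):
    """Combine local and remote aggregates into the filter sent upstream."""
    if not all_updates_received:
        # An incomplete filter must not reject rows, so it passes everything.
        return ToyFilter(always_true=True)
    return merge(pending_merge_filter, pending_remote_filter)


local = ToyFilter(bits=b"\x01\x00")
remote = ToyFilter(bits=b"\x10\x02")
result = build_result_filter(local, remote, all_updates_received=True)
incomplete = build_result_filter(local, remote, all_updates_received=False)
```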

This patch currently targets only the bloom filters produced by
partitioned join builds. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a cluster of
20 executor nodes with MT_DOP=36, instrumented to disable local
aggregation and thus simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-13 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 13:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139
PS10, Line 139: AllocateScratchBloomFilter
> Filter aggregation can happen sometime later after all fragment start. I'm
If the reservation is claimed, then it is considered a fatal error if 
allocating memory (up to the reservation) fails. 
https://github.com/apache/impala/blob/aeb9a8206028b68833ce6e49421990854f0c8ba4/be/src/runtime/bufferpool/buffer-pool.h#L77

Other queries should only be able to use the memory for buffers that can be 
dropped (e.g. spilled)
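
To illustrate the point about reservations, here is a toy Python model.
It assumes that memory claimed as a reservation cannot be taken by other
clients, so a later allocation within that reservation succeeds. ToyPool
and ToyClient are hypothetical and much simpler than Impala's buffer
pool.

```python
class ToyPool:
    """Fixed-size memory pool that hands out reservations (illustrative only)."""

    def __init__(self, total_bytes):
        self.total_bytes = total_bytes
        self.reserved_bytes = 0

    def try_reserve(self, nbytes):
        """Grant the reservation only if it fits in the unreserved remainder."""
        if self.reserved_bytes + nbytes > self.total_bytes:
            return False
        self.reserved_bytes += nbytes
        return True


class ToyClient:
    """A pool client that may allocate only within its own reservation."""

    def __init__(self, pool):
        self.pool = pool
        self.reservation = 0
        self.allocated = 0

    def claim(self, nbytes):
        ok = self.pool.try_reserve(nbytes)
        if ok:
            self.reservation += nbytes
        return ok

    def allocate(self, nbytes):
        if self.allocated + nbytes > self.reservation:
            raise MemoryError("allocation exceeds claimed reservation")
        self.allocated += nbytes


MB = 1024 * 1024
pool = ToyPool(4 * MB)
filter_bank = ToyClient(pool)
other_fragment = ToyClient(pool)

claimed = filter_bank.claim(1 * MB)            # claimed up front
grew_too_far = other_fragment.claim(4 * MB)    # would eat the filter's share
grew_within_limits = other_fragment.claim(3 * MB)
filter_bank.allocate(1 * MB)                   # late allocation still fits
```

In this model the filter bank can delay the actual allocation until the
first remote update arrives, because the other client can never grow
into the claimed 1 MB.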


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: result_filter->SetFilter(B
> Agree. Done.
A few things about this:
- it is not clear to me whether this was a real bug, i.e. whether not setting it 
could lead to dropping rows incorrectly

- it would be nice to have a test for this scenario. Adding a delay/jitter 
debug action + a low runtime_filter_wait_time_ms should be able to lead to the 
situation where all/some remote filters do not arrive in time. I looked around 
for tests like this and couldn't find any that intentionally create late 
filters. A cancellation-related test seems to do something like this: 
https://github.com/apache/impala/blob/b03e8ef95c856f499d17ea7815831e30e2e9f467/tests/query_test/test_runtime_filters.py#L110
 I am ok with moving test creation to a follow-up Jira if it seems complicated.

- do we actually need to run CombinePeerAndLocalUpdates? It seems unnecessary 
as the filter is all true. I don't think that we should put effort into 
optimizing this case, but a comment could mention that this is done for 
simplicity
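
The late-filter scenario from the second point can be simulated outside
Impala with a small Python sketch. It assumes an aggregator that waits a
bounded time for a known number of remote updates and treats the filter
as incomplete otherwise. It does not use Impala's test framework or
debug actions; collect_remote_updates() is a hypothetical helper.

```python
import queue
import time


def collect_remote_updates(updates, expected, wait_time_s):
    """Drain up to `expected` updates, waiting at most `wait_time_s` in total.

    Returns (received, complete), where `complete` is False if the
    deadline passed before all expected updates arrived.
    """
    deadline = time.monotonic() + wait_time_s
    received = []
    while len(received) < expected:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            received.append(updates.get(timeout=remaining))
        except queue.Empty:
            break
    return received, len(received) == expected


# Two of three expected remote updates arrive; the third is "late".
late_case = queue.Queue()
late_case.put("update-from-executor-1")
late_case.put("update-from-executor-2")
got, complete = collect_remote_updates(late_case, expected=3, wait_time_s=0.05)

# All three arrive in time.
on_time_case = queue.Queue()
for i in range(3):
    on_time_case.put(f"update-from-executor-{i}")
got_all, complete_all = collect_remote_updates(on_time_case, 3, 0.05)
```

In the late case a caller would fall back to an always-true filter
rather than publish a partial one.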



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 13
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 10:25:23 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 13:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14689/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 13
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 01:54:17 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 12:

Build Failed

https://jenkins.impala.io/job/gerrit-code-review-checks/14688/ : Initial code 
review checks failed. See linked job for details on the failure.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 12
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 01:42:49 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 13:

(15 comments)

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc@478
PS10, Line 478: int pending_count =
> It looks a bit strange that pending count is set at line 432, then we set i
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc@438
PS10, Line 438: his filter regist
> The new address compare function could be used here.
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@100
PS10, Line 100: /// Filters are aggregated, first locally in this 
RuntimeFilterBank, if there are multiple
> Can you mention the new aggregation mechanism in the class comment?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@196
PS10, Line 196:
> nit: has completed?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@197
PS10, Line 197: _t, std::unique_
> nit: that has not completed?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@198
PS10, Line 198: const boost::unordered_map<
> Can you mention RUNTIME_FILTER_WAIT_TIME_MS handling in the commit message?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@199
PS10, Line 199: ;
> nit: doesn't happen?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@227
PS10, Line 227: xpec
> nit: holds
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@246
PS10, Line 246:
> nit: number of?
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139
PS10, Line 139: AllocateScratchBloomFilter
> Would it be hard to allocate the filter only when we first need it?
Filter aggregation can happen some time after all fragments start. I'm 
worried that if the memory is not preallocated at the start, other fragments can 
get greedy in scaling their memory reservation beyond the initial reservation, 
and the remaining memory will then be insufficient to allocate an intermediate 
bloom filter.
This is why I preallocated them here.


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: result_filter->SetFilter(B
> Shouldn't we mark the filter as always true?
Agree. Done.


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@756
PS10, Line 756:
> nit: cancelled
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h
File be/src/util/network-util.h:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h@107
PS10, Line 107: KrpcAddressEqual
> naming: I think that KrpcAddressEquals would be clearer
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54
PS10, Line 54: desi
> nit: "that" not needed
Done


http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54
PS10, Line 54: gator.
> nit: aggregator?
Done



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 13
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 13 Dec 2023 01:17:34 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#13).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally
before sending filter updates to the coordinator and by sharing a
single RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the selected number of intermediate
aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
aggregator feature. In the backend scheduler, M impalads will be selected
randomly as the intermediate aggregators for that runtime filter.
Information about these M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executor to the designated aggregator impalad. This RPC will merge
filter updates into 'pending_remote_filter'. The intermediate aggregator
will then combine 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter'
which is then sent to the coordinator. RuntimeFilterBank of the
intermediate aggregator will wait for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter will be
marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only the bloom filters produced by
partitioned join builds. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a cluster of
20 executor nodes with MT_DOP=36, instrumented to disable local
aggregation and thus simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#12).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally
before sending filter updates to the coordinator and by sharing a
single RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the selected number of intermediate
aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
aggregator feature. In the backend scheduler, M impalads will be selected
randomly as the intermediate aggregators for that runtime filter.
Information about these M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executor to the designated aggregator impalad. This RPC will merge
filter updates into 'pending_remote_filter'. The intermediate aggregator
will then combine 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter'
which is then sent to the coordinator. RuntimeFilterBank of the
intermediate aggregator will wait for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter will be
marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets only the bloom filters produced by
partitioned join builds. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation of a 1 MB runtime filter on a cluster of
20 executor nodes with MT_DOP=36, instrumented to disable local
aggregation and thus simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-12 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 11:

ps11 is a rebase.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 11
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Tue, 12 Dec 2023 22:15:21 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-12-11 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 10:

(16 comments)

looks great, mostly nits

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/coordinator.cc@478
PS10, Line 478: f->set_pending_count(num_agg);
It looks a bit strange that pending count is set at line 432, then we set it 
here again. Can you move that logic here, e.g. in an else block?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc
File be/src/runtime/query-state.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/query-state.cc@438
PS10, Line 438: this_krpc_address
The new address compare function could be used here.


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@100
PS10, Line 100: /// Filters are aggregated, first locally in this 
RuntimeFilterBank, if there are multiple
Can you mention the new aggregation mechanism in the class comment?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@196
PS10, Line 196: has complete
nit: has completed?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@197
PS10, Line 197: has not complete
nit: that has not completed?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@198
PS10, Line 198: RUNTIME_FILTER_WAIT_TIME_MS
Can you mention RUNTIME_FILTER_WAIT_TIME_MS handling in the commit message?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@199
PS10, Line 199: not
nit: doesn't happen?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@227
PS10, Line 227: hold
nit: holds


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.h@246
PS10, Line 246:  number
nit: number of?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@139
PS10, Line 139: AllocateScratchBloomFilter
Would it be hard to allocate the filter only when we first need it?
It looks strange that the filters are both initialized and allocated in 
ClaimBufferReservation.


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@722
PS10, Line 722: CombinePeerAndLocalUpdates
Shouldn't we mark the filter as always true?


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/runtime/runtime-filter-bank.cc@756
PS10, Line 756: canceled
nit: cancelled


http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317
PS7, Line 317:
> I think it still make sense.
I see, I agree that this makes sense if there are a lot of runtime filters. 
Sending the runtime filters to each executor will still be a fairly large amount 
of work for the coordinator though, so in the long run it would be better to 
move that work to the aggregating executors too.


http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h
File be/src/util/network-util.h:

http://gerrit.cloudera.org:8080/#/c/20612/10/be/src/util/network-util.h@107
PS10, Line 107: KrpcAddressMatch
naming: I think that KrpcAddressEquals would be clearer


http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift
File common/thrift/ImpalaInternalService.thrift:

http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54
PS10, Line 54: aggregation
nit: aggregator?


http://gerrit.cloudera.org:8080/#/c/20612/10/common/thrift/ImpalaInternalService.thrift@54
PS10, Line 54: that
nit: "that" not needed



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Mon, 11 Dec 2023 16:47:13 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 10:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14301/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:59:06 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 9:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14300/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:51:57 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 10: Code-Review+1


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:32:03 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 10:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc@312
PS8, Line 312:
> This calculation is confusing me a little bit. I would expect it to read "t
Done


http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
File testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test:

http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test@184
PS8, Line 184: # or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST equals to 
num executors
> The minicluster has 3 nodes, but this works because we exclude the coordina
That is correct. Rephrased the comment.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:31:20 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#10).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregating 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the selected number of intermediate
aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
aggregator feature. In the backend scheduler, M impalads are selected
randomly as the intermediate aggregators for a given runtime filter.
Information about the M selected impalads is then passed from the
scheduler to the coordinator as a RuntimeFilterAggregatorInfoPB. The
coordinator then converts the RuntimeFilterAggregatorInfoPB into filter
routing information, TRuntimeFilterAggDesc, that is piggy-backed in
TRuntimeFilterSource.
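
The random selection step can be sketched as follows. This is only an illustration under assumptions: executors are identified by index, and non-aggregator executors are spread round-robin over the chosen aggregators. AssignAggregators and the round-robin policy are hypothetical and are not taken from scheduler.cc.

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical helper. For each executor index in [0, num_executors), returns
// the index of the executor that acts as its intermediate aggregator.
std::vector<int> AssignAggregators(int num_executors, int num_agg,
                                   std::mt19937* rng) {
  assert(num_agg > 0 && num_agg <= num_executors);
  // Shuffle the executor indexes; the first num_agg entries become aggregators.
  std::vector<int> order(num_executors);
  std::iota(order.begin(), order.end(), 0);
  std::shuffle(order.begin(), order.end(), *rng);

  std::vector<int> aggregator_of(num_executors);
  for (int i = 0; i < num_executors; ++i) {
    // Assumed policy: round-robin, so an aggregator is assigned to itself and
    // every aggregator serves at most ceil(num_executors / num_agg) executors.
    aggregator_of[order[i]] = order[i % num_agg];
  }
  return aggregator_of;
}
```

With 20 executors and 2 aggregators, each aggregator ends up responsible for 10 executors, itself included.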

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with 'pending_merge_filter' (from
local aggregation) into 'result_filter', which is then sent to the
coordinator. Note that because the M impalads are selected randomly,
an additional memory reservation for 'pending_remote_filter' is added on
every impalad, even though only M impalads do the intermediate
aggregation for each runtime filter.
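
A minimal sketch of the merge flow in the paragraph above, assuming a bloom filter is a plain bit array so that merging two filters of equal size is a bitwise OR. Only the three filter names come from the commit message; FilterBits, OrInto, AggregatorState, and its methods are hypothetical stand-ins, not the RuntimeFilterBank API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy bloom filter payload: a fixed-size bit array stored as 64-bit words.
using FilterBits = std::vector<uint64_t>;

// The union of two equally sized bloom filters is the bitwise OR of their bits.
void OrInto(const FilterBits& src, FilterBits* dst) {
  assert(src.size() == dst->size());
  for (std::size_t i = 0; i < src.size(); ++i) (*dst)[i] |= src[i];
}

// Hypothetical per-filter state held by an intermediate aggregator.
struct AggregatorState {
  FilterBits pending_remote_filter;  // updates merged from peer executors
  FilterBits pending_merge_filter;   // updates merged from local instances

  explicit AggregatorState(std::size_t num_words)
    : pending_remote_filter(num_words, 0), pending_merge_filter(num_words, 0) {}

  // Stands in for what the UpdateFilterFromRemote RPC handler would do.
  void UpdateFromRemote(const FilterBits& update) {
    OrInto(update, &pending_remote_filter);
  }

  void UpdateFromLocal(const FilterBits& update) {
    OrInto(update, &pending_merge_filter);
  }

  // Combines both pending filters into the filter sent to the coordinator.
  FilterBits BuildResultFilter() const {
    FilterBits result_filter = pending_merge_filter;
    OrInto(pending_remote_filter, &result_filter);
    return result_filter;
  }
};
```

The sketch also shows why every impalad needs the extra reservation: 'pending_remote_filter' is as large as the filter itself and has to exist before the first remote update arrives.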

This patch currently targets only bloom filters produced by a
partitioned join build. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while a bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, which simulates 720 runtime filter updates
(20 x 36). The speed is approximated as the duration between the
earliest time a filter update is made and the time the coordinator
publishes the complete filter. The results are as follows:

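+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+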

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/network-util.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 9: Code-Review+1


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:25:48 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#9).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 9:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/9/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/20612/9/be/src/service/query-options.h@314
PS9, Line 314:   QUERY_OPT_FN(max_num_filters_aggregated_per_host, 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST,  \
line too long (91 > 90)



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 21:24:03 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 8:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/scheduling/scheduler.cc@312
PS8, Line 312:   int num_agg = (int)ceil((double)instance_groups.size() / 
num_filters_per_host);
This calculation is confusing me a little bit. I would expect it to read 
"total_number_of_filters / num_filters_per_host", with a cap of 
instance_groups.size(). I guess it works because the number of filters is 
really the number of filter sources (which would be the number of instances 
participating in the query, i.e. instance_groups.size()). Might be worth a 
comment reminding readers of that.


http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
File testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test:

http://gerrit.cloudera.org:8080/#/c/20612/8/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test@184
PS8, Line 184: # or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST=2.
The minicluster has 3 nodes, but this works because we exclude the coordinator. 
Right?



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 20:44:57 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 8:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14299/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 20:41:10 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 8:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG@20
PS7, Line 20: Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
: to control this feature. Given N as the number of backend 
executors
: excluding the coordinator, the selected number of intermediate
: aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). 
Setting
: MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable
> MAX_NUM_FILTER_AGGREGATOR seems ok for me in the short term to test this fe
Done. Changed MAX_NUM_FILTER_AGGREGATOR to MAX_NUM_FILTERS_AGGREGATED_PER_HOST.


http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@282
PS7, Line 282: typedef vector> InstanceToAggPairs;
 :
 : v
> Can you move this to network-util.h?
Done


http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317
PS7, Line 317: // Put coordinator grou
> Does num_agg = 1 make sense?
I think it still makes sense.

Say a query has 50 bloom runtime filters and runs on a 100-executor cluster.
num_agg=1 means the coordinator handles only 50 finalized filters instead of 
5000 filter updates.
In this case, the coordinator is only responsible for publishing the finalized 
filters to all executors, while the aggregation happens on other nodes. Note 
that only the coordinator has the full KRPC addresses of cluster members via 
Coordinator::BackendState.
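
The arithmetic behind the 50 versus 5000 figures can be written out as a small model. It assumes each executor sends one locally aggregated update per filter when there are no intermediate aggregators, and each aggregator sends one result per filter otherwise. CoordinatorLoad and EstimateCoordinatorLoad are hypothetical names used only for this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of how many filter messages reach the coordinator.
struct CoordinatorLoad {
  int64_t without_aggregators;  // one update per filter per executor
  int64_t with_aggregators;     // one result per filter per aggregator
};

CoordinatorLoad EstimateCoordinatorLoad(int64_t num_filters,
                                        int64_t num_executors,
                                        int64_t num_agg) {
  return {num_filters * num_executors, num_filters * num_agg};
}
```

For 50 filters, 100 executors, and num_agg=1, the model gives 5000 incoming updates without aggregators and 50 with one.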



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 20:19:56 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 8:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/20612/8/be/src/service/query-options.h@314
PS8, Line 314:   QUERY_OPT_FN(max_num_filters_aggregated_per_host, 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST,  \
line too long (91 > 90)



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 01 Nov 2023 20:14:17 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-11-01 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#8).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-30 Thread Csaba Ringhofer (Code Review)
Csaba Ringhofer has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 7:

(3 comments)

The implementation looks great in general, just a few comments

http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/7//COMMIT_MSG@20
PS7, Line 20: Query option MAX_NUM_FILTER_AGGREGATOR is added to control
: this feature. Given N as the number of backend executors excluding the
: coordinator, the selected number of intermediate aggregators M =
: min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
: <= 0 will disable the intermediate aggregator feature.
MAX_NUM_FILTER_AGGREGATOR seems OK to me in the short term to test this 
feature, but I think there should be a more sophisticated way to tune this in 
the long term. My concern is that, depending on the number of hosts used in the 
query, a static default for MAX_NUM_FILTER_AGGREGATOR can easily be too low 
or too high.

I think that ideally we should have an option like 
MAX_NUM_FILTERS_AGGREGATED_PER_HOST, so if this value is 10, then a 20 host 
query will have 2 pre-aggregators, while a 100 host query will have 10.
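
A minimal sketch of the per-host sizing suggested above, assuming the
aggregator count is the ceiling of the executor count divided by the option
value, and that a value of 1 or less disables the feature (as the merged
commit message describes). The function name is hypothetical and this is not
the scheduler's actual code.

```python
def num_intermediate_aggregators(num_executors, max_filters_per_host):
    """Return the number of intermediate aggregators for one runtime filter.

    num_executors: backend executors, excluding the coordinator (N).
    max_filters_per_host: value of MAX_NUM_FILTERS_AGGREGATED_PER_HOST.
    Hypothetical helper for illustration only.
    """
    if max_filters_per_host <= 1 or num_executors <= 0:
        return 0  # feature disabled: the coordinator aggregates everything
    # Integer ceiling division: ceil(N / max_filters_per_host).
    return -(-num_executors // max_filters_per_host)


if __name__ == "__main__":
    for hosts in (20, 100):
        print(hosts, "hosts ->", num_intermediate_aggregators(hosts, 10))
```

With a value of 10 this gives 2 pre-aggregators for a 20 host query and 10 for
a 100 host query, matching the example in the comment.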


http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@282
PS7, Line 282: inline bool KrpcAddressMatch(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
 :   return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port();
 : }
Can you move this to network-util.h?
I think that there should be a single function to compare addresses, as it is 
not completely self-evident (due to uds_ address)


http://gerrit.cloudera.org:8080/#/c/20612/7/be/src/scheduling/scheduler.cc@317
PS7, Line 317:   if (num_agg <= 0) return;
Does num_agg = 1 make sense?



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Mon, 30 Oct 2023 14:37:24 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-27 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 7: Code-Review+1


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Fri, 27 Oct 2023 21:03:49 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-26 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 7:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14272/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Fri, 27 Oct 2023 05:32:46 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-26 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 7:

patch set 7 is a rebase


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Fri, 27 Oct 2023 05:07:26 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-26 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#7).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the selected number of intermediate aggregators M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 will disable the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for that runtime filter. Information about these M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into filter routing information,
TRuntimeFilterAggDesc, that is piggy-backed on TRuntimeFilterSource.
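
The selection rule in this patch set can be sketched roughly as follows. This
is an illustration only: the function name and the list-of-hostnames
representation are hypothetical, and treating N / 2 as integer division is an
assumption, not something the commit message states.

```python
import random


def select_aggregators(executors, max_num_filter_aggregator, rng=random):
    """Pick M random executors to act as intermediate aggregators.

    M = min(MAX_NUM_FILTER_AGGREGATOR, N / 2), where N = len(executors),
    the executors excluding the coordinator. Integer division is assumed.
    A value <= 0 disables the feature and yields an empty selection.
    """
    if max_num_filter_aggregator <= 0:
        return []
    num_agg = min(max_num_filter_aggregator, len(executors) // 2)
    # sample() picks distinct elements, so no executor is chosen twice.
    return rng.sample(executors, num_agg)


if __name__ == "__main__":
    hosts = ["impalad-%d" % i for i in range(20)]
    print(select_aggregators(hosts, 4))
```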

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that due to the random
selection of M impalads, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.
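
To make the merge step concrete, here is a minimal sketch under the assumption
that each bloom filter is a fixed-size bit array, so that merging two filters
of the same size and hash scheme is a bitwise OR. The bytearray representation
and both function names are hypothetical; this is not Impala's BloomFilter
class or the actual RPC handler.

```python
def merge_bloom(dst, src):
    """OR the bits of src into dst in place; both must have equal size."""
    if len(dst) != len(src):
        raise ValueError("bloom filters must have the same size")
    for i, byte in enumerate(src):
        dst[i] |= byte


def build_result_filter(pending_merge_filter, remote_updates):
    """Combine local and remote aggregation into a new result filter.

    pending_merge_filter: bytearray holding the locally aggregated filter.
    remote_updates: iterable of bytearrays, one per remote filter update.
    """
    pending_remote_filter = bytearray(len(pending_merge_filter))
    for update in remote_updates:
        # Conceptually one merge per UpdateFilterFromRemote call.
        merge_bloom(pending_remote_filter, update)
    # Copy the local filter so the inputs are left untouched.
    result_filter = bytearray(pending_merge_filter)
    merge_bloom(result_filter, pending_remote_filter)
    return result_filter
```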

This patch currently targets only the bloom filter produced by a
partitioned join build. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while the bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation and thereby simulate 720 runtime filter updates. The
speed is approximated as the duration between the earliest time a filter
update is made and the time the coordinator publishes the complete
filter. The results are as follows:

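+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+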

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-26 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6: Code-Review+1

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@243
PS5, Line 243: /// Return true if all filter updates have been received.
> nit: updates have been
Done


http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@246
PS5, Line 246: /// Return number filter updates that have been received.
> nit: updates that have been
Done



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 20:22:32 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py
File tests/common/test_vector.py:

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179
PS6, Line 179: s
> These changes in test_vector.py is getting complex. It is probably worth to
Filed a separate patch for this at https://gerrit.cloudera.org/c/20625/



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 05:17:18 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py
File tests/common/test_vector.py:

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179
PS6, Line 179: s
> flake8: E501 line too long (92 > 90 characters)
These changes in test_vector.py are getting complex. It is probably worth putting 
them in a separate patch to make backporting easier.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 01:54:08 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14257/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 01:43:26 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690: wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
> Still seems like that should use an atomic then.
Changed to hold the lock before checking cancelled_.
This should be safe, because cancelled_ can only change while all locks are held.
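
The locking argument above can be illustrated with a small sketch, assuming
one lock per filter and a writer that takes every lock before flipping the
flag. The class and method names are hypothetical stand-ins, not the
RuntimeFilterBank code.

```python
import threading
from contextlib import ExitStack


class FilterLocks:
    """Hypothetical bank of per-filter locks plus a shared cancel flag."""

    def __init__(self, num_filters):
        self._locks = [threading.Lock() for _ in range(num_filters)]
        self._cancelled = False

    def cancel(self):
        # The writer acquires every lock, always in the same order,
        # before changing the flag, then releases them all on exit.
        with ExitStack() as stack:
            for lock in self._locks:
                stack.enter_context(lock)
            self._cancelled = True

    def is_cancelled(self, filter_idx):
        # A reader holding any single lock cannot overlap with cancel(),
        # so the plain boolean read needs no atomic.
        with self._locks[filter_idx]:
            return self._cancelled
```

The trade-off is that cancellation briefly blocks all filters, which is
presumably acceptable because it happens at most once per query.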


http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py
File tests/common/test_dimensions.py:

http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py@261
PS5, Line 261:   combinations = product(*(exec_option_dimensions[name] for name in keys))
 :   exec_option_dimension_values = [dict(zip(keys, prod)) for prod in combinations]
 :
> This is probably the reason why extra exec options are always declared with
Added ImpalaTestMatrix.add_exec_option_dimension() in patch set 5 for easier 
extra exec option declaration without losing pairwise combination.
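
For readers unfamiliar with the quoted lines, the sketch below shows what that
expansion does on its own: it builds the full cartesian product of the option
values, not a pairwise reduction. The wrapper function, the sorted key order,
and the sample option values are assumptions made for illustration.

```python
from itertools import product


def expand_exec_option_dimensions(exec_option_dimensions):
    """Return one dict per combination of the given exec option values.

    exec_option_dimensions maps an option name to its list of values.
    Keys are sorted here only to make the output order deterministic.
    """
    keys = sorted(exec_option_dimensions)
    combinations = product(*(exec_option_dimensions[name] for name in keys))
    return [dict(zip(keys, prod)) for prod in combinations]


if __name__ == "__main__":
    # Sample values chosen for illustration, not taken from the test suite.
    for options in expand_exec_option_dimensions(
            {"batch_size": [0, 1], "mt_dop": [0, 4]}):
        print(options)
```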



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 01:22:24 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#6).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending filter updates to the coordinator and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the selected number of intermediate aggregators M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 will disable the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for that runtime filter. Information about these M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into filter routing information,
TRuntimeFilterAggDesc, that is piggy-backed on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that due to the random
selection of M impalads, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.

This patch currently targets only the bloom filter produced by a
partitioned join build. Other kinds of runtime filters are still
efficient to aggregate in the coordinator alone, while the bloom filter
from a broadcast join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation and thereby simulate 720 runtime filter updates. The
speed is approximated as the duration between the earliest time a filter
update is made and the time the coordinator publishes the complete
filter. The results are as follows:

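+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+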

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 6:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py
File tests/common/test_vector.py:

http://gerrit.cloudera.org:8080/#/c/20612/6/tests/common/test_vector.py@179
PS6, Line 179: s
flake8: E501 line too long (92 > 90 characters)



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Thu, 26 Oct 2023 01:16:11 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 5:

(6 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488:   const RuntimeFilterAggregatorInfoPB& agg_info =
> This is the translation from RuntimeFilterAggregatorInfoPB (FragmentExecPar
Oh, I think part of my confusion is that "to report" sounds like a mapping, but 
it's actually an action. The comment helps.


http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@243
PS5, Line 243: /// Return true if all filter updates has been received.
nit: updates have been


http://gerrit.cloudera.org:8080/#/c/20612/5/be/src/runtime/runtime-filter-bank.h@246
PS5, Line 246: /// Return number filter updates that has been received.
nit: updates that have been


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281: DCHECK(!pending_merge_filter->AlwaysFalse());
> That will require checking the underlying buffer if all the bits have becom
Ack


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690:   if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
> cancelled_ can turn from False to True once through CancelLocked(). Once it
Still seems like that should use an atomic then.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139
PS3, Line 139: // query_id has complete their execution.
> Done. Also added comment.
Based on the comment, it probably doesn't need to be a warning.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 25 Oct 2023 21:08:17 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-25 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 5:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py
File tests/common/test_dimensions.py:

http://gerrit.cloudera.org:8080/#/c/20612/5/tests/common/test_dimensions.py@261
PS5, Line 261:
 :   TODO: In the future we could generate these values using pairwise to reduce total
 :   execution time.
This is probably the reason why extra exec options are always declared with a 
separate cls.ImpalaTestMatrix.add_dimension().
It ensures the pairwise combination is done properly.



--

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 25 Oct 2023 16:12:38 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-24 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 5:

(18 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43
PS3, Line 43: This patch currently targets the bloom filter produced by 
partitioned
> bloom filter, not boom filter
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488:   const RuntimeFilterAggregatorInfoPB& agg_info =
> aggregator_idx_to_report producing an agg_idx is confusing me. There's some
This is the translation from RuntimeFilterAggregatorInfoPB 
(FragmentExecParamsPB.filter_agg_info) to TRuntimeFilterAggDesc 
(TRuntimeFilterSource.aggregator_desc).

aggregator_idx_to_report is a list belonging to RuntimeFilterAggregatorInfoPB.
agg_idx is an index into aggregator_idx_to_report.

Rewrote this code to clarify.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71
PS3, Line 71:   bool need_subaggregation = false;
> nit: should be need_subaggregation
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227
PS3, Line 227: /// Pointer to runtime filter that hold the merge result of 
all remote updates.
> nit: result
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235
PS3, Line 235: /// Return number of remaining filter producers, both remote 
and local.
> These should all probably have function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196:   {
> This feels weird to me because you've already guaranteed that it's false at
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217
PS3, Line 217: return;
> It'd be helpful to VLOG_RPC that the message was ignored.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252
PS3, Line 252:   << " This filter is now disabled.";
> You use VLOG(3) enough it might warrant its own define in logging.h to name
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256
PS3, Line 256:   VLOG_FILTER << "filter_id=" << params.filter_id()
> Wouldn't this + line 286 cause pending_remotes to be negative?
Changed assignment to 1.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281: DCHECK(!pending_merge_filter->AlwaysFalse());
> Is there a case where this produces a filter that's always true and we woul
That would require checking the underlying buffer to see whether all the bits 
have become 1, and there seems to be no fast way to do so.
Hence, there is a separate always_true() flag for this purpose, which is checked 
in the previous branch.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690:   if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
> Can cancelled_ be updated externally? That seems like a problem, because I
cancelled_ can turn from False to True only once, through CancelLocked(). Once it 
is True, it stays True.
This method returns early and stops sending the remaining incomplete filters if 
the query is cancelled midway.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h
File be/src/runtime/runtime-filter.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151
PS3, Line 151:
> These would benefit from function comments.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288
PS3, Line 288:
> Why 2?
Typo, it is removed.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291
PS3, Line 291:   // src_state->instance_states organize fragment instances as 
one dimension vector
> I'm not entirely clear what these pairs represent.
Replaced with a typedef and added comments.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298
PS3, Line 298:   for (int i = 0; i < src_state->instance_states.size(); ++i) {
> Please add a comment describing what this is preventing.
Done


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-24 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 4:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14239/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 4
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Michael Smith 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Wed, 25 Oct 2023 03:18:44 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-24 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#5).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending the filter update to the coordinator, and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impalad
backends before the final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the number of intermediate aggregators selected is M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 disables the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for each runtime filter. Information about the M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into the filter routing information
TRuntimeFilterAggDesc, which is piggy-backed on TRuntimeFilterSource.
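
The selection rule in this patch set can be sketched as follows. This is a minimal illustration rather than the scheduler's code: choose_aggregators and its parameters are hypothetical, and rounding N / 2 down is an assumption.

```python
import random


def choose_aggregators(executors, max_num_filter_aggregator, rng=None):
    """Pick M random executors to act as intermediate aggregators.

    Hypothetical helper. `executors` lists the backends excluding the
    coordinator, so N = len(executors). Assumes N / 2 rounds down.
    """
    if max_num_filter_aggregator <= 0:
        return []  # a value <= 0 disables the feature
    rng = rng or random.Random()
    m = min(max_num_filter_aggregator, len(executors) // 2)
    return rng.sample(executors, m)


backends = ["exec-%d" % i for i in range(20)]
chosen = choose_aggregators(backends, 4, random.Random(0))
```

With 20 executors and MAX_NUM_FILTER_AGGREGATOR=4 this picks 4 distinct backends. With only 3 executors the N / 2 cap would limit the choice to a single aggregator.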

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that because the M impalads
are selected randomly, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.
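
The two-stage merge on an intermediate aggregator can be sketched as below. This is a simplified illustration under stated assumptions: a bloom filter is modelled as a plain bytearray, or_into is a hypothetical helper, and the 8-byte size is only for readability (the text discusses 1 MB filters). The variable names mirror the ones quoted above, but this is not the backend implementation.

```python
def or_into(target, update):
    """Bitwise-OR `update` into `target` in place (bloom filter union)."""
    if len(target) != len(update):
        raise ValueError("filters must have the same size")
    for i, byte in enumerate(update):
        target[i] |= byte


FILTER_BYTES = 8  # tiny size for illustration only

# Updates received over RPC from other executors are merged first.
pending_remote_filter = bytearray(FILTER_BYTES)
for remote_update in (bytes([0x01] + [0] * 7), bytes([0x10] + [0] * 7)):
    or_into(pending_remote_filter, remote_update)

# The locally aggregated filter from this node's own fragment instances.
pending_merge_filter = bytearray([0x02] + [0] * 7)

# The aggregator combines both; the result would go to the coordinator.
result_filter = bytearray(pending_merge_filter)
or_into(result_filter, pending_remote_filter)
```

Because bitwise OR is associative and commutative, the coordinator gets the same final filter whether it merges every update itself or merges a few pre-combined results.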

This patch currently targets the bloom filter produced by partitioned
join build only. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a broadcast
join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

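+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+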
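
As a quick reading aid, the relative speedup over the MAX_NUM_FILTER_AGGREGATOR=0 baseline can be computed from the table above. The snippet only restates the reported numbers; the names times_ms and speedup are chosen for this example.

```python
# Aggregation times (ms) copied from the table above, keyed by
# MAX_NUM_FILTER_AGGREGATOR.
times_ms = {0: 1296, 1: 1229, 2: 608, 4: 329, 8: 205}

baseline = times_ms[0]
speedup = {m: round(baseline / t, 2) for m, t in times_ms.items()}

for m, s in speedup.items():
    print("MAX_NUM_FILTER_AGGREGATOR=%d: %.2fx" % (m, s))
```

From these figures, a single aggregator is only about 1.05x faster than the baseline, while 8 aggregators come out at roughly 6.3x.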

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-24 Thread Riza Suminto (Code Review)
Hello Kurt Deschler, Abhishek Rawat, Csaba Ringhofer, Michael Smith, Impala 
Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#4).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending the filter update to the coordinator, and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impalad
backends before the final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the number of intermediate aggregators selected is M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 disables the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for each runtime filter. Information about the M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into the filter routing information
TRuntimeFilterAggDesc, which is piggy-backed on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that because the M impalads
are selected randomly, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.

This patch currently targets the boom filter produced by partitioned
join build only. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a broadcast
join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

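+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+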

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/common/logging.h
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-24 Thread Michael Smith (Code Review)
Michael Smith has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 3:

(18 comments)

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/20612/3//COMMIT_MSG@43
PS3, Line 43: This patch currently targets the boom filter produced by 
partitioned
bloom filter, not boom filter


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/coordinator.cc@488
PS3, Line 488:   int agg_idx = agg_info.aggregator_idx_to_report(i);
aggregator_idx_to_report producing an agg_idx is confusing me. There's 
something about the names I'm missing.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@71
PS3, Line 71:   bool need_subaggregegation = false;
nit: should be need_subaggregation


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@227
PS3, Line 227: // Pointer to runtime filter that hold the merge resut of 
all remote updates.
nit: result


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.h@235
PS3, Line 235: inline int AllRemainingProducers() { return pending_remotes 
+ pending_producers; }
These should all probably have function comments.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@196
PS3, Line 196:   DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
This feels weird to me because you've already guaranteed that it's false at 
line 188. Maybe simplify with

  if (res->status().status_code() != TErrorCode::OK) {
    // ... never set an error status
    DCHECK(is_remote_update);
    ...
  }


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@217
PS3, Line 217: // Late RPC might come while filter bank is closing.
It'd be helpful to VLOG_RPC that the message was ignored.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@252
PS3, Line 252:   VLOG(3) << "filter_id=" << params.filter_id()
You use VLOG(3) enough it might warrant its own define in logging.h to name 
this logging scenario. VLOG_FILTER?


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@256
PS3, Line 256:   produced_filter.pending_remotes = 0;
Wouldn't this + line 286 cause pending_remotes to be negative?


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@281
PS3, Line 281: target->Or(params.bloom_filter(), sidecar_slice);
Is there a case where this produces a filter that's always true and we would 
want to stop waiting for other remotes? If FalsePositiveProb == 1.0, then 
presumably we should just use an ALWAYS_TRUE_FILTER.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter-bank.cc@690
PS3, Line 690:   bool try_wait_aggregation = !cancelled_;
Can cancelled_ be updated externally? That seems like a problem, because I 
don't see a lock or atomic.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h
File be/src/runtime/runtime-filter.h:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/runtime/runtime-filter.h@151
PS3, Line 151:   bool IsReportToPeerRpc() const {
These would benefit from function comments.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@288
PS3, Line 288:   // Walk the instances and pick two random krpc backend for 
intermediate runtime
Why 2?


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@291
PS3, Line 291:   vector>> instance_groups;
I'm not entirely clear what these pairs represent.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/scheduling/scheduler.cc@298
PS3, Line 298:   if (i == 0
Please add a comment describing what this is preventing.


http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc
File be/src/service/data-stream-service.cc:

http://gerrit.cloudera.org:8080/#/c/20612/3/be/src/service/data-stream-service.cc@139
PS3, Line 139: LOG(INFO) << err_msg;
When would this happen? Should this be a warning?


http://gerrit.cloudera.org:8080/#/c/20612/3/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 3:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14230/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Tue, 24 Oct 2023 01:36:06 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Riza Suminto (Code Review)
Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 3:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py
File tests/custom_cluster/test_runtime_filter_aggregation.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@45
PS1, Line 45:
> flake8: F821 undefined name 'add_exec_option_dimension'
Done


http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@49
PS1, Line 49:
> flake8: E231 missing whitespace after ','
Done



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Abhishek Rawat 
Gerrit-Reviewer: Csaba Ringhofer 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Reviewer: Kurt Deschler 
Gerrit-Reviewer: Riza Suminto 
Gerrit-Comment-Date: Tue, 24 Oct 2023 01:07:36 +
Gerrit-HasComments: Yes


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Riza Suminto (Code Review)
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#3).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending the filter update to the coordinator, and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impalad
backends before the final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the number of intermediate aggregators selected is M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 disables the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for each runtime filter. Information about the M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into the filter routing information
TRuntimeFilterAggDesc, which is piggy-backed on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that because the M impalads
are selected randomly, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.

This patch currently targets the boom filter produced by partitioned
join build only. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a broadcast
join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

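+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+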

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 1:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/14229/ : Initial code 
review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun 
to run full precommit tests.


--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Tue, 24 Oct 2023 00:58:55 +
Gerrit-HasComments: No


[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Riza Suminto (Code Review)
Hello Impala Public Jenkins,

I'd like you to reexamine a change. Please visit

http://gerrit.cloudera.org:8080/20612

to look at the new patch set (#2).

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending the filter update to the coordinator, and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impalad
backends before the final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the number of intermediate aggregators selected is M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 disables the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for each runtime filter. Information about the M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into the filter routing information
TRuntimeFilterAggDesc, which is piggy-backed on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that because the M impalads
are selected randomly, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.

This patch currently targets the boom filter produced by partitioned
join build only. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a broadcast
join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmarked the aggregation speed of a 1 MB runtime filter on a
cluster of 20 executor nodes with MT_DOP=36, instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The results are as follows:

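+---------------------------+------------------------+
| MAX_NUM_FILTER_AGGREGATOR | Aggregation speed (ms) |
+---------------------------+------------------------+
|                         0 |                   1296 |
|                         1 |                   1229 |
|                         2 |                    608 |
|                         4 |                    329 |
|                         8 |                    205 |
+---------------------------+------------------------+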

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Riza Suminto (Code Review)
Riza Suminto has uploaded this change for review. ( 
http://gerrit.cloudera.org:8080/20612


Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improved runtime filters by aggregating them locally before
sending the filter update to the coordinator, and by sharing a single
RuntimeFilterBank among all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an Impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can take more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impalad
backends before the final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTER_AGGREGATOR is added to control
this feature. Given N as the number of backend executors excluding the
coordinator, the number of intermediate aggregators selected is M =
min(MAX_NUM_FILTER_AGGREGATOR, N / 2). Setting MAX_NUM_FILTER_AGGREGATOR
<= 0 disables the intermediate aggregator feature. In the backend
scheduler, M impalads are selected randomly as the intermediate
aggregators for each runtime filter. Information about the M selected
impalads is then passed from the scheduler to the coordinator as a
RuntimeFilterAggregatorInfoPB. The coordinator then converts the
RuntimeFilterAggregatorInfoPB into the filter routing information
TRuntimeFilterAggDesc, which is piggy-backed on TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates sent from fellow
impalad executors to the designated aggregator impalad. This RPC merges
filter updates into 'pending_remote_filter'. The intermediate aggregator
then combines 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter',
which is then sent to the coordinator. Note that because the M impalads
are selected randomly, an additional memory reservation for
'pending_remote_filter' is added on all impalads, even though only M
impalads do the intermediate aggregation per runtime filter.

This patch currently targets the boom filter produced by partitioned
join build only. Other kinds of runtime filters are still efficient to
aggregate in the coordinator alone, while a bloom filter from a broadcast
join requires only 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension and test matrix constraints, and to reduce pytest.skip() calls
in each test. runtime_filters.test is also changed to use counter
aggregation and assert on the ExecSummary table so that the assertions
stay valid irrespective of the number of fragment instances.

Testing:
- Exercise MAX_NUM_FILTER_AGGREGATOR in test_runtime_filters.py and
  query-options-test.cc
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive end-to-end and custom-cluster tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
---
M be/src/runtime/coordinator.cc
M be/src/runtime/data-stream-test.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/runtime-filter.cc
M be/src/runtime/runtime-filter.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/runtime-profile-counters.h
M common/protobuf/admission_control_service.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
M common/thrift/ImpalaService.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
M testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
M testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
M tests/common/impala_test_suite.py
M tests/common/test_dimensions.py
A tests/custom_cluster/test_runtime_filter_aggregation.py
M tests/query_test/test_runtime_filters.py
32 files changed, 1,030 insertions(+), 188 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/12/20612/1
--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: 

[Impala-ASF-CR] IMPALA-3825: Delegate runtime filter aggregation to some executors

2023-10-23 Thread Impala Public Jenkins (Code Review)
Impala Public Jenkins has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/20612 )

Change subject: IMPALA-3825: Delegate runtime filter aggregation to some 
executors
..


Patch Set 1:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/common/impala_test_suite.py
File tests/common/impala_test_suite.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/common/impala_test_suite.py@778
PS1, Line 778: n
flake8: E501 line too long (95 > 90 characters)


http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py
File tests/custom_cluster/test_runtime_filter_aggregation.py:

http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@45
PS1, Line 45: a
flake8: F821 undefined name 'add_exec_option_dimension'


http://gerrit.cloudera.org:8080/#/c/20612/1/tests/custom_cluster/test_runtime_filter_aggregation.py@49
PS1, Line 49: ,
flake8: E231 missing whitespace after ','



--
To view, visit http://gerrit.cloudera.org:8080/20612
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Gerrit-Change-Number: 20612
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto 
Gerrit-Reviewer: Impala Public Jenkins 
Gerrit-Comment-Date: Tue, 24 Oct 2023 00:27:10 +
Gerrit-HasComments: Yes