[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Tim Armstrong has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/14974 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously, the aggregation and propagation of runtime filters in Impala were implemented using Thrift RPC, which has the disadvantage that the number of connections in a cluster grows with both the number of queries and the cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch uses a KRPC sidecar when the runtime filter is a Bloom filter. The KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized for transmission and hence reduces the serialization overhead. Because of the KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc, are also modified because the signatures of some functions in BloomFilter changed. Testing: This patch has passed the exhaustive tests.
Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins Reviewed-on: http://gerrit.cloudera.org:8080/14974 Reviewed-by: Tim Armstrong Tested-by: Tim Armstrong --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,094 insertions(+), 757 deletions(-) Approvals: Tim Armstrong: Looks good to me, approved; Verified -- To view, visit http://gerrit.cloudera.org:8080/14974 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF 
Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Gerrit-Change-Number: 14974 Gerrit-PatchSet: 7 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Bikramjeet Vig Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Reviewer: Tim Armstrong
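The commit message says a SpinLock keeps a BloomFilter alive until its KRPC call finishes, presumably because the sidecar refers to the filter's buffer instead of copying it. A minimal sketch of that lifetime rule follows, assuming a simple in-flight counter. GuardedFilter and its methods are hypothetical names, std::mutex stands in for Impala's SpinLock, and none of this is the patch's actual code.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a filter whose buffer is sent without copying.
class GuardedFilter {
 public:
  explicit GuardedFilter(std::string directory)
      : directory_(std::move(directory)) {}

  // Hands out a non-owning pointer to the buffer, as a zero-copy send would,
  // and records that an asynchronous call now references it.
  const char* BeginSend(std::size_t* size) {
    std::lock_guard<std::mutex> l(lock_);
    ++in_flight_;
    *size = directory_.size();
    return directory_.data();
  }

  // Meant to be called from the completion callback of the asynchronous call.
  void EndSend() {
    std::lock_guard<std::mutex> l(lock_);
    assert(in_flight_ > 0);
    --in_flight_;
  }

  // The owner may free the buffer only when no call still references it.
  bool SafeToFree() {
    std::lock_guard<std::mutex> l(lock_);
    return in_flight_ == 0;
  }

 private:
  std::mutex lock_;  // Assumption: stands in for Impala's SpinLock.
  int in_flight_ = 0;
  std::string directory_;
};
```

In this sketch the owner would check SafeToFree() (or block until it holds) before releasing the filter. The real patch may structure the wait differently.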
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/14974 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 6: Verified+1 -- To view, visit http://gerrit.cloudera.org:8080/14974 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Gerrit-Change-Number: 14974 Gerrit-PatchSet: 6 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Bikramjeet Vig Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Reviewer: Tim Armstrong Gerrit-Comment-Date: Mon, 20 Jan 2020 18:23:34 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/14974 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 6: Code-Review+2 -- To view, visit http://gerrit.cloudera.org:8080/14974 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Gerrit-Change-Number: 14974 Gerrit-PatchSet: 6 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Bikramjeet Vig Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Reviewer: Tim Armstrong Gerrit-Comment-Date: Mon, 20 Jan 2020 18:22:48 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/14974 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 1: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/5362/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/14974 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Gerrit-Change-Number: 14974 Gerrit-PatchSet: 1 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Bikramjeet Vig Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Reviewer: Tim Armstrong Gerrit-Comment-Date: Fri, 03 Jan 2020 17:10:20 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Hello Impala Public Jenkins, I'd like you to do a code review. Please visit http://gerrit.cloudera.org:8080/14974 to review the following change. Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- 39 files changed, 1,094 insertions(+), 757 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/74/14974/1 -- To view, visit http://gerrit.cloudera.org:8080/14974 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: 
I11a2f92a91750c2470fba082c30f97529524b9c8 Gerrit-Change-Number: 14974 Gerrit-PatchSet: 1 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Bikramjeet Vig Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Reviewer: Tim Armstrong
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 31: Verified+1 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 31 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Sat, 09 Nov 2019 01:54:50 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Change-Id: I6b394796d250286510e157ae326882bfc01d387a Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- 39 files changed, 1,094 insertions(+), 757 deletions(-) Approvals: Impala Public Jenkins: Looks good to me, approved; Verified -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: 
I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 32 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 31: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/5199/ DRY_RUN=false -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 31 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 21:32:08 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 31: Code-Review+2 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 31 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 21:32:07 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 30: Verified-1 Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/5194/ -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 30 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 04:42:48 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 30: Code-Review+2 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 30 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 00:12:41 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 30: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/5194/ DRY_RUN=false -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 30 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 00:12:42 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Patch Set 29: Code-Review+2 (1 comment)
http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:
http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h@162
PS29, Line 162: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
: kudu::rpc::RpcController* controller, const char* directory,
: unsigned long directory_size);
: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
: kudu::rpc::RpcController* controller, const string& directory);
> I don't think its necessary to have two versions of this function. You shou
We talked offline and I was wrong
-- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 29 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Fri, 08 Nov 2019 00:11:50 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Patch Set 29: (1 comment)
http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:
http://gerrit.cloudera.org:8080/#/c/13882/29/be/src/util/bloom-filter.h@162
PS29, Line 162: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
: kudu::rpc::RpcController* controller, const char* directory,
: unsigned long directory_size);
: static void AddDirectorySidecar(BloomFilterPB* rpc_params,
: kudu::rpc::RpcController* controller, const string& directory);
I don't think it's necessary to have two versions of this function. You should be able to eliminate the 'const char*' one and just have the 'const string&' one.
-- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 29 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Thu, 07 Nov 2019 20:10:25 + Gerrit-HasComments: Yes
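The thread does not record why both overloads were kept in the end. For readers unfamiliar with the pattern under discussion, here is a rough sketch of how a string overload can forward to a pointer-plus-length overload. Payload and AddDirectory are hypothetical names, not Impala's API, and the copy into Payload is only there to make the sketch observable (the real sidecar path is described as avoiding that copy).

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical sink standing in for the RPC payload; not Impala's API.
struct Payload {
  std::string bytes;
};

// Pointer-plus-length form: accepts any contiguous buffer, including one
// that contains embedded NUL bytes.
void AddDirectory(Payload* out, const char* directory,
                  std::size_t directory_size) {
  assert(out != nullptr);
  assert(directory != nullptr || directory_size == 0);
  out->bytes.assign(directory, directory_size);
}

// String form: forwards to the pointer-plus-length overload.
void AddDirectory(Payload* out, const std::string& directory) {
  AddDirectory(out, directory.data(), directory.size());
}
```

A caller holding a std::string uses the second form, while a caller holding a raw buffer uses the first without building a temporary string.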
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 29: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4960/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 29 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 06 Nov 2019 07:13:31 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#29). ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- 39 files changed, 1,094 insertions(+), 757 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/29 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 29 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Patch Set 27: (3 comments)
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc
File be/src/util/min-max-filter.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a696
PS27, Line 696:
Don't remove this comment
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a701
PS27, Line 701:
Don't remove this comment
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/min-max-filter.cc@a704
PS27, Line 704:
Don't remove this. I'm surprised this isn't causing any test failures, since we should be hitting the DCHECK below now.
-- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 27 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Tue, 05 Nov 2019 01:20:37 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )
Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC ..
Patch Set 27: Code-Review+2 (19 comments)
LGTM. Please address the comments below.
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/benchmarks/bloom-filter-benchmark.cc
File be/src/benchmarks/bloom-filter-benchmark.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/benchmarks/bloom-filter-benchmark.cc@290
PS27, Line 290: std::shared_ptr
No need for shared_ptr
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: {
> Thanks for pointing this out. Makes sense.
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072: bloom_filter_directory.swap(state->bloom_filter_directory());
: DCHECK(rpc_params.bloom_filter().always_false()
> Thanks for the suggestion! I guess it can simply be *rpc_params.mutable_bloom_filter() = state->bloom_filter();
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: || rpc_params.bloo
> Thanks for the suggestion. Missed the part about the scope of state. Think it's fine to keep it as-is. Thanks for the explanation.
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098:
> Since 'state' is defined inside of the critical section as mentioned above, Makes sense.
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/coordinator.cc@1096
PS27, Line 1096: reinterpret_cast(&(bloom_filter_directory[0])),
: static_cast(bloom_filter_directory.size())
Please see comments at BloomFilter::AddDirectorySidecar(). We can pass in the string directly instead.
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/runtime-filter-bank.cc@187
PS27, Line 187: DCHECK(
nit: DCHECK_EQ
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h
File be/src/service/impala-server.h:
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@35
PS25, Line 35: #include "gen-cpp/ImpalaInternalService.h"
> Thanks for this suggestion. Looks like it's more involved than expected. Please feel free to defer it to the follow-up patch which removes ImpalaInternalService altogether.
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/service/impala-server.h@70
PS25, Line 70: class TSessionState;
: class TQueryOptions;
> Thanks for pointing this out! Sounds good.
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc
File be/src/util/bloom-filter-test.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@a68
PS27, Line 68:
:
:
Why not keep most of this header comment which explains what this test does and talks about the output arguments (e.g. success, protobuf, directory)
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@103
PS27, Line 103: *success = directory_y.compare(directory_y2) == 0 ? true : false;
*success = directory_y.compare(directory_y2) == 0;
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@365
PS27, Line 365: std::shared_ptr<
No need for shared_ptr
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@367
PS27, Line 367:
nit: unnecessary blank line
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter-test.cc@395
PS27, Line 395: TBloomFilter
BloomFilterPB
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.h@95
PS27, Line 95: const uint8_t* directory_in,
: size_t directory_in_size)
Can this interface take a string only instead for the directory ? Can we use directory.size() to get the size in Init() or are there cases in which passing a string won't work ?
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc@79
PS27, Line 79: protobuf
rpc_params
http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/util/bloom-filter.cc@80
PS27, Line 80: const char* directory,
: unsigned long directory_size
See comments below. This could be const string& directory. I believe faststring::append() also has an interface for taking string as input.
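The bloom-filter-test.cc@103 suggestion in this review drops a redundant ternary. A small sketch of why the two forms are equivalent, using hypothetical helper names that do not appear in the patch:

```cpp
#include <cassert>
#include <string>

// Verbose form quoted in the review: the ternary maps true to true and
// false to false, so it adds nothing.
bool SameDirectoryVerbose(const std::string& a, const std::string& b) {
  return a.compare(b) == 0 ? true : false;
}

// Suggested form: the equality comparison already yields a bool.
bool SameDirectory(const std::string& a, const std::string& b) {
  return a.compare(b) == 0;
}
```

Both helpers return the same value for every pair of inputs, so the shorter form is a pure readability change.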
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 27: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4904/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 27 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 30 Oct 2019 04:17:03 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#27). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously, the aggregation and propagation of a runtime filter in Impala were implemented using Thrift RPC, which has the disadvantage that the number of connections in a cluster grows with both the number of queries and the cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes.

Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc, are also modified accordingly because of the changes to the signatures of some functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,080 insertions(+), 760 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/27 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 27 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 27: (1 comment) http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/timestamp-value.h File be/src/runtime/timestamp-value.h: http://gerrit.cloudera.org:8080/#/c/13882/27/be/src/runtime/timestamp-value.h@175 PS27, Line 175: line has trailing whitespace -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 27 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 30 Oct 2019 03:31:59 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 25:

(25 comments)

Hi Michael and Thomas, I have addressed most of Michael's comments in the previous iteration but have not uploaded the revised version, since there are still four comments that I do not know exactly how to resolve. Please let me know how I should proceed. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556
PS25, Line 556: LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString();
> return here as there is no point in continuing to print the result status i
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010
PS25, Line 1010: // >>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
> To be removed ?!
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022
PS25, Line 1022: std::
> nit: no need for std::
Thanks for pointing this out. I have checked whether it is possible to move the declarations of 'rpc_params', 'target_fragment_idxs', and 'bloom_filter_directory' closer to their use, but since these three variables are used both inside and outside the critical section below, it seems that we are not able to do that.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072: BloomFilterPB& aggregated_filter = state->bloom_filter();
: aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
> This swapping tricks seems unnecessary now that the bloom_filter_directory
Thanks for the suggestion! I have tried the following.

1. BloomFilterPB aggregated_filter;
   aggregated_filter.CopyFrom(state->bloom_filter());
   rpc_params.set_allocated_bloom_filter(&aggregated_filter);

   This approach results in the following error at runtime according to AddressSanitizer.
   ==26704==ERROR: AddressSanitizer: stack-use-after-scope on address 0x7fde29b39530 at pc 0x023b5cef bp 0x7fde29b392f0 sp 0x7fde29b392e8

2. rpc_params.set_allocated_bloom_filter(std::move(&(state->bloom_filter())));

3. rpc_params.set_allocated_bloom_filter(&(state->bloom_filter()));

Both the second and the third approaches result in the following error reported by AddressSanitizer.
==19218==ERROR: AddressSanitizer: attempting free on address which was not malloc()-ed: 0x6130001a0ca0 in thread T126

I think this is because the contents we put in 'rpc_params' have to live beyond this function, i.e., Coordinator::UpdateFilter(). To address this issue, we may need to allocate a piece of memory on the heap and perform the copy operation ourselves. But since 'data_stream_service.pb.cc' (generated by the Protobuf compiler) already provides such functionality, i.e., 'Swap()', I was wondering if it would be easier to just use this provided function.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
> Is there a reason to assign state->bloom_filter_directory() to this local v
Thanks for the suggestion. In the current implementation, the variable 'state' is derived from the variable 'it', and both are defined inside the critical section. To address your suggestion, we may need to move the following two statements out of the critical section.

1. auto it = filter_routing_table_->id_to_filter.find(params.filter_id()), and
2. FilterState* state = &it->second.

However, it seems this will also move the use of the variable 'it' away from its definition. Please let me know how you would like to proceed. Thanks!
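The two AddressSanitizer reports quoted above for set_allocated_bloom_filter() are consistent with the ownership rule of protobuf's set_allocated_*() setters: the message takes ownership of the pointer and later deletes it, so the pointer must refer to a heap object that nobody else owns. A minimal sketch of that rule, assuming hand-written stand-ins (FilterMsg, ParamsMsg and AttachCopy are hypothetical and are not the generated BloomFilterPB or UpdateFilterParamsPB code):

```cpp
#include <memory>

// Hypothetical stand-ins that mimic the ownership rule of a protobuf
// set_allocated_*() setter. This is not generated protobuf code.
struct FilterMsg {
  bool always_true = false;
  int log_space = 0;
};

class ParamsMsg {
 public:
  // Takes ownership of 'filter'. The object must be heap-allocated because
  // it is deleted when it is replaced or when this message is destroyed.
  // Passing the address of a stack variable, or of an object owned by
  // another structure, would lead to an invalid or double free.
  void set_allocated_filter(FilterMsg* filter) { filter_.reset(filter); }

  bool has_filter() const { return filter_ != nullptr; }
  const FilterMsg& filter() const { return *filter_; }

 private:
  std::unique_ptr<FilterMsg> filter_;
};

// Copies 'state_filter' onto the heap and hands the copy to 'params', so the
// original object stays owned by whoever held it before.
void AttachCopy(const FilterMsg& state_filter, ParamsMsg* params) {
  params->set_allocated_filter(new FilterMsg(state_filter));
}
```

With real protobuf messages, a plain copy into the field obtained from the mutable accessor would achieve the same effect without manual allocation; which variant fits Coordinator::UpdateFilter() best is left to the review discussion.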
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098: reinterpret_cast(&(bloom_filter_directory[0])
> state->bloom_filter_directory().data()
Since 'state' is defined inside of the critical section as mentioned above, we might not be able to replace this argument with 'state->bloom_filter_directory().data()'.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
> bloom_filter_ = params.bloom_filter();
Done

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
: reinterpret_cast(sidecar_slice.data()), sidecar_slice.size())
> Can keep the original suggestion of sidecar_slice.ToString(). C++ ROV shoul
Thanks for pointing this out. I have changed this accordingly.
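The point about sidecar_slice.ToString() and copy elision can be illustrated with a small, self-contained sketch. Tracked and MakeTracked() are hypothetical names, where MakeTracked() plays the role of a ToString()-style function that returns a temporary by value. The counts in the comments assume C++17 or a compiler that performs copy elision by default.

```cpp
#include <utility>

// Hypothetical type that counts how often it is move-constructed.
struct Tracked {
  static int moves;
  Tracked() = default;
  Tracked(const Tracked&) = default;
  Tracked(Tracked&&) noexcept { ++moves; }
};
int Tracked::moves = 0;

// Returns a temporary by value, like a ToString()-style accessor.
Tracked MakeTracked() { return Tracked(); }

// Initializing directly from the returned temporary lets the compiler
// construct the result in place, so no move constructor runs.
int MovesWithPlainInit() {
  Tracked::moves = 0;
  Tracked t = MakeTracked();
  (void)t;
  return Tracked::moves;
}

// Wrapping the temporary in std::move() turns it into an xvalue, which
// forces one move construction and disables the elision above.
int MovesWithStdMove() {
  Tracked::moves = 0;
  Tracked t = std::move(MakeTracked());
  (void)t;
  return Tracked::moves;
}
```

Compilers such as Clang flag the second form with a diagnostic of the kind quoted earlier in the thread ("Moving a temporary object prevents copy elision"), which is why the plain call is both shorter and cheaper.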
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 25: (26 comments) http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator-backend-state.cc@556 PS25, Line 556: LOG(ERROR) << "PublishFilter() rpc failed: " << rpc_status.ToString(); return here as there is no point in continuing to print the result status if RPC failed. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134 PS24, Line 1134: r is neither an always true filter nor an : // always false filter, then it must be the case that a non-empty sidecar slice > I actually found out that if we are using "std::move(sidecar_slice.ToString Sorry for the wrong advice. I believe the RVO should take care of the copy-elision so no need for the std::move. In other words, it should be sufficient to just call sidecar_slice.ToString(); http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1010 PS25, Line 1010: // >>> IMPALA-7984: Port runtime filter from Thrift RPC to KRPC To be removed ?! http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1022 PS25, Line 1022: std:: nit: no need for std:: Also, may make sense to move this declaration to somewhere closer to its use (e.g. line 1068 below). Same for rpc_params and target_fragment_idxs above. 
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1072
PS25, Line 1072: BloomFilterPB& aggregated_filter = state->bloom_filter();
: aggregated_filter.Swap(rpc_params.mutable_bloom_filter());
This swapping trick seems unnecessary now that the bloom_filter_directory is a separate structure.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1074
PS25, Line 1074: bloom_filter_directory
Is there a reason to assign state->bloom_filter_directory() to this local variable? Why can't we use state->bloom_filter_directory() directly below?

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1098
PS25, Line 1098: reinterpret_cast(&(bloom_filter_directory[0])
state->bloom_filter_directory().data()

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1153
PS25, Line 1153: bloom_filter_ = BloomFilterPB(params.bloom_filter());
bloom_filter_ = params.bloom_filter();

The above should be sufficient.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/coordinator.cc@1155
PS25, Line 1155: std::string(
: reinterpret_cast(sidecar_slice.data()), sidecar_slice.size())
Can keep the original suggestion of sidecar_slice.ToString(). C++ RVO should take care of avoiding the copy, so the TODO can be removed. There is no easy way to avoid copying the sidecar from the network buffer, so we have to copy at least once.

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@190
PS25, Line 190: std::
nit: no need for std::

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@204
PS25, Line 204: std::
nit: no need for std::

http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@205
PS25, Line 205: ++num_inflight_rpcs_;
DCHECK_GE(num_inflight_rpcs_, 0); before increment.
http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/runtime-filter-bank.cc@323 PS25, Line 323: std:: nit: no need for std:: http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h File be/src/runtime/timestamp-value.h: http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@170 PS25, Line 170: // Store the binary representation of this TimestampValue in 'tvalue'. : void ToTColumnValue(TColumnValue* tvalue) const { : const uint8_t* data = reinterpret_cast(this); : tvalue->timestamp_val.assign(data, data + Size()); : tvalue->__isset.timestamp_val = true; : } Is this not used anymore ? http://gerrit.cloudera.org:8080/#/c/13882/25/be/src/runtime/timestamp-value.h@183 PS25, Line 183: // Returns a new TimestampValue created from the value in 'tvalue'. : static TimestampValue FromTColumnValue(const TColumnValue& tvalue) { : TimestampValue value;
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 25: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4854/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 25 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Tue, 22 Oct 2019 22:32:27 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#25). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously, the aggregation and propagation of a runtime filter in Impala were implemented using Thrift RPC, which has the disadvantage that the number of connections in a cluster grows with both the number of queries and the cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes.

Two related BE tests, bloom-filter-test.cc and bloom-filter-benchmark.cc, are also modified accordingly because of the changes to the signatures of some functions in BloomFilter.

Testing:
This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,086 insertions(+), 747 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/25 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 25 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 24:

(18 comments)

Hi Michael and Thomas, please review my revised patch. I have addressed all of Michael's comments in the previous iteration. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124
PS24, Line 124: string
> Please also document what this field stands for.
Done

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103
PS24, Line 1103: // Workaround for fact that parameters are const& for Protobuf RPCs.
: BloomFilterPB* non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter());
> I don't think this makes sense anymore with the implementation using protob
Thanks! In this regard, I have used a copy constructor to initialize a BloomFilterPB instead of calling Swap().

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108
PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) {
> May be I am missing the details here but why is it not a no-op if the incom
Thanks for pointing this out. There is a reason for this. After taking a closer look at the code here and performing some tests, I found that there is a corner case we have to take care of when params->bloom_filter() is always_false. Specifically, we need to initialize 'bloom_filter_' even if params->bloom_filter() is always_false when 'bloom_filter_' has never been initialized by any incoming Bloom filter previously. If 'bloom_filter_' has never been initialized, its field 'log_bufferpool_space_' would be 0. That in turn triggers a DCHECK in 'buffer-allocator.cc' (https://github.com/apache/impala/blob/master/be/src/runtime/bufferpool/buffer-allocator.cc#L278) when BloomFilter::Init() is called in RuntimeFilterBank::PublishGlobalFilter(), because the DCHECK requires the buffer length of the directory associated with the Bloom filter to be published to be greater than some lower bound.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110
PS24, Line 1110: bloom_filter_.Swap(non_const_filter);
> See comments above.
Done

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132
PS24, Line 1132: bloom_filter_.Swap(non_const_filter);
> See comments above.
Done

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134
PS24, Line 1134: std::string(
: reinterpret_cast(sidecar_slice.data()), sidecar_slice.size());
> Thanks for pointing this out! I have replaced the original statement with '
I actually found out that if we use "std::move(sidecar_slice.ToString())", we are not able to build Impala with the asan flag due to the following error message, and thus for now I am using the previous approach.
"Moving a temporary object prevents copy elision."

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140
PS24, Line 1140: sidecar_slice.ToString(),
> Seems less expensive if we can pass the slice directly. This avoids doing a
Thanks for pointing this out! I have slightly changed the signature of BloomFilter::Or() to avoid this additional copy. Due to this change, I have also modified bloom-filter-test.cc and bloom-filter-benchmark.cc where BloomFilter::Or() is tested.
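The idea of passing the received bytes directly instead of first copying them into a string can be sketched as follows. This is a simplified illustration under stated assumptions: ByteView, OrBytes() and OrInto() are hypothetical names, and the real BloomFilter::Or() signature in the patch may differ.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical non-owning view of received bytes, standing in for a slice of
// a network buffer. It does not copy the data it points to.
struct ByteView {
  const std::uint8_t* data;
  std::size_t size;
};

// Hypothetical helper: ORs 'n' bytes of 'in' into 'out' in place.
void OrBytes(const std::uint8_t* in, std::uint8_t* out, std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) out[i] |= in[i];
}

// ORs a borrowed view into a string-backed directory without building an
// intermediate std::string from the view. Returns false on a size mismatch.
bool OrInto(ByteView incoming, std::string* directory) {
  if (incoming.size != directory->size()) return false;
  if (directory->empty()) return true;
  // &(*directory)[0] yields a mutable pointer to the string's storage.
  OrBytes(incoming.data, reinterpret_cast<std::uint8_t*>(&(*directory)[0]),
      incoming.size);
  return true;
}
```

The only copy that remains in such a design is the one that moves the bytes out of the network buffer in the first place.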
http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141 PS24, Line 1141: bloom_filter_directory_ > May need to do some const casting but please consider using: Indeed, because of the change to BloomFilter::Or(), I have to use the following statement to pass to BloomFilter::Or() the pointer to the beginning of the Bloom filter directory. reinterpret_cast(const_cast(bloom_filter_directory_.data())) http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h File be/src/runtime/query-state.h: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202 PS24, Line 202: const PublishFilterParamsPB* > Why not keep it as const PublishFilterParamsPB& ? Thanks for the comment. I have changed the type of 'params' to "const PublishFilterParamsPB&". To be more specific, in DataStreamService::PublishFilter(), I pass a "const PublishFilterParamsPB&" to QueryState::PublishFilter() and have made the corresponding changes to the following functions: 1. FragmentInstanceState::PublishFilter(), 2. RuntimeFilterBank::PublishGlobalFilter(). The reason I used "const PublishFilterParamsPB*" is because in the automatically generated
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 24:

(17 comments)

Sorry for the delays in reviews and thanks for the detailed replies. Please see some more comments below.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h
File be/src/runtime/coordinator-filter-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator-filter-state.h@124
PS24, Line 124: string
Please also document what this field stands for.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1103
PS24, Line 1103: // Workaround for fact that parameters are const& for Protobuf RPCs.
: BloomFilterPB* non_const_filter = &const_cast<BloomFilterPB&>(params->bloom_filter());
I don't think this makes sense anymore with the implementation using protobuf. The reason is that the old Thrift implementation embedded the entire string directory in it, but in this new implementation it's stored in the sidecar, so we aren't saving much, if anything, by doing the swap trick below. We can just initialize the BloomFilterPB by copying if necessary.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1108
PS24, Line 1108: if (!bloom_filter_.has_log_bufferpool_space()) {
Maybe I am missing the details here, but why is it not a no-op if the incoming filter is always false?

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1110
PS24, Line 1110: bloom_filter_.Swap(non_const_filter);
See comments above.

http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1132
PS24, Line 1132: bloom_filter_.Swap(non_const_filter);
See comments above.
http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1140 PS24, Line 1140: sidecar_slice.ToString(), Seems less expensive if we can pass the slice directly. This avoids doing a copy from the slice's buffer into the string argument. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1141 PS24, Line 1141: bloom_filter_directory_ May need to do some const casting but please consider using: bloom_filter_directory_.data() http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h File be/src/runtime/query-state.h: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/query-state.h@202 PS24, Line 202: const PublishFilterParamsPB* Why not keep it as const PublishFilterParamsPB& ? http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174 PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB); > Thank you for pointing this out! You are correct. 'params' can be freed onc Or you can just allocate it on the stack as it isn't too large: UpdateFilterParamsPB params; http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175 PS24, Line 175: UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB); : RpcController* controller = obj_pool_.Add(new RpcController); > Thank you very much for the suggestion! Don't think it's safe to free the memory in the completion callback as the KRPC stack may still have reference to it. One way to share the parameters is to ensure there is one outstanding RPC per controller object but it's probably not worth the trouble so I am fine keep them as-is. 
http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@196 PS24, Line 196: Couldn't send filter to coordinator: " Substitute("Failed to get proxy to coordinator $0: $1", host_address, get_proxy_status.msg().msg()); http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@240 PS24, Line 240: LOG(WARNING) << "Cannot get inbound sidecar: LOG(ERROR) << "Failed to get bloom filter sidecar" << ... http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320 PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters. : { : std::unique_lock l1(num_inflight_rpcs_lock_); : while (num_inflight_rpcs_ > 0) { : krpcs_done_cv_.wait(l1); : } : } : : lock_guard l2(runtime_filter_lock_); : closed_ = true; > That 'propagation stage' above should be 'aggregation stage' instead. Sorry Actually, it's possible for RuntimeFilterBank::UpdateFilterFromLocal() and FragmentInstanceState::Close() to be called from different thread
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 24: (1 comment) http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320 PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters. : { : std::unique_lock l1(num_inflight_rpcs_lock_); : while (num_inflight_rpcs_ > 0) { : krpcs_done_cv_.wait(l1); : } : } : : lock_guard l2(runtime_filter_lock_); : closed_ = true; > Thanks very much for the comment. That 'propagation stage' above should be 'aggregation stage' instead. Sorry for any confusion caused. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 24 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Sun, 29 Sep 2019 18:25:59 + Gerrit-HasComments: Yes
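The wait loop quoted in this message (block until num_inflight_rpcs_ drops to zero, then mark the bank closed) can be sketched in isolation as below. The sketch is only an assumption-laden illustration: InflightTracker and its methods are hypothetical, it uses a single std::mutex where the patch uses separate locks, and it sets the closed flag before waiting, which is one possible answer to the ordering concern raised in review rather than what the patch does.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical tracker for asynchronous calls. It is not Impala's
// RuntimeFilterBank and simplifies the locking to a single mutex.
class InflightTracker {
 public:
  // Registers a new call. Returns false once Close() has started, so that
  // no call can be issued after the owner began shutting down.
  bool TryBegin() {
    std::lock_guard<std::mutex> l(lock_);
    if (closed_) return false;
    ++num_inflight_;
    return true;
  }

  // Called from the completion callback of each call.
  void End() {
    std::lock_guard<std::mutex> l(lock_);
    --num_inflight_;
    if (num_inflight_ == 0) done_cv_.notify_all();
  }

  // Marks the tracker closed first, then waits for outstanding calls to
  // drain. The predicate form of wait() handles spurious wakeups.
  void Close() {
    std::unique_lock<std::mutex> l(lock_);
    closed_ = true;
    done_cv_.wait(l, [this] { return num_inflight_ == 0; });
  }

  int num_inflight() {
    std::lock_guard<std::mutex> l(lock_);
    return num_inflight_;
  }

 private:
  std::mutex lock_;
  std::condition_variable done_cv_;
  int num_inflight_ = 0;
  bool closed_ = false;
};
```

Because the flag and the counter share one lock here, a caller either registers before Close() observes the counter or is rejected afterwards, so the argument does not depend on which threads may call into the tracker.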
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 24: (6 comments) Hi Michael and Thomas, I have addressed some of the comments in the previous iteration. There are still two unresolved comments in runtime-filter-bank.cc. Please let me know your thoughts on my replies. Thanks! http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134 PS24, Line 1134: std::string( : reinterpret_cast(sidecar_slice.data()), sidecar_slice.size()); > Why not std::move(sidecar_slice.ToString()) ? Thanks for pointing this out! I have replaced the original statement with 'std::move(sidecar_slice.ToString())' to make it more elegant. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174 PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB); > Can you please double check if the parameters need to be preserved beyond t Thank you for pointing this out! You are correct. 'params' can be freed once the asynchronous RPC call is done. I have replaced the original statement with 'std::unique_ptr<UpdateFilterParamsPB> params = std::make_unique<UpdateFilterParamsPB>();' and revised the following code accordingly. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175 PS24, Line 175: UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB); : RpcController* controller = obj_pool_.Add(new RpcController); > I wonder if we can keep these in thread local storage and initialize them o Thank you very much for the suggestion! 
After briefly looking at the related sequence of calls, I found that starting from QueryState::ExecFInstance() (which calls FragmentInstanceState::Exec()), we reach PhjBuilder::FlushFinal(). PhjBuilder::FlushFinal() calls PhjBuilder::PublishRuntimeFilters(), which in turn calls RuntimeFilterBank::UpdateFilterFromLocal() for each FilterContext associated with this PhjBuilder (refer to https://github.com/apache/impala/blob/master/be/src/exec/partitioned-hash-join-builder.cc#L488-L507). Since RuntimeFilterBank::UpdateFilterFromLocal() now issues an asynchronous RPC, and the same thread may issue one such RPC per FilterContext before the earlier ones complete, it seems we are not able to reuse the two objects quoted above. Another possible solution is to create these two objects here in UpdateFilterFromLocal() and then release the memory they occupy in RuntimeFilterBank::UpdateFilterCompleteCb(). I may be missing something. Please let me know if I have misunderstood anything. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320 PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters. : { : std::unique_lock l1(num_inflight_rpcs_lock_); : while (num_inflight_rpcs_ > 0) { : krpcs_done_cv_.wait(l1); : } : } : : lock_guard l2(runtime_filter_lock_); : closed_ = true; > Do you need to set closed_ to true before waiting for all in-flight RPCs to Thanks very much for the comment. After reading the related code paths, I think the case where a thread sneaks in and tries to issue an RPC again after we break out of the critical section cannot happen. 
According to my current understanding, in the propagation stage of the runtime filter protocol, starting from FragmentInstanceState::Exec(), the thread executing this function will first call FragmentInstanceState::Open(), which will (1) call RuntimeFilterBank::AllocateScratchBloomFilter()/AllocateScratchMinMaxFilter() to allocate the memory space for the corresponding RuntimeFilter, and (2) call RuntimeFilterBank::UpdateFilterFromLocal() on the RuntimeFilter just allocated. After FragmentInstanceState::Open() returns, the same thread will call FragmentInstanceState::Close(), which in turn leads to RuntimeFilterBank::Close(). When RuntimeFilterBank::Close() is called, the calling thread has already issued all the RPCs it wants to perform (there could still be in-flight RPCs, but no new RPCs will be issued). No thread will attempt to make any RPC resulting from RuntimeFilterBank::UpdateFilterFromLocal() and then increment the 'num_inflight_rpcs_' associated with this instance of RuntimeFilterBank. Hence the case where a thread sneaks in and tries to issue an RPC after leaving the critical section cannot happen. We have also added a
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 24: (6 comments) http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/coordinator.cc@1134 PS24, Line 1134: std::string( : reinterpret_cast(sidecar_slice.data()), sidecar_slice.size()); Why not std::move(sidecar_slice.ToString()) ? http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@174 PS24, Line 174: UpdateFilterParamsPB* params = obj_pool_.Add(new UpdateFilterParamsPB); Can you please double check if the parameters need to be preserved beyond this function ? According to (https://github.com/apache/impala/blob/master/be/src/kudu/rpc/proxy.h#L79-L84), it can be freed once the async RPC call is done. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@175 PS24, Line 175: UpdateFilterResultPB* res = obj_pool_.Add(new UpdateFilterResultPB); : RpcController* controller = obj_pool_.Add(new RpcController); I wonder if we can keep these in thread local storage and initialize them once at init time. I suppose we don't invoke this RPC more than once per fragment instance thread, right ? Otherwise, it seems a bit wasteful to keep accumulating these objects in obj_pool_. http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/runtime/runtime-filter-bank.cc@320 PS24, Line 320: // Wait for all inflight rpcs to complete before closing the filters. : { : std::unique_lock l1(num_inflight_rpcs_lock_); : while (num_inflight_rpcs_ > 0) { : krpcs_done_cv_.wait(l1); : } : } : : lock_guard l2(runtime_filter_lock_); : closed_ = true; Do you need to set closed_ to true before waiting for all in-flight RPCs to drain ? 
Otherwise, you may break out of the critical section above and then a thread may sneak in and try to issue an RPC again. Or is it impossible because it's the same fragment instance thread which called RuntimeFilterBank::UpdateFilterFromLocal() ? http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc: http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@137 PS24, Line 137: Substitute("Query State not found for query_id=$0", : PrintId(ProtoToQueryId(req->dst_query_id())) This can be refactored into a single string object and reused here and below in RespondAndReleaseRpc(). http://gerrit.cloudera.org:8080/#/c/13882/24/be/src/service/data-stream-service.cc@141 PS24, Line 141: this-> no need for this -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 24 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 25 Sep 2019 16:58:23 + Gerrit-HasComments: Yes
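The lifetime questions in the review above (request parameters versus the response and controller) can be illustrated with a small sketch. It assumes, as the discussion suggests, that the request is only needed while the call is being issued, whereas the response and controller must stay alive until the completion callback runs. All types here (FakeParams, FakeResult, FakeController, FakeProxy, RpcContext) are hypothetical stand-ins, not the KRPC or Impala classes, and the "asynchronous" completion is simulated by a queue that is run later.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the request, response and controller objects.
struct FakeParams { int filter_id = 0; };
struct FakeResult { bool ok = false; };
struct FakeController { bool finished = false; };

// Per-call state that must outlive the issuing function.
struct RpcContext {
  FakeResult res;
  FakeController controller;
};

// Toy proxy: reads the request immediately, completes the call later.
class FakeProxy {
 public:
  void UpdateFilterAsync(const FakeParams& params, FakeResult* res,
      FakeController* controller, std::function<void()> done_cb) {
    res->ok = params.filter_id >= 0;  // the request is consumed here
    pending_.push_back([controller, cb = std::move(done_cb)] {
      controller->finished = true;
      cb();
    });
  }

  // Simulates the RPC layer invoking the completion callbacks.
  void RunCompletions() {
    for (auto& cb : pending_) cb();
    pending_.clear();
  }

 private:
  std::vector<std::function<void()>> pending_;
};

// Issues several overlapping calls; returns how many completed successfully.
int OwnershipDemo() {
  FakeProxy proxy;
  int completed = 0;
  for (int i = 0; i < 3; ++i) {
    FakeParams params;  // may go out of scope once the call is issued
    params.filter_id = i;
    auto ctx = std::make_shared<RpcContext>();  // one context per call
    proxy.UpdateFilterAsync(params, &ctx->res, &ctx->controller,
        [ctx, &completed] {
          // The callback holds the last reference; the context is freed
          // when the callback itself is destroyed.
          if (ctx->res.ok && ctx->controller.finished) ++completed;
        });
  }
  proxy.RunCompletions();
  return completed;
}
```

Because each call gets its own context, several calls can be outstanding at once without sharing a response or controller, and nothing accumulates in a long-lived pool.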
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 24: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4633/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 24 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Tue, 24 Sep 2019 23:03:46 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#24). ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. 
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,078 insertions(+), 745 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/24 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 24 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 22: (1 comment) http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h File be/src/util/bloom-filter.h: http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h@46 PS22, Line 46: namespace bloom_filter_test_util { : void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y, : int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf, : std::string* directory); : } // namespace bloom_filter_test_util : : namespace either { : struct TestData; : } // namespace either > Can these be moved to the test file ? I guess not. Sorry for missing the friend declaration below. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 22 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Tue, 24 Sep 2019 17:52:24 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 22: (10 comments) http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@565 PS22, Line 565: LOG(WARNING) LOG(ERROR) http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@572 PS22, Line 572: LOG(WARNING) LOG(ERROR) http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/coordinator-backend-state.cc@575 PS22, Line 575: LOG(WARNING) LOG(ERROR) http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@120 PS22, Line 120: INFO ERROR http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@126 PS22, Line 126: std::unique_lock l(num_inflight_rpcs_lock_); DCHECK_GT(num_inflight_rpcs_, 0); http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@127 PS22, Line 127: num_inflight_rpcs_-- nit: pre-increment/decrement: --num_inflight_rpcs_; http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@201 PS22, Line 201: std::unique_lock l(num_inflight_rpcs_lock_); Please add a comment on why this is necessary: Increment num_inflight_rpcs_ to make sure that the filter will not be deallocated in Close() until all in-flight RPCs complete. 
http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@202 PS22, Line 202: num_inflight_rpcs_++; ++num_inflight_rpcs_; http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/runtime/runtime-filter-bank.cc@321 PS22, Line 321: std::chrono::milliseconds(50)) In theory, we don't need a timeout here as we need to wait till the RPC is done. That said, it may not hurt to log a statement after waiting for an extended period of time. 50 ms may be a bit iffy if the network is slow or if there is a lot of outbound traffic. Maybe 60s? One potential improvement is that we may as well cancel the RPC after waiting for too long. http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h File be/src/util/bloom-filter.h: http://gerrit.cloudera.org:8080/#/c/13882/22/be/src/util/bloom-filter.h@46 PS22, Line 46: namespace bloom_filter_test_util { : void BfUnion(const impala::BloomFilter& x, const impala::BloomFilter& y, : int64_t directory_size, bool* success, impala::BloomFilterPB* protobuf, : std::string* directory); : } // namespace bloom_filter_test_util : : namespace either { : struct TestData; : } // namespace either Can these be moved to the test file ? -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 22 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Mon, 23 Sep 2019 20:52:45 + Gerrit-HasComments: Yes
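The suggestion above about the wait timeout (use the timeout only to log, keep waiting, and possibly give up and cancel after too long) can be sketched as follows. This is only an illustration under assumptions: WaitForInflightRpcs(), DrainResult and the 'max_warnings' cut-off are hypothetical, std::mutex and std::cerr stand in for Impala's lock and logging facilities, and no RPC is actually cancelled.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

struct DrainResult {
  bool drained;  // true if the in-flight count reached zero
  int warnings;  // number of log intervals that elapsed while waiting
};

// Waits until 'num_inflight' reaches zero. Each time 'log_interval' elapses
// without that happening, a warning is logged. After 'max_warnings' intervals
// the function gives up so that the caller can decide to cancel.
DrainResult WaitForInflightRpcs(std::mutex& lock, std::condition_variable& cv,
    const int& num_inflight, std::chrono::milliseconds log_interval,
    int max_warnings) {
  std::unique_lock<std::mutex> l(lock);
  int warnings = 0;
  // wait_for() with a predicate returns the predicate's value, so spurious
  // wakeups do not count as timeouts.
  while (!cv.wait_for(l, log_interval, [&] { return num_inflight == 0; })) {
    ++warnings;
    std::cerr << "Still waiting for " << num_inflight
              << " in-flight RPC(s) to complete" << std::endl;
    if (warnings >= max_warnings) return {false, warnings};
  }
  return {true, warnings};
}
```

With a long interval such as the 60s mentioned above, the timeout no longer affects correctness on a slow network; it only controls how often the wait is reported.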
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 22: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4593/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 22 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 18 Sep 2019 22:27:13 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#22). ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. 
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,072 insertions(+), 745 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/22 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 22 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 20: Code-Review+1 (4 comments) I'll reach out to Michael and see if he wants to do a pass, otherwise I think that this can be +2ed once you address my last few comments. Thanks again for all the work on this! http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc@1113 PS20, Line 1113: DCHECK(!params->bloom_filter().always_false()); not really necessary http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/coordinator.cc@1126 PS20, Line 1126: if I think you'll want to make this an 'else if' with the 'if' above http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/20/be/src/runtime/runtime-filter-bank.cc@244 PS20, Line 244: Status status = bloom_filter->Init( There's a bug here - if we fail to get the sidecar above then bloom_filter will be nullptr, so we won't want to call Init() on it http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h File be/src/util/min-max-filter.h: http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/util/min-max-filter.h@22 PS19, Line 22: #include "impala-ir/impala-ir-functions.h" > Thanks for catching this! I found that we do not need 'data_stream_service' This is getting rather nit-picky of me, but just for your understanding: the goal isn't to have the smallest number of includes that will allow things to compile, the goal is to include exactly those things that are used in the file. Clearly, there are things from data_stream_service that are used in this file now, eg. MinMaxFilterPB. 
Presumably you don't need to include it for things to compile because it's already included in one of the other headers that's included here. To be clear, I'm not saying that you need to go back through this whole patch and make sure your includes are exactly correct. If it compiles as is then that's fine. The most important thing is to not include things that aren't used, since that can lead to unnecessary re-compilation in some cases. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 20 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 18 Sep 2019 19:22:02 + Gerrit-HasComments: Yes
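The bug flagged above at runtime-filter-bank.cc@244 (calling Init() on a filter that was never created because fetching the sidecar failed) comes down to returning early on the failed status. A minimal sketch of that control flow, using hypothetical stand-ins (FakeStatus, FakeBloomFilter, GetSidecar(), BuildFilter()) rather than the real Impala or KRPC types:

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-ins; not the real Status or BloomFilter classes.
struct FakeStatus {
  bool ok;
  std::string msg;
};

struct FakeBloomFilter {
  bool initialized = false;
  FakeStatus Init(const std::string& directory) {
    initialized = !directory.empty();
    return {initialized, initialized ? "" : "empty directory"};
  }
};

// Pretend sidecar lookup: a negative index simulates a missing sidecar.
FakeStatus GetSidecar(int idx, std::string* out) {
  if (idx < 0) return {false, "Cannot get inbound sidecar"};
  *out = "directory-bytes";
  return {true, ""};
}

// Returns nullptr when the sidecar cannot be fetched or Init() fails, so
// Init() is never reached without a valid directory.
std::unique_ptr<FakeBloomFilter> BuildFilter(int sidecar_idx) {
  std::string directory;
  FakeStatus status = GetSidecar(sidecar_idx, &directory);
  if (!status.ok) return nullptr;  // bail out before touching the filter
  auto bloom_filter = std::make_unique<FakeBloomFilter>();
  status = bloom_filter->Init(directory);
  if (!status.ok) return nullptr;
  return bloom_filter;
}
```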
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 20: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4586/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 20 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Wed, 18 Sep 2019 16:49:53 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#20). ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. IMPALA-7984: Port runtime filter from Thrift RPC to KRPC Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. 
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 39 files changed, 1,066 insertions(+), 738 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/20 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 20 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao 
Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 19: (26 comments) http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/rpc/thrift-server-test.cc File be/src/rpc/thrift-server-test.cc: http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/rpc/thrift-server-test.cc@22 PS19, Line 22: #include "gen-cpp/ImpalaInternalService.h" Why is this necessary? http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@572 PS19, Line 572: rpc_status.message().ToString() I think you can just call ToString() directly on the kudu::Status without calling .message() http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@573 PS19, Line 573: controller.Reset(); Not needed, since we don't reuse the controller http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator-backend-state.cc@576 PS19, Line 576: res.status().error_msgs(0) We'll want to print the status code and other error messages. Easiest way is to just construct a regular Status object from the StatusPB, i.e. 
replace this with something like "Status(res.status()).GetDetail()" That's of course less efficient, but this shouldn't happen very often http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.h File be/src/runtime/coordinator.h: http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.h@44 PS19, Line 44: class RpcController; not used http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1070 PS19, Line 1070: if (rpc_params.has_bloom_filter()) { : if (!rpc_params.bloom_filter().always_false() : && !rpc_params.bloom_filter().always_true()) { combine these with '&&' http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1107 PS19, Line 1107: } else if (!params->bloom_filter().always_false()) { probably more straightforward to flip the conditions, i.e. put the always_false() block here and put this block in the 'else' http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1118 PS19, Line 1118: return; I don't think we want to just return here - we still need to hit the "if (pending_count_ == 0..." below http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1122 PS19, Line 1122: // There are two cases in which 'bloom_filter_.always_false()' : // evaluates to true: (1) 'bloom_filter_' has never been initialized : // by any incoming Bloom filter previously, (2) 'bloom_filter_' has : // been initialized to an always false Bloom filter by an incoming : // Bloom filter previously. We note that : // 'bloom_filter_.has_log_bufferpool_space()' evaluates to false when : // 'bloom_filter_' has never been initialized by any incoming Bloom : // filter and to true otherwise. : // In either case, we will always perform the swap operation. 
I think this whole comment is unnecessary http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1141 PS19, Line 1141: // We want to move the payload from the request rather than copy it and : // take double the memory cost. this can be removed http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1157 PS19, Line 1157: non_const_filter = _cast(params->bloom_filter()); might as well put this in the 'if' for clarity http://gerrit.cloudera.org:8080/#/c/13882/19/be/src/runtime/coordinator.cc@1158 PS19, Line 1158: // Even if the current incoming Bloom filter is an always false filter, : // we still need to perform the swap operation to properly initialize : // the field of 'log_bufferpool_space_' if 'bloom_filter_' has not been : // initialized by any incoming Bloom filter yet. Otherwise we will hit a : // DCHECK error in 'buffer-allocator.cc' that requires that the buffer : // length of the directory associated with the Bloom filter to be : // published is greater than some lower bound when attempting to call : // BloomFilter::Init() in RuntimeFilterBank::PublishGlobalFilter(). Unnecessarily long comment. Let's just put a comment inside the 'if' that says something like 'bloom_filter_ hasn't been initialized yet, so do so
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 19:

(1 comment)

There is an additional note added to the comments on Line 1117. Thanks!

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1115
PS18, Line 1115: if (!status.ok()) {
: LOG(ERROR) << "Cannot get inbound sidecar: " << status.message().ToString();
: Disable(coord->filter_mem_tracker_);
> Thanks very much for the detailed explanation!
I think the observation above actually indicates that even though the Bloom filter fed to RuntimeFilterBank::PublishGlobalFilter() corresponds to an always false filter, our current implementation will still call BloomFilter::Init() to allocate the memory space, which seems like a redundant and wasteful operation because the allocated Bloom filter is basically a bit array of all 0's. Maybe we could also simplify the current logic in a follow-up JIRA.

-- 
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 19
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Thu, 12 Sep 2019 22:33:52 +
Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 19: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4547/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 19 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Thu, 12 Sep 2019 00:35:40 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#19). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter.

Testing: This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/rpc/thrift-server-test.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 41 files changed, 1,104 insertions(+), 736 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/19 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 
13882 Gerrit-PatchSet: 19 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 18:

(31 comments)

Thanks so much for all of your work on this! It's getting pretty close, most of these comments are cosmetic.

One thing: instead of me going through and marking any formatting issues, please run clang-format and take its suggestions. Let me know if you need help with this.

Keep in mind that clang-format is just a suggestion and you should:
- keep formatting consistent with the rest of the surrounding file or with things that you know are Impala's guidelines, even if clang-format disagrees
- avoid changing lines that otherwise wouldn't be affected by your patch just to fix the formatting, e.g. you don't need to re-arrange includes that were already present even if clang-format says to

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.h
File be/src/runtime/coordinator-backend-state.h:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.h@42
PS18, Line 42: namespace kudu {
: namespace rpc {
: class RpcController;
: } // namespace rpc
: } // namespace kudu
Is this needed? rpc_controller.h is being imported above already

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc
File be/src/runtime/coordinator-backend-state.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@551
PS18, Line 551: DCHECK
DCHECK_EQ?

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@570
PS18, Line 570: kudu::Status rpc_status;
: rpc_status = proxy->PublishFilter(rpc_params, , );
This can all be one line

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator-backend-state.cc@572
PS18, Line 572: if (!rpc_status.ok()) {
In addition to rpc_status, which just reflects whether the rpc actually got to the other machine and was responded to, there's also PublishFilterResultPB::status, which reflects whether the operation itself was successful, so we should add another 'if' that checks that and logs if it's an error.

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@994
PS18, Line 994: FilterState* state;
I don't think there's any reason to move this here

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1013
PS18, Line 1013: // Check if the filter has already been sent, which could happen in four cases:
I think we need another case in this comment for 'failed to get bloom filter sidecar'

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1070
PS18, Line 1070: bs->PublishFilter(rpc_params, controller);
Could you add a TODO: make this asynchronous

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1083
PS18, Line 1083: void Coordinator::SetupSidecarForBloomFilter(PublishFilterParamsPB* rpc_params,
This function is unnecessary, you can just call AddSidecar directly

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1089
PS18, Line 1089: (
extra parentheses

http://gerrit.cloudera.org:8080/#/c/13882/18/be/src/runtime/coordinator.cc@1115
PS18, Line 1115: // If the incoming Bloom filter is neither an always true filter nor an
: // always false filter, then it must be the case that a non-empty sidecar slice
: // has been received. Refer to BloomFilter::ToProtobuf() for further details.
Comment is outdated.

Thinking through the cases here, I think that this all works correctly, but it's a little weird. In particular, in the case where params->bloom_filter() is always_false:
- if bloom_filter_ is also always_false, then we'll TryConsume(0), do a swap() that doesn't actually change the value of bloom_filter_ since it's still always_false, and then re-instantiate bloom_filter_directory_ to an empty string
- if bloom_filter_ is not always_false, then we'll call Or(), which will just return immediately without doing anything

This is how it worked before, so it would be fine to leave it, but I think that we can make this clearer by changing the 'else' on line 1109 to be 'else if (!params->bloom_filter().always_false())' and skip all the extra work in that case. Then we can make the 'if' here a DCHECK(params->bloom_filter().has_directory_sidecar_idx()), and people won't have to reason through what the code below would do in the case that sidecar_slice doesn't get filled in.

You'll of course also want to add appropriate comments (and you should think it through and make sure my reasoning is
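The two-level check asked for in the comment on Line 572 (first the transport status of the RPC, then the status carried inside the response) might look roughly like the sketch below. The types here are hypothetical stand-ins chosen so the example compiles on its own: `TransportStatus` plays the role of the kudu::Status returned by the proxy call, `PublishFilterResult` plays the role of PublishFilterResultPB with its status field, and `CheckPublishFilter` is an invented helper name, not something in the patch.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the status returned by the RPC layer. It only
// says whether the request reached the remote host and was answered.
struct TransportStatus {
  bool ok;
  std::string message;
};

// Hypothetical stand-in for the response message. Its status says whether
// the remote operation itself succeeded.
struct PublishFilterResult {
  bool op_ok;
  std::string op_error;
};

enum class PublishOutcome { kOk, kRpcFailed, kOperationFailed };

// Checks both levels in order and fills 'log_line' with what a caller would
// log. The response is only meaningful if the transport succeeded.
PublishOutcome CheckPublishFilter(const TransportStatus& rpc_status,
                                  const PublishFilterResult& res,
                                  std::string* log_line) {
  assert(log_line != nullptr);
  if (!rpc_status.ok) {
    *log_line = "PublishFilter() rpc failed: " + rpc_status.message;
    return PublishOutcome::kRpcFailed;
  }
  if (!res.op_ok) {
    *log_line = "PublishFilter() operation failed: " + res.op_error;
    return PublishOutcome::kOperationFailed;
  }
  log_line->clear();
  return PublishOutcome::kOk;
}
```

Keeping the two failure kinds distinct in the log makes it easier to tell a network problem from a rejected filter on the receiving side.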
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 18: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4498/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 18 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Mon, 09 Sep 2019 15:51:29 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 18:

(16 comments)

Hi Thomas and Michael, I have addressed all the previous comments. Please review my revised patch. Thank you very much!

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h@48
PS17, Line 48: namespace impala {
> don't use 'using' in header files
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@986
PS17, Line 986: : : exe
> You can leave the formatting as-is
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1041
PS17, Line 1041:
> It's unfortunate that this logic is duplicated between here and BloomFilter:
Thanks very much for the detailed explanation! As we have discussed, we will adopt the first option since it requires far fewer modifications to the current code. The second option would require changes not only to 'BloomFilter::Or()' and to the places that use the 'BloomFilterPB' and its directory stored in 'Coordinator::FilterState', but also to 'bloom-filter-test.cc' and 'bloom-filter-benchmark.cc', which might take much more time to implement.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1089
PS17, Line 1089: reinterpret_cast(&((bloom_filter_directory)[0])),
: static_cast(bloom_filter
> Might be cleaner to check has_directory_sidecar_idx() here, which should be
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1096
PS17, Line 1096: Coordinator* coord, RpcContext* context) {
> You need to handle this error, i.e. call Disable()
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@52
PS17, Line 52: /// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
> I don't think this is used anywhere
Thanks for pointing this out! I have removed it.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@142
PS17, Line 142:
> I think this can be private.
Thanks for pointing this out. I have modified the name and made it private.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@154
PS17, Line 154: boost::uno
> num_inflight_rpcs_
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@133
PS17, Line 133: DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
> While thinking about the locking protocol in Close(), I noticed that we don
Thanks very much for the detailed explanation! I have double-checked what you have described above and I think your argument is correct. I have also added an additional 'DCHECK(!closed_);' with an additional brief comment as suggested.

On the other hand, I also found that RuntimeState::ReleaseResources() will also be called by Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow() in fe-support.cc. According to its comments, this function does the work for the frontend of rewriting a given batch of const expressions. So I guess in this case we do not have to worry about RuntimeState::ReleaseResources() being called before RuntimeFilterBank::UpdateFilterFromLocal(), since RuntimeFilterBank::UpdateFilterFromLocal() will not be called in such queries.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@187
PS17, Line 187: // Use 'proxy' to send the filter to the coordinator.
: std::unique_ptr proxy;
: Status get_proxy_status =
:
> This needs to be moved down after the check for get_proxy_status.ok()
Thanks for pointing this out! I have moved this down after the check for get_proxy_status.ok().

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@199
PS17, Line 199: // Use 'num_inflight_rpcs_' to keep track of the number of current in-flight
> line too long (91 > 90)
Done

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@238
PS17, Line 238: kudu::Status status =
> I don't think that we can just return here, we need to actually handle the
Thanks for pointing this out. I have set 'bloom_filter' to
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#18). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter.

Testing: This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/rpc/thrift-server-test.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-filter.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 41 files changed, 1,137 insertions(+), 738 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/18 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 
13882 Gerrit-PatchSet: 18 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 17:

(15 comments)

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h
File be/src/runtime/coordinator.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.h@48
PS17, Line 48: using kudu::rpc::RpcContext;
don't use 'using' in header files

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@986
PS17, Line 986: {
: return;
: }
You can leave the formatting as-is

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1041
PS17, Line 1041: void Coordinator::SetupSidecarForBloomFilter(PublishFilterParamsPB* rpc_params,
It's unfortunate that this logic is duplicated between here and BloomFilter::ToProtobuf, e.g. because duplicating code makes it more likely there are bugs (for example I think that you have a bug here in the case where AddOutboundSidecar fails, since you don't disable the bloom filter, even though that bug is already fixed in BloomFilter::ToProtobuf)

There are a couple of possible solutions to this:
1. Move this to be a static function in BloomFilter, e.g. BloomFilter::AddSidecar(), and call it from BloomFilter::ToProtobuf()
2. Instead of having Coordinator::FilterState store a BloomFilterPB and 'string directory' just actually reconstruct a BloomFilter object, i.e. around line 1110 below, by calling BloomFilter::Init() with sidecar_slice.data(). Then we would be able to just call the existing BloomFilter::ToProtobuf()
3. some other idea I didn't think of

Option 2 seems cleaner overall to me, since it allows us to better hide the details of how bloom filters work in the BloomFilter class instead of leaking things like the existence of the 'directory' string into FilterState. However, it's slightly less efficient (because we add a transition from BloomFilterPB -> Bloom Filter object and back, with the required setup, e.g. in BloomFilter::Init()), though not a ton less efficient since I think it can be done without adding any copies of the directory, and it probably also is more work for you in terms of code changes (e.g. BloomFilter::Or would have to be slightly rewritten to take BloomFilter objects instead). And, option 1 is consistent with the way things already work. So, I'll leave it up to you to decide.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1089
PS17, Line 1089: !params->bloom_filter().always_true() &&
: !params->bloom_filter().always_false())
Might be cleaner to check has_directory_sidecar_idx() here, which should be equivalent

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/coordinator.cc@1096
PS17, Line 1096: return;
You need to handle this error, i.e. call Disable()

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h
File be/src/runtime/runtime-filter-bank.h:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@52
PS17, Line 52: class UniqueIdPB;
I don't think this is used anywhere

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@142
PS17, Line 142: void SendFilterToCoordinatorCompleteCb(const kudu::rpc::RpcController* rpc_controller);
I think this can be private. Let's also rename it UpdateFilterCompleteCb, to keep it consistent with the name of the rpc.

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.h@154
PS17, Line 154: num_krpcs_
num_inflight_rpcs_

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@133
PS17, Line 133: DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
While thinking about the locking protocol in Close(), I noticed that we don't check 'closed_' here like we do at the start of PublishGlobalFilter()

I spent a little time looking at the code, and I think that it's correct as is, because RuntimeFilterBank::Close() only gets called from RuntimeState::ReleaseResources(), which is only called from FragmentInstanceState::Close(), which can't be called until after Open() is run on exec_tree_ and UpdateFilterFromLocal() is only called during Open() (via PartitionedHashJoinNode::Open() -> BlockingJoinNode::ProcessBuildInputAndOpenProbe(), which calls on a thread and then blocks until completion for BlockingJoinNode::ProcessBuildInputAsync() -> BlockingJoinNode::SendBuildInputToSink() -> PhjBuilder::FlushFinal())

It might be nice if you could confirm my
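The locking protocol around Close() that this comment reasons about (count in-flight RPCs, have the completion callback decrement the count, and make Close() wait until the count reaches zero before filters are freed) can be illustrated with the sketch below. It is an assumption-laden illustration rather than the patch itself: the commit message mentions a SpinLock, whereas this sketch swaps in std::mutex and std::condition_variable from the standard library, and `FilterBankSketch` and `BeginRpc()` are hypothetical names. Only `num_inflight_rpcs_`, `closed_`, `UpdateFilterCompleteCb` and `Close()` echo names from the review.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified stand-in for the part of RuntimeFilterBank that
// tracks outstanding UpdateFilter() RPCs.
class FilterBankSketch {
 public:
  // Called before an asynchronous RPC is issued. Returns false if the bank
  // has already been closed, in which case no RPC may be started.
  bool BeginRpc() {
    std::lock_guard<std::mutex> l(lock_);
    if (closed_) return false;
    ++num_inflight_rpcs_;
    return true;
  }

  // Completion callback for the RPC. Wakes up Close() when the last
  // outstanding RPC has finished.
  void UpdateFilterCompleteCb() {
    std::lock_guard<std::mutex> l(lock_);
    assert(num_inflight_rpcs_ > 0);
    if (--num_inflight_rpcs_ == 0) done_cv_.notify_all();
  }

  // Blocks until no RPC is in flight. Only after this returns is it safe to
  // free the memory that an outbound sidecar may still point to.
  void Close() {
    std::unique_lock<std::mutex> l(lock_);
    closed_ = true;
    done_cv_.wait(l, [this] { return num_inflight_rpcs_ == 0; });
  }

  int num_inflight_rpcs() {
    std::lock_guard<std::mutex> l(lock_);
    return num_inflight_rpcs_;
  }

 private:
  std::mutex lock_;
  std::condition_variable done_cv_;
  int num_inflight_rpcs_ = 0;
  bool closed_ = false;
};
```

Setting `closed_` under the same lock that guards the counter is what rules out the race in which a new RPC starts after Close() has decided that nothing is in flight.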
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 17: Build Successful https://jenkins.impala.io/job/gerrit-code-review-checks/4373/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests. -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 17 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Mon, 26 Aug 2019 23:44:52 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 17: Hi Thomas and Michael, I have addressed all of your previous comments. Please review this revised patch. Thanks! -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 17 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Mon, 26 Aug 2019 23:04:02 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 ) Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC .. Patch Set 17: (1 comment) http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc File be/src/runtime/runtime-filter-bank.cc: http://gerrit.cloudera.org:8080/#/c/13882/17/be/src/runtime/runtime-filter-bank.cc@199 PS17, Line 199: LOG(INFO) << "Couldn't send filter to coordinator: " << get_proxy_status.msg().msg(); line too long (91 > 90) -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 17 Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall Gerrit-Comment-Date: Mon, 26 Aug 2019 23:03:40 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#17). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter.

Testing: This patch has passed the exhaustive tests.
Change-Id: I6b394796d250286510e157ae326882bfc01d387a --- M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/rpc/thrift-server-test.cc M be/src/runtime/backend-client.h M be/src/runtime/client-cache.cc M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-test.cc M be/src/runtime/decimal-value.h M be/src/runtime/decimal-value.inline.h M be/src/runtime/exec-env.cc M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/timestamp-value.h M be/src/scheduling/request-pool-service.h M be/src/service/client-request-state.cc M be/src/service/client-request-state.h M be/src/service/data-stream-service.cc M be/src/service/data-stream-service.h M be/src/service/frontend.h M be/src/service/impala-internal-service.cc M be/src/service/impala-internal-service.h M be/src/service/impala-server.cc M be/src/service/impala-server.h M be/src/util/bloom-filter-test.cc M be/src/util/bloom-filter.cc M be/src/util/bloom-filter.h M be/src/util/min-max-filter-test.cc M be/src/util/min-max-filter.cc M be/src/util/min-max-filter.h M common/protobuf/common.proto M common/protobuf/data_stream_service.proto M common/thrift/ImpalaInternalService.thrift 40 files changed, 1,155 insertions(+), 733 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/17 -- To view, visit http://gerrit.cloudera.org:8080/13882 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a Gerrit-Change-Number: 13882 Gerrit-PatchSet: 17 
Gerrit-Owner: Fang-Yu Rao Gerrit-Reviewer: Fang-Yu Rao Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Thomas Tauber-Marshall
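
The commit message above says a lock was added so that a BloomFilter is not deallocated before its associated KRPC call finishes. A minimal sketch of that keep-alive idea follows. It is not the code from this patch: the class name InflightGuard and all of its methods are hypothetical, and std::mutex with std::condition_variable is swapped in for Impala's SpinLock so the example builds with the standard library alone.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not from the patch): tracks outbound RPCs that still
// reference a buffer without owning a copy of it.
class InflightGuard {
 public:
  // Call just before an asynchronous RPC that references the buffer is issued.
  void RpcStarted() {
    std::lock_guard<std::mutex> l(lock_);
    ++num_inflight_;
  }

  // Call from the RPC completion callback.
  void RpcFinished() {
    {
      std::lock_guard<std::mutex> l(lock_);
      assert(num_inflight_ > 0);
      --num_inflight_;
    }
    done_cv_.notify_all();
  }

  // Returns true if no RPC currently references the buffer.
  bool SafeToFree() {
    std::lock_guard<std::mutex> l(lock_);
    return num_inflight_ == 0;
  }

  // Blocks until every RPC that referenced the buffer has completed.
  // The owner would call this before releasing the buffer.
  void WaitForRpcs() {
    std::unique_lock<std::mutex> l(lock_);
    done_cv_.wait(l, [this] { return num_inflight_ == 0; });
  }

 private:
  std::mutex lock_;
  std::condition_variable done_cv_;
  int num_inflight_ = 0;
};
```

In this sketch the owner of the filter would call WaitForRpcs() before freeing the directory memory, which is the ordering the commit message describes.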
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 15:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/4341/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 15
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Fri, 23 Aug 2019 04:19:53 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#15). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size.

This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respctively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly.

TODO:
1. To add the logic that keeps the BloomFilter passed into UpdateFilterFromLocal() alive before its corresponding KRPC call finishes.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 1,108 insertions(+), 723 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/15
--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 15
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Thomas Tauber-Marshall has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 14:

(9 comments)

http://gerrit.cloudera.org:8080/#/c/13882/14//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/13882/14//COMMIT_MSG@14
PS14, Line 14: respctively
typo

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1089
PS14, Line 1089: if (params->bloom_filter().has_directory_sidecar_idx()) {
Is it actually possible that this won't be set or can we DCHECK this? (I think it's possible with the way you have the code structured right now, but we might want to change that, see my other comments)

If it is possible, do we have the right behavior here? I guess what will happen is we'll proceed to update the bloom filter with an empty directory, which could for example hit a DCHECK in BloomFilter::Or which expects the two directories to be the same size.

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1099
PS14, Line 1099: .ToString()
This ToString() performs a copy of the data. Just call 'size()' on the slice directly.

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/coordinator.cc@1110
PS14, Line 1110: bloom_filter_directory_ = sidecar_slice.ToString();
I think this will perform two copies - one to do ToString() and one to do the assignment. I think we should be able to do this without any copies by using 'std::swap'.

You may need to make bloom_filter_directory_ a char* to make it work.

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc
File be/src/runtime/runtime-filter-bank.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc@a103
PS14, Line 103:
keep this comment

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/runtime/runtime-filter-bank.cc@208
PS14, Line 208: if (params->has_bloom_filter())
I think this is already guaranteed by the DCHECK above

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h
File be/src/util/bloom-filter.h:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h@46
PS14, Line 46: using kudu::Slice;
please don't use 'using' in header files

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.h@90
PS14, Line 90: Bucket* GetDirectory() const { return directory_; }
Hmm, definitely preferable to keep all of this stuff private. I think the only reason you exposed this is for bloom-filter-test.cc/bloom-filter-benchmark.cc? If so, you should be able to just make the testing classes friends of this class.

For bloom-filter-test.cc that should be easy, just 'friend class BloomFilterTest'. It's a little more complicated for bloom-filter-benchmark.cc since there are multiple classes there, but maybe you can add a single super class for them? Or else just list all three with a comment about what they are.

If you run into issues with that, another option is to have ToProtobuf take a 'Slice*' or whatever that it sets to the directory instead of taking the RpcController.

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.cc
File be/src/util/bloom-filter.cc:

http://gerrit.cloudera.org:8080/#/c/13882/14/be/src/util/bloom-filter.cc@99
PS14, Line 99: LOG(ERROR) << "Cannot add outbound sidecar: " << sidecar_status.message().ToString();
What should we do in this case? Right now, callers of this function won't know that the error happened, so they'll go ahead and send the BloomFilterPB even though it's not really valid, since it doesn't have a directory or always_true/false set.

How about in this case we 'disable' the bloom filter, i.e. set always_true = true. Then over in the coordinator we'll know that if always_false = always_true = false it must be the case that sidecar_idx is set.

--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Tue, 13 Aug 2019 18:05:51 +
Gerrit-HasComments: Yes
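
The last comment in the review above proposes disabling the Bloom filter when the sidecar cannot be attached, so that the coordinator can rely on the sidecar index being set whenever always_true and always_false are both false. A minimal sketch of that invariant is below. BloomFilterSketchPB, FillBloomFilterPB and IsWellFormed are hypothetical names used only for illustration; the struct is a hand-written stand-in, not the generated protobuf class from this change.

```cpp
#include <cassert>

// Hypothetical stand-in for the protobuf message. Field names follow the
// review comment; the real generated class is not reproduced here.
struct BloomFilterSketchPB {
  bool always_true = false;
  bool always_false = false;
  bool has_directory_sidecar_idx = false;
  int directory_sidecar_idx = 0;
};

// Sender side (assumed shape). 'sidecar_ok' is false when attaching the
// directory as an outbound sidecar failed.
inline void FillBloomFilterPB(bool sidecar_ok, int sidecar_idx,
                              BloomFilterSketchPB* pb) {
  if (!sidecar_ok) {
    // Disable the filter instead of sending one without a directory. An
    // always-true filter rejects nothing, so query results stay correct.
    pb->always_true = true;
    pb->has_directory_sidecar_idx = false;
    return;
  }
  pb->has_directory_sidecar_idx = true;
  pb->directory_sidecar_idx = sidecar_idx;
}

// Receiver side: the invariant the coordinator could DCHECK. If neither
// always_true nor always_false is set, a sidecar index must be present.
inline bool IsWellFormed(const BloomFilterSketchPB& pb) {
  if (pb.always_true || pb.always_false) return true;
  return pb.has_directory_sidecar_idx;
}
```

Under this scheme a message with all three fields unset can only come from a bug on the sender, which is what makes a DCHECK on the receiving side reasonable.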
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

Patch Set 14:

Build Successful

https://jenkins.impala.io/job/gerrit-code-review-checks/4218/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.

--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
Gerrit-Comment-Date: Mon, 12 Aug 2019 15:39:49 +
Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
Fang-Yu Rao has uploaded a new patch set (#14). ( http://gerrit.cloudera.org:8080/13882 )

Change subject: IMPALA-7984: Port runtime filter from Thrift RPC to KRPC
..

IMPALA-7984: Port runtime filter from Thrift RPC to KRPC

Previously the aggregation and propagation of a runtime filter in Impala is implemented using Thrift RPC, which suffers from a disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size.

This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respctively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster.

In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead.

Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly.

TODO:
To remove unnecessary code related to ImpalaInternalService.

Testing:
This patch has passed the exhaustive tests.

Change-Id: I6b394796d250286510e157ae326882bfc01d387a
---
M be/src/benchmarks/bloom-filter-benchmark.cc
M be/src/rpc/thrift-server-test.cc
M be/src/runtime/backend-client.h
M be/src/runtime/client-cache.cc
M be/src/runtime/coordinator-backend-state.cc
M be/src/runtime/coordinator-backend-state.h
M be/src/runtime/coordinator-filter-state.h
M be/src/runtime/coordinator.cc
M be/src/runtime/coordinator.h
M be/src/runtime/data-stream-test.cc
M be/src/runtime/decimal-value.h
M be/src/runtime/decimal-value.inline.h
M be/src/runtime/exec-env.cc
M be/src/runtime/fragment-instance-state.cc
M be/src/runtime/fragment-instance-state.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/query-state.cc
M be/src/runtime/query-state.h
M be/src/runtime/runtime-filter-bank.cc
M be/src/runtime/runtime-filter-bank.h
M be/src/runtime/timestamp-value.h
M be/src/scheduling/request-pool-service.h
M be/src/service/client-request-state.cc
M be/src/service/client-request-state.h
M be/src/service/data-stream-service.cc
M be/src/service/data-stream-service.h
M be/src/service/frontend.h
M be/src/service/impala-internal-service.cc
M be/src/service/impala-internal-service.h
M be/src/service/impala-server.cc
M be/src/service/impala-server.h
M be/src/util/bloom-filter-test.cc
M be/src/util/bloom-filter.cc
M be/src/util/bloom-filter.h
M be/src/util/min-max-filter-test.cc
M be/src/util/min-max-filter.cc
M be/src/util/min-max-filter.h
M common/protobuf/common.proto
M common/protobuf/data_stream_service.proto
M common/thrift/ImpalaInternalService.thrift
40 files changed, 1,063 insertions(+), 728 deletions(-)

git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/82/13882/14
--
To view, visit http://gerrit.cloudera.org:8080/13882
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I6b394796d250286510e157ae326882bfc01d387a
Gerrit-Change-Number: 13882
Gerrit-PatchSet: 14
Gerrit-Owner: Fang-Yu Rao
Gerrit-Reviewer: Fang-Yu Rao
Gerrit-Reviewer: Impala Public Jenkins
Gerrit-Reviewer: Michael Ho
Gerrit-Reviewer: Thomas Tauber-Marshall
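
The commit message above states that the KRPC sidecar removes an extra copy of the Bloom filter contents during serialization. The difference between the two paths can be illustrated roughly as follows. This is only a sketch of the general idea: ByteView, CopyIntoMessage and AttachAsView are hypothetical names, and no KRPC or kudu::Slice API is used or implied.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical non-owning view over a byte range (illustration only).
struct ByteView {
  const uint8_t* data;
  size_t size;
};

// Copying path: the directory bytes are duplicated into a string owned by
// the outgoing message, which costs one pass over the whole directory.
inline std::string CopyIntoMessage(const std::vector<uint8_t>& directory) {
  return std::string(reinterpret_cast<const char*>(directory.data()),
                     directory.size());
}

// Sidecar-style path: the message only records where the bytes live, so no
// directory bytes are copied. The caller must keep 'directory' alive until
// the send has completed.
inline ByteView AttachAsView(const std::vector<uint8_t>& directory) {
  return ByteView{directory.data(), directory.size()};
}
```

The view-based path is what makes the lifetime of the filter matter: the bytes are read from the original buffer at send time, so that buffer cannot be freed while the call is outstanding.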