This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new f03943cf1f GH-36369: [C++][FlightRPC] Fix a hang bug in
FlightClient::Authenticate*() (#36372)
f03943cf1f is described below
commit f03943cf1f5e17cefe51c871cd705c87b322015f
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sun Jul 2 06:11:47 2023 +0900
GH-36369: [C++][FlightRPC] Fix a hang bug in FlightClient::Authenticate*()
(#36372)
### Rationale for this change
We need to drain all responses from server before we call gRPC's Finish()
to avoid hanging.
### What changes are included in this PR?
Read all responses before calling Finish().
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* Closes: #36369
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/flight/transport/grpc/grpc_client.cc | 56 ++++++++++++----------
1 file changed, 30 insertions(+), 26 deletions(-)
diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
index 5abecd91a3..a1d0e3266b 100644
--- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
+++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
@@ -285,7 +285,7 @@ class FinishableDataStream : public
internal::ClientDataStream {
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side. (If the client wants to
// indicate that it is done writing, but not done reading, it
- // should use DoneWriting.
+ // should use DoneWriting.)
ReadPayloadType message;
while (ReadPayload(stream_.get(), &message)) {
// Drain the read side to avoid gRPC hanging in Finish()
@@ -732,38 +732,16 @@ class GrpcClientImpl : public internal::ClientTransport {
std::unique_ptr<ClientAuthHandler> auth_handler)
override {
auth_handler_ = std::move(auth_handler);
ClientRpc rpc(options);
- std::shared_ptr<
- ::grpc::ClientReaderWriter<pb::HandshakeRequest,
pb::HandshakeResponse>>
- stream = stub_->Handshake(&rpc.context);
- GrpcClientAuthSender outgoing{stream};
- GrpcClientAuthReader incoming{stream};
- RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
- // Explicitly close our side of the connection
- bool finished_writes = stream->WritesDone();
- RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
- if (!finished_writes) {
- return MakeFlightError(FlightStatusCode::Internal,
- "Could not finish writing before closing");
- }
- return Status::OK();
+ return AuthenticateInternal(rpc);
}
arrow::Result<std::pair<std::string, std::string>> AuthenticateBasicToken(
const FlightCallOptions& options, const std::string& username,
const std::string& password) override {
- // Add basic auth headers to outgoing headers.
ClientRpc rpc(options);
+ // Add basic auth headers to outgoing headers.
AddBasicAuthHeaders(&rpc.context, username, password);
- std::shared_ptr<
- ::grpc::ClientReaderWriter<pb::HandshakeRequest,
pb::HandshakeResponse>>
- stream = stub_->Handshake(&rpc.context);
- // Explicitly close our side of the connection.
- bool finished_writes = stream->WritesDone();
- RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
- if (!finished_writes) {
- return MakeFlightError(FlightStatusCode::Internal,
- "Could not finish writing before closing");
- }
+ RETURN_NOT_OK(AuthenticateInternal(rpc));
// Grab bearer token from incoming headers.
return GetBearerTokenHeader(rpc.context);
}
@@ -893,6 +871,32 @@ class GrpcClientImpl : public internal::ClientTransport {
}
private:
+ Status AuthenticateInternal(ClientRpc& rpc) {
+ std::shared_ptr<
+ ::grpc::ClientReaderWriter<pb::HandshakeRequest,
pb::HandshakeResponse>>
+ stream = stub_->Handshake(&rpc.context);
+ if (auth_handler_) {
+ GrpcClientAuthSender outgoing{stream};
+ GrpcClientAuthReader incoming{stream};
+ RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
+ }
+ // Explicitly close our side of the connection
+ bool finished_writes = stream->WritesDone();
+ if (!finished_writes) {
+ return MakeFlightError(FlightStatusCode::Internal,
+ "Could not finish writing before closing");
+ }
+ // Drain the read side, as otherwise gRPC Finish() will hang. We
+ // only call Finish() when the client closes the writer or the
+ // reader finishes, so it's OK to assume the client no longer
+ // wants to read and drain the read side.
+ pb::HandshakeResponse response;
+ while (stream->Read(&response)) {
+ }
+ RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
+ return Status::OK();
+ }
+
std::unique_ptr<pb::FlightService::Stub> stub_;
std::shared_ptr<ClientAuthHandler> auth_handler_;
#if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS) && \