This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new f03943cf1f GH-36369: [C++][FlightRPC] Fix a hang bug in FlightClient::Authenticate*() (#36372)
f03943cf1f is described below

commit f03943cf1f5e17cefe51c871cd705c87b322015f
Author: Sutou Kouhei <[email protected]>
AuthorDate: Sun Jul 2 06:11:47 2023 +0900

    GH-36369: [C++][FlightRPC] Fix a hang bug in FlightClient::Authenticate*() (#36372)
    
    ### Rationale for this change
    
    We need to drain all responses from the server before we call gRPC's Finish() to avoid hanging.
    
    ### What changes are included in this PR?
    
    Read all responses before calling Finish().
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * Closes: #36369
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/flight/transport/grpc/grpc_client.cc | 56 ++++++++++++----------
 1 file changed, 30 insertions(+), 26 deletions(-)

diff --git a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc 
b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
index 5abecd91a3..a1d0e3266b 100644
--- a/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
+++ b/cpp/src/arrow/flight/transport/grpc/grpc_client.cc
@@ -285,7 +285,7 @@ class FinishableDataStream : public 
internal::ClientDataStream {
     // reader finishes, so it's OK to assume the client no longer
     // wants to read and drain the read side. (If the client wants to
     // indicate that it is done writing, but not done reading, it
-    // should use DoneWriting.
+    // should use DoneWriting.)
     ReadPayloadType message;
     while (ReadPayload(stream_.get(), &message)) {
       // Drain the read side to avoid gRPC hanging in Finish()
@@ -732,38 +732,16 @@ class GrpcClientImpl : public internal::ClientTransport {
                       std::unique_ptr<ClientAuthHandler> auth_handler) 
override {
     auth_handler_ = std::move(auth_handler);
     ClientRpc rpc(options);
-    std::shared_ptr<
-        ::grpc::ClientReaderWriter<pb::HandshakeRequest, 
pb::HandshakeResponse>>
-        stream = stub_->Handshake(&rpc.context);
-    GrpcClientAuthSender outgoing{stream};
-    GrpcClientAuthReader incoming{stream};
-    RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
-    // Explicitly close our side of the connection
-    bool finished_writes = stream->WritesDone();
-    RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
-    if (!finished_writes) {
-      return MakeFlightError(FlightStatusCode::Internal,
-                             "Could not finish writing before closing");
-    }
-    return Status::OK();
+    return AuthenticateInternal(rpc);
   }
 
   arrow::Result<std::pair<std::string, std::string>> AuthenticateBasicToken(
       const FlightCallOptions& options, const std::string& username,
       const std::string& password) override {
-    // Add basic auth headers to outgoing headers.
     ClientRpc rpc(options);
+    // Add basic auth headers to outgoing headers.
     AddBasicAuthHeaders(&rpc.context, username, password);
-    std::shared_ptr<
-        ::grpc::ClientReaderWriter<pb::HandshakeRequest, 
pb::HandshakeResponse>>
-        stream = stub_->Handshake(&rpc.context);
-    // Explicitly close our side of the connection.
-    bool finished_writes = stream->WritesDone();
-    RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
-    if (!finished_writes) {
-      return MakeFlightError(FlightStatusCode::Internal,
-                             "Could not finish writing before closing");
-    }
+    RETURN_NOT_OK(AuthenticateInternal(rpc));
     // Grab bearer token from incoming headers.
     return GetBearerTokenHeader(rpc.context);
   }
@@ -893,6 +871,32 @@ class GrpcClientImpl : public internal::ClientTransport {
   }
 
  private:
+  Status AuthenticateInternal(ClientRpc& rpc) {
+    std::shared_ptr<
+        ::grpc::ClientReaderWriter<pb::HandshakeRequest, 
pb::HandshakeResponse>>
+        stream = stub_->Handshake(&rpc.context);
+    if (auth_handler_) {
+      GrpcClientAuthSender outgoing{stream};
+      GrpcClientAuthReader incoming{stream};
+      RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
+    }
+    // Explicitly close our side of the connection
+    bool finished_writes = stream->WritesDone();
+    if (!finished_writes) {
+      return MakeFlightError(FlightStatusCode::Internal,
+                             "Could not finish writing before closing");
+    }
+    // Drain the read side, as otherwise gRPC Finish() will hang. We
+    // only call Finish() when the client closes the writer or the
+    // reader finishes, so it's OK to assume the client no longer
+    // wants to read and drain the read side.
+    pb::HandshakeResponse response;
+    while (stream->Read(&response)) {
+    }
+    RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
+    return Status::OK();
+  }
+
   std::unique_ptr<pb::FlightService::Stub> stub_;
   std::shared_ptr<ClientAuthHandler> auth_handler_;
 #if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS) && \

Reply via email to