pitrou commented on code in PR #36517:
URL: https://github.com/apache/arrow/pull/36517#discussion_r1287123577


##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1656,5 +1812,107 @@ void ErrorHandlingTest::TestDoExchange() {
   reader_thread.join();
 }
 
+//------------------------------------------------------------
+// Test async clients
+
+void AsyncClientTest::SetUpTest() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "async is not supported";
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto location, Location::ForScheme(transport(), 
"127.0.0.1", 0));
+
+  server_ = ExampleTestServer();
+  FlightServerOptions server_options(location);
+  ASSERT_OK(server_->Init(server_options));
+
+  std::string uri = location.scheme() + "://127.0.0.1:" + 
std::to_string(server_->port());
+  ASSERT_OK_AND_ASSIGN(auto real_location, Location::Parse(uri));
+  FlightClientOptions client_options = FlightClientOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, 
client_options));
+
+  ASSERT_TRUE(client_->supports_async());
+}
+void AsyncClientTest::TearDownTest() {
+  if (supports_async()) {
+    ASSERT_OK(client_->Close());
+    ASSERT_OK(server_->Shutdown());
+  }
+}
+
+void AsyncClientTest::TestGetFlightInfo() {
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override {
+      info_ = std::move(info);
+      counter_++;
+    }
+
+    void OnFinish(Status status) override {
+      ASSERT_FALSE(future_.is_finished());
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    int counter_ = 0;
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_ = arrow::Future<FlightInfo>::Make();
+  };
+
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto listener = std::make_shared<Listener>();
+  client_->GetFlightInfoAsync(descr, listener);
+
+  ASSERT_FINISHES_AND_RAISES(UnknownError, listener->future_);
+  ASSERT_THAT(listener->future_.status().ToString(), 
::testing::HasSubstr("Sentinel"));
+  ASSERT_EQ(0, listener->counter_);
+}
+void AsyncClientTest::TestGetFlightInfoFuture() {
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_AND_RAISES(UnknownError, future);
+  ASSERT_THAT(future.status().ToString(), ::testing::HasSubstr("Sentinel"));
+
+  descr = FlightDescriptor::Command("my_command");
+  future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto info, future);
+  // See test_util.cc:ExampleFlightInfo
+  ASSERT_EQ(descr, info.descriptor());
+  ASSERT_EQ(1000, info.total_records());
+  ASSERT_EQ(100000, info.total_bytes());
+}
+void AsyncClientTest::TestListenerLifetime() {
+  arrow::Future<FlightInfo> future = arrow::Future<FlightInfo>::Make();
+
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override { info_ = std::move(info); }
+
+    void OnFinish(Status status) override {
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_;
+  };
+
+  // Bad client code: don't retain a reference to the listener
+  {
+    auto descr = FlightDescriptor::Command("my_command");
+    auto listener = std::make_shared<Listener>();
+    listener->future_ = future;
+    client_->GetFlightInfoAsync(descr, std::move(listener));
+  }
+
+  ASSERT_FINISHES_OK(future);
+}
+

Review Comment:
   It would be nice to add a test that 1) starts a long-running async operation,
2) closes the client while that operation is still in flight, and 3) checks that
the Future is well-behaved (perhaps it completes with `Status::Cancelled`?).
   
   Perhaps in another PR?



##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1656,5 +1812,107 @@ void ErrorHandlingTest::TestDoExchange() {
   reader_thread.join();
 }
 
+//------------------------------------------------------------
+// Test async clients
+
+void AsyncClientTest::SetUpTest() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "async is not supported";
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto location, Location::ForScheme(transport(), 
"127.0.0.1", 0));
+
+  server_ = ExampleTestServer();
+  FlightServerOptions server_options(location);
+  ASSERT_OK(server_->Init(server_options));
+
+  std::string uri = location.scheme() + "://127.0.0.1:" + 
std::to_string(server_->port());
+  ASSERT_OK_AND_ASSIGN(auto real_location, Location::Parse(uri));
+  FlightClientOptions client_options = FlightClientOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, 
client_options));
+
+  ASSERT_TRUE(client_->supports_async());
+}
+void AsyncClientTest::TearDownTest() {
+  if (supports_async()) {
+    ASSERT_OK(client_->Close());
+    ASSERT_OK(server_->Shutdown());
+  }
+}
+
+void AsyncClientTest::TestGetFlightInfo() {
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override {
+      info_ = std::move(info);
+      counter_++;
+    }
+
+    void OnFinish(Status status) override {
+      ASSERT_FALSE(future_.is_finished());
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    int counter_ = 0;

Review Comment:
   Do we need to make this `std::atomic<int>`? `OnNext` may run on a gRPC callback thread while the test thread later reads `counter_`.



##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -549,6 +554,127 @@ class GrpcResultStream : public ResultStream {
   std::unique_ptr<::grpc::ClientReader<pb::Result>> stream_;
 };
 
+#ifdef GRPC_ENABLE_ASYNC
+/// Force destruction to wait for RPC completion.
+class FinishedFlag {
+ public:
+  ~FinishedFlag() { Wait(); }
+
+  void Finish() {
+    std::lock_guard<std::mutex> guard(mutex_);
+    finished_ = true;
+    cv_.notify_all();
+  }
+  void Wait() const {
+    std::unique_lock<std::mutex> guard(mutex_);
+    cv_.wait(guard, [&]() { return finished_; });
+  }
+
+ private:
+  mutable std::mutex mutex_;
+  mutable std::condition_variable cv_;
+  bool finished_{false};
+};
+
+// XXX: it appears that if we destruct gRPC resources (like a
+// ClientContext) from a gRPC callback, we will be running on a gRPC
+// thread and we may attempt to join ourselves (because gRPC
+// apparently refcounts threads).  Avoid that by transferring gRPC
+// resources to a dedicated thread for destruction.
+class GrpcGarbageBin {
+ public:
+  GrpcGarbageBin() {
+    grpc_destructor_thread_ = std::thread([&]() {
+      while (true) {
+        std::unique_lock<std::mutex> guard(grpc_destructor_mutex_);
+        grpc_destructor_cv_.wait(guard,
+                                 [&]() { return !running_ || 
!garbage_bin_.empty(); });
+
+        garbage_bin_.clear();
+
+        if (!running_) return;
+      }
+    });
+  }
+
+  void Dispose(std::unique_ptr<internal::AsyncRpc> trash) {
+    std::unique_lock<std::mutex> guard(grpc_destructor_mutex_);
+    if (!running_) return;
+    garbage_bin_.push_back(std::move(trash));
+    grpc_destructor_cv_.notify_all();
+  }
+
+  void Stop() {
+    {
+      std::unique_lock<std::mutex> guard(grpc_destructor_mutex_);
+      running_ = false;
+      grpc_destructor_cv_.notify_all();
+    }
+    grpc_destructor_thread_.join();
+  }
+
+ private:
+  bool running_ = true;
+  std::thread grpc_destructor_thread_;
+  std::mutex grpc_destructor_mutex_;
+  std::condition_variable grpc_destructor_cv_;
+  std::deque<std::unique_ptr<internal::AsyncRpc>> garbage_bin_;
+};
+
+template <typename Result, typename Request, typename Response>
+class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public 
internal::AsyncRpc {
+ public:
+  ClientRpc rpc;
+  std::shared_ptr<AsyncListener<Result>> listener;
+  std::shared_ptr<GrpcGarbageBin> garbage_bin_;
+
+  Request pb_request;
+  Response pb_response;
+  Status client_status;
+
+  // Destruct last
+  FinishedFlag finished;
+
+  explicit UnaryUnaryAsyncCall(const FlightCallOptions& options,
+                               std::shared_ptr<AsyncListener<Result>> listener,
+                               std::shared_ptr<GrpcGarbageBin> garbage_bin)
+      : rpc(options),
+        listener(std::move(listener)),
+        garbage_bin_(std::move(garbage_bin)) {}
+
+  void TryCancel() override { rpc.context.TryCancel(); }
+
+  void OnDone(const ::grpc::Status& status) override {
+    if (status.ok()) {
+      auto result = internal::FromProto(pb_response);
+      client_status = result.status();
+      if (client_status.ok()) {
+        listener->OnNext(std::move(result).MoveValueUnsafe());
+      }
+    }
+    Finish(status);
+  }
+
+  void Finish(const ::grpc::Status& status) {
+    auto listener = std::move(this->listener);
+    listener->OnFinish(
+        CombinedTransportStatus(status, std::move(client_status), 
&rpc.context));
+    // SetAsyncRpc may trigger destruction, so Finish() first
+    finished.Finish();
+    // Instead of potentially destructing gRPC resources here,
+    // transfer it to a dedicated background thread
+    garbage_bin_->Dispose(
+        flight::internal::ClientTransport::ReleaseAsyncRpc(listener.get()));
+  }
+};
+
+#define LISTENER_NOT_OK(LISTENER, EXPR)                 \

Review Comment:
   Should this macro be `#undef`'d at the end of the file?



##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1656,5 +1812,107 @@ void ErrorHandlingTest::TestDoExchange() {
   reader_thread.join();
 }
 
+//------------------------------------------------------------
+// Test async clients
+
+void AsyncClientTest::SetUpTest() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "async is not supported";
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto location, Location::ForScheme(transport(), 
"127.0.0.1", 0));
+
+  server_ = ExampleTestServer();
+  FlightServerOptions server_options(location);
+  ASSERT_OK(server_->Init(server_options));
+
+  std::string uri = location.scheme() + "://127.0.0.1:" + 
std::to_string(server_->port());
+  ASSERT_OK_AND_ASSIGN(auto real_location, Location::Parse(uri));
+  FlightClientOptions client_options = FlightClientOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, 
client_options));
+
+  ASSERT_TRUE(client_->supports_async());
+}
+void AsyncClientTest::TearDownTest() {
+  if (supports_async()) {
+    ASSERT_OK(client_->Close());
+    ASSERT_OK(server_->Shutdown());
+  }
+}
+
+void AsyncClientTest::TestGetFlightInfo() {
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override {
+      info_ = std::move(info);
+      counter_++;
+    }
+
+    void OnFinish(Status status) override {
+      ASSERT_FALSE(future_.is_finished());
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    int counter_ = 0;
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_ = arrow::Future<FlightInfo>::Make();
+  };
+
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto listener = std::make_shared<Listener>();
+  client_->GetFlightInfoAsync(descr, listener);
+
+  ASSERT_FINISHES_AND_RAISES(UnknownError, listener->future_);
+  ASSERT_THAT(listener->future_.status().ToString(), 
::testing::HasSubstr("Sentinel"));
+  ASSERT_EQ(0, listener->counter_);
+}
+void AsyncClientTest::TestGetFlightInfoFuture() {

Review Comment:
   Style nit: can you add a blank line between method definitions?
   ```suggestion
   }
   
   void AsyncClientTest::TestGetFlightInfoFuture() {
   ```
   



##########
cpp/src/arrow/flight/transport/grpc/grpc_client.cc:
##########
@@ -702,10 +828,22 @@ class GrpcClientImpl : public internal::ClientTransport {
     stub_ = pb::FlightService::NewStub(
         ::grpc::experimental::CreateCustomChannelWithInterceptors(
             grpc_uri.str(), creds, args, std::move(interceptors)));
+
+#ifdef GRPC_ENABLE_ASYNC
+    garbage_bin_ = std::make_shared<GrpcGarbageBin>();
+#endif
+
     return Status::OK();
   }
 
   Status Close() override {
+#ifdef GRPC_ENABLE_ASYNC
+    // XXX: if there are async RPCs running when the client is
+    // stopped, then when they go to use the garbage bin, they'll
+    // instead synchronously dispose of resources from the callback
+    // thread, and will likely crash.

Review Comment:
   Perhaps open an issue to track this?
   



##########
cpp/src/arrow/flight/test_definitions.cc:
##########
@@ -1656,5 +1812,107 @@ void ErrorHandlingTest::TestDoExchange() {
   reader_thread.join();
 }
 
+//------------------------------------------------------------
+// Test async clients
+
+void AsyncClientTest::SetUpTest() {
+  if (!supports_async()) {
+    GTEST_SKIP() << "async is not supported";
+  }
+
+  ASSERT_OK_AND_ASSIGN(auto location, Location::ForScheme(transport(), 
"127.0.0.1", 0));
+
+  server_ = ExampleTestServer();
+  FlightServerOptions server_options(location);
+  ASSERT_OK(server_->Init(server_options));
+
+  std::string uri = location.scheme() + "://127.0.0.1:" + 
std::to_string(server_->port());
+  ASSERT_OK_AND_ASSIGN(auto real_location, Location::Parse(uri));
+  FlightClientOptions client_options = FlightClientOptions::Defaults();
+  ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, 
client_options));
+
+  ASSERT_TRUE(client_->supports_async());
+}
+void AsyncClientTest::TearDownTest() {
+  if (supports_async()) {
+    ASSERT_OK(client_->Close());
+    ASSERT_OK(server_->Shutdown());
+  }
+}
+
+void AsyncClientTest::TestGetFlightInfo() {
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override {
+      info_ = std::move(info);
+      counter_++;
+    }
+
+    void OnFinish(Status status) override {
+      ASSERT_FALSE(future_.is_finished());
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    int counter_ = 0;
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_ = arrow::Future<FlightInfo>::Make();
+  };
+
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto listener = std::make_shared<Listener>();
+  client_->GetFlightInfoAsync(descr, listener);
+
+  ASSERT_FINISHES_AND_RAISES(UnknownError, listener->future_);
+  ASSERT_THAT(listener->future_.status().ToString(), 
::testing::HasSubstr("Sentinel"));
+  ASSERT_EQ(0, listener->counter_);
+}
+void AsyncClientTest::TestGetFlightInfoFuture() {
+  auto descr = FlightDescriptor::Command("status-outofmemory");
+  auto future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_AND_RAISES(UnknownError, future);
+  ASSERT_THAT(future.status().ToString(), ::testing::HasSubstr("Sentinel"));
+
+  descr = FlightDescriptor::Command("my_command");
+  future = client_->GetFlightInfoAsync(descr);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto info, future);
+  // See test_util.cc:ExampleFlightInfo
+  ASSERT_EQ(descr, info.descriptor());
+  ASSERT_EQ(1000, info.total_records());
+  ASSERT_EQ(100000, info.total_bytes());
+}
+void AsyncClientTest::TestListenerLifetime() {
+  arrow::Future<FlightInfo> future = arrow::Future<FlightInfo>::Make();
+
+  class Listener : public AsyncListener<FlightInfo> {
+   public:
+    void OnNext(FlightInfo info) override { info_ = std::move(info); }
+
+    void OnFinish(Status status) override {
+      if (status.ok()) {
+        future_.MarkFinished(std::move(info_));
+      } else {
+        future_.MarkFinished(std::move(status));
+      }
+    }
+
+    FlightInfo info_ = FlightInfo(FlightInfo::Data());
+    arrow::Future<FlightInfo> future_;
+  };
+
+  // Bad client code: don't retain a reference to the listener

Review Comment:
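   It would be nice to explain in more detail what this test expects (e.g. that the
RPC still completes and the future is marked finished even though the caller drops
its only reference to the listener).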



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to