This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 33ddff37f6 GH-31521: [C++][Flight] Migrate Flight SQL client to Result 
(#36559)
33ddff37f6 is described below

commit 33ddff37f616123f623f922c2317b03f87a88489
Author: Dane Pitkin <[email protected]>
AuthorDate: Sat Jul 8 12:33:15 2023 -0400

    GH-31521: [C++][Flight] Migrate Flight SQL client to Result (#36559)
    
    ### Rationale for this change
    
    Arrow introduced a `Result` return object that can be used in place of 
passing output parameters. Most of Flight has already been migrated.
    
    ### Are these changes tested?
    
    C++ tests pass, which involve the sqlite and acero example servers. Any 
other testing we would like to see?
    
    ```
    100% tests passed, 0 tests failed out of 84
    
    Label Time Summary:
    arrow-tests         = 168.18 sec*proc (35 tests)
    arrow_acero         = 106.32 sec*proc (12 tests)
    arrow_compute       =  76.46 sec*proc (13 tests)
    arrow_dataset       =  78.09 sec*proc (12 tests)
    arrow_flight        = 123.81 sec*proc (2 tests)
    arrow_flight_sql    =  19.29 sec*proc (1 test)
    filesystem          =  10.39 sec*proc (2 tests)
    parquet-tests       =  55.24 sec*proc (9 tests)
    unittest            = 627.40 sec*proc (84 tests)
    
    Total Test time (real) = 120.83 sec
    ```
    
    ### Are there any user-facing changes?
    
    Yes. The protected methods DoPut, DoGet, and DoAction for the SQL client 
have been updated. DoGet already exposed a public API and that has been updated 
to be virtual.
    * Closes: #31521
    
    Authored-by: Dane Pitkin <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/flight/sql/client.cc | 67 ++++++++++++++------------------------
 cpp/src/arrow/flight/sql/client.h  | 26 +++++----------
 2 files changed, 33 insertions(+), 60 deletions(-)

diff --git a/cpp/src/arrow/flight/sql/client.cc 
b/cpp/src/arrow/flight/sql/client.cc
index a914a0587c..db95557490 100644
--- a/cpp/src/arrow/flight/sql/client.cc
+++ b/cpp/src/arrow/flight/sql/client.cc
@@ -213,22 +213,20 @@ arrow::Result<int64_t> 
FlightSqlClient::ExecuteUpdate(const FlightCallOptions& o
   ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
                         GetFlightDescriptorForCommand(command));
 
-  std::unique_ptr<FlightStreamWriter> writer;
-  std::unique_ptr<FlightMetadataReader> reader;
-
-  ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, 
&reader));
+  ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, 
arrow::schema({})))
   std::shared_ptr<Buffer> metadata;
-  ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
-  ARROW_RETURN_NOT_OK(writer->Close());
+  ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
+  ARROW_RETURN_NOT_OK(result.writer->Close());
 
   if (!metadata) return Status::IOError("Server did not send a response");
 
-  flight_sql_pb::DoPutUpdateResult result;
-  if (!result.ParseFromArray(metadata->data(), 
static_cast<int>(metadata->size()))) {
+  flight_sql_pb::DoPutUpdateResult update_result;
+  if (!update_result.ParseFromArray(metadata->data(),
+                                    static_cast<int>(metadata->size()))) {
     return Status::Invalid("Unable to parse DoPutUpdateResult");
   }
 
-  return result.record_count();
+  return update_result.record_count();
 }
 
 arrow::Result<int64_t> FlightSqlClient::ExecuteSubstraitUpdate(
@@ -243,21 +241,19 @@ arrow::Result<int64_t> 
FlightSqlClient::ExecuteSubstraitUpdate(
   ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
                         GetFlightDescriptorForCommand(command));
 
-  std::unique_ptr<FlightStreamWriter> writer;
-  std::unique_ptr<FlightMetadataReader> reader;
-
-  ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, 
&reader));
+  ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, 
arrow::schema({})));
 
   std::shared_ptr<Buffer> metadata;
-  ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
-  ARROW_RETURN_NOT_OK(writer->Close());
+  ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
+  ARROW_RETURN_NOT_OK(result.writer->Close());
 
-  flight_sql_pb::DoPutUpdateResult result;
-  if (!result.ParseFromArray(metadata->data(), 
static_cast<int>(metadata->size()))) {
+  flight_sql_pb::DoPutUpdateResult update_result;
+  if (!update_result.ParseFromArray(metadata->data(),
+                                    static_cast<int>(metadata->size()))) {
     return Status::Invalid("Unable to parse DoPutUpdateResult");
   }
 
-  return result.record_count();
+  return update_result.record_count();
 }
 
 arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCatalogs(
@@ -478,10 +474,7 @@ arrow::Result<std::unique_ptr<SchemaResult>> 
FlightSqlClient::GetSqlInfoSchema(
 
 arrow::Result<std::unique_ptr<FlightStreamReader>> FlightSqlClient::DoGet(
     const FlightCallOptions& options, const Ticket& ticket) {
-  std::unique_ptr<FlightStreamReader> stream;
-  ARROW_RETURN_NOT_OK(DoGet(options, ticket, &stream));
-
-  return std::move(stream);
+  return impl_->DoGet(options, ticket);
 }
 
 arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::Prepare(
@@ -493,9 +486,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>> 
FlightSqlClient::Prepare(
     request.set_transaction_id(transaction.transaction_id());
   }
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedStatement", 
request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
 
   return PreparedStatement::ParseResponse(this, std::move(results));
 }
@@ -509,9 +501,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>> 
FlightSqlClient::PrepareSubstr
     request.set_transaction_id(transaction.transaction_id());
   }
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedSubstraitPlan", 
request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
 
   return PreparedStatement::ParseResponse(this, std::move(results));
 }
@@ -657,9 +648,8 @@ Status PreparedStatement::Close(const FlightCallOptions& 
options) {
   flight_sql_pb::ActionClosePreparedStatementRequest request;
   request.set_prepared_statement_handle(handle_);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("ClosePreparedStatement", 
request));
-  ARROW_RETURN_NOT_OK(client_->DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, client_->DoAction(options, action));
   ARROW_RETURN_NOT_OK(results->Drain());
 
   is_closed_ = true;
@@ -670,9 +660,8 @@ Status PreparedStatement::Close(const FlightCallOptions& 
options) {
     const FlightCallOptions& options) {
   flight_sql_pb::ActionBeginTransactionRequest request;
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginTransaction", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   flight_sql_pb::ActionBeginTransactionResult transaction;
   ARROW_RETURN_NOT_OK(ReadResult(results.get(), &transaction));
@@ -695,9 +684,8 @@ Status PreparedStatement::Close(const FlightCallOptions& 
options) {
   request.set_transaction_id(transaction.transaction_id());
   request.set_name(name);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginSavepoint", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   flight_sql_pb::ActionBeginSavepointResult savepoint;
   ARROW_RETURN_NOT_OK(ReadResult(results.get(), &savepoint));
@@ -719,9 +707,8 @@ Status FlightSqlClient::Commit(const FlightCallOptions& 
options,
   request.set_transaction_id(transaction.transaction_id());
   
request.set_action(flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_COMMIT);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   ARROW_RETURN_NOT_OK(results->Drain());
   return Status::OK();
@@ -737,9 +724,8 @@ Status FlightSqlClient::Release(const FlightCallOptions& 
options,
   request.set_savepoint_id(savepoint.savepoint_id());
   
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_RELEASE);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   ARROW_RETURN_NOT_OK(results->Drain());
   return Status::OK();
@@ -756,9 +742,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& 
options,
   request.set_action(
       flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_ROLLBACK);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   ARROW_RETURN_NOT_OK(results->Drain());
   return Status::OK();
@@ -774,9 +759,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& 
options,
   request.set_savepoint_id(savepoint.savepoint_id());
   
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_ROLLBACK);
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
 
   ARROW_RETURN_NOT_OK(results->Drain());
   return Status::OK();
@@ -788,9 +772,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& 
options,
   ARROW_ASSIGN_OR_RAISE(auto serialized_info, info.SerializeToString());
   request.set_info(std::move(serialized_info));
 
-  std::unique_ptr<ResultStream> results;
   ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CancelQuery", request));
-  ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+  ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
 
   flight_sql_pb::ActionCancelQueryResult result;
   ARROW_RETURN_NOT_OK(ReadResult(results.get(), &result));
diff --git a/cpp/src/arrow/flight/sql/client.h 
b/cpp/src/arrow/flight/sql/client.h
index 812b5ffb58..5f3fc7d857 100644
--- a/cpp/src/arrow/flight/sql/client.h
+++ b/cpp/src/arrow/flight/sql/client.h
@@ -131,7 +131,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   /// \param[in] options Per-RPC options
   /// \param[in] ticket The flight ticket to use
   /// \return The returned RecordBatchReader
-  arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
+  virtual arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
       const FlightCallOptions& options, const Ticket& ticket);
 
   /// \brief Request a list of tables.
@@ -364,25 +364,15 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
   Status Close();
 
  protected:
-  virtual Status DoPut(const FlightCallOptions& options,
-                       const FlightDescriptor& descriptor,
-                       const std::shared_ptr<Schema>& schema,
-                       std::unique_ptr<FlightStreamWriter>* writer,
-                       std::unique_ptr<FlightMetadataReader>* reader) {
-    ARROW_ASSIGN_OR_RAISE(auto result, impl_->DoPut(options, descriptor, 
schema));
-    *writer = std::move(result.writer);
-    *reader = std::move(result.reader);
-    return Status::OK();
+  virtual ::arrow::Result<FlightClient::DoPutResult> DoPut(
+      const FlightCallOptions& options, const FlightDescriptor& descriptor,
+      const std::shared_ptr<Schema>& schema) {
+    return impl_->DoPut(options, descriptor, schema);
   }
 
-  virtual Status DoGet(const FlightCallOptions& options, const Ticket& ticket,
-                       std::unique_ptr<FlightStreamReader>* stream) {
-    return impl_->DoGet(options, ticket).Value(stream);
-  }
-
-  virtual Status DoAction(const FlightCallOptions& options, const Action& 
action,
-                          std::unique_ptr<ResultStream>* results) {
-    return impl_->DoAction(options, action).Value(results);
+  virtual ::arrow::Result<std::unique_ptr<ResultStream>> DoAction(
+      const FlightCallOptions& options, const Action& action) {
+    return impl_->DoAction(options, action);
   }
 };
 

Reply via email to