This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 33ddff37f6 GH-31521: [C++][Flight] Migrate Flight SQL client to Result
(#36559)
33ddff37f6 is described below
commit 33ddff37f616123f623f922c2317b03f87a88489
Author: Dane Pitkin <[email protected]>
AuthorDate: Sat Jul 8 12:33:15 2023 -0400
GH-31521: [C++][Flight] Migrate Flight SQL client to Result (#36559)
### Rationale for this change
Arrow introduced a `Result` return object that can be used in place of
passing output parameters. Most of Flight has already been migrated.
### Are these changes tested?
Yes. The existing C++ tests pass, including those that exercise the SQLite and
Acero example servers. Is there any other testing we would like to see?
```
100% tests passed, 0 tests failed out of 84
Label Time Summary:
arrow-tests = 168.18 sec*proc (35 tests)
arrow_acero = 106.32 sec*proc (12 tests)
arrow_compute = 76.46 sec*proc (13 tests)
arrow_dataset = 78.09 sec*proc (12 tests)
arrow_flight = 123.81 sec*proc (2 tests)
arrow_flight_sql = 19.29 sec*proc (1 test)
filesystem = 10.39 sec*proc (2 tests)
parquet-tests = 55.24 sec*proc (9 tests)
unittest = 627.40 sec*proc (84 tests)
Total Test time (real) = 120.83 sec
```
### Are there any user-facing changes?
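Yes. The protected virtual methods `DoPut` and `DoAction` of the Flight SQL
client now return an `arrow::Result` instead of filling output parameters, and
the protected `DoGet` overload that took an output parameter has been removed.
The public `DoGet`, which already returned a `Result`, is now virtual. Subclasses
that override these methods must be updated to the new signatures.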
* Closes: #31521
Authored-by: Dane Pitkin <[email protected]>
Signed-off-by: David Li <[email protected]>
---
cpp/src/arrow/flight/sql/client.cc | 67 ++++++++++++++------------------------
cpp/src/arrow/flight/sql/client.h | 26 +++++----------
2 files changed, 33 insertions(+), 60 deletions(-)
diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc
index a914a0587c..db95557490 100644
--- a/cpp/src/arrow/flight/sql/client.cc
+++ b/cpp/src/arrow/flight/sql/client.cc
@@ -213,22 +213,20 @@ arrow::Result<int64_t>
FlightSqlClient::ExecuteUpdate(const FlightCallOptions& o
ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
GetFlightDescriptorForCommand(command));
- std::unique_ptr<FlightStreamWriter> writer;
- std::unique_ptr<FlightMetadataReader> reader;
-
- ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer,
&reader));
+ ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor,
arrow::schema({})))
std::shared_ptr<Buffer> metadata;
- ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
- ARROW_RETURN_NOT_OK(writer->Close());
+ ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
+ ARROW_RETURN_NOT_OK(result.writer->Close());
if (!metadata) return Status::IOError("Server did not send a response");
- flight_sql_pb::DoPutUpdateResult result;
- if (!result.ParseFromArray(metadata->data(),
static_cast<int>(metadata->size()))) {
+ flight_sql_pb::DoPutUpdateResult update_result;
+ if (!update_result.ParseFromArray(metadata->data(),
+ static_cast<int>(metadata->size()))) {
return Status::Invalid("Unable to parse DoPutUpdateResult");
}
- return result.record_count();
+ return update_result.record_count();
}
arrow::Result<int64_t> FlightSqlClient::ExecuteSubstraitUpdate(
@@ -243,21 +241,19 @@ arrow::Result<int64_t>
FlightSqlClient::ExecuteSubstraitUpdate(
ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor,
GetFlightDescriptorForCommand(command));
- std::unique_ptr<FlightStreamWriter> writer;
- std::unique_ptr<FlightMetadataReader> reader;
-
- ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer,
&reader));
+ ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor,
arrow::schema({})));
std::shared_ptr<Buffer> metadata;
- ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata));
- ARROW_RETURN_NOT_OK(writer->Close());
+ ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata));
+ ARROW_RETURN_NOT_OK(result.writer->Close());
- flight_sql_pb::DoPutUpdateResult result;
- if (!result.ParseFromArray(metadata->data(),
static_cast<int>(metadata->size()))) {
+ flight_sql_pb::DoPutUpdateResult update_result;
+ if (!update_result.ParseFromArray(metadata->data(),
+ static_cast<int>(metadata->size()))) {
return Status::Invalid("Unable to parse DoPutUpdateResult");
}
- return result.record_count();
+ return update_result.record_count();
}
arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetCatalogs(
@@ -478,10 +474,7 @@ arrow::Result<std::unique_ptr<SchemaResult>>
FlightSqlClient::GetSqlInfoSchema(
arrow::Result<std::unique_ptr<FlightStreamReader>> FlightSqlClient::DoGet(
const FlightCallOptions& options, const Ticket& ticket) {
- std::unique_ptr<FlightStreamReader> stream;
- ARROW_RETURN_NOT_OK(DoGet(options, ticket, &stream));
-
- return std::move(stream);
+ return impl_->DoGet(options, ticket);
}
arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::Prepare(
@@ -493,9 +486,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>>
FlightSqlClient::Prepare(
request.set_transaction_id(transaction.transaction_id());
}
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedStatement",
request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
return PreparedStatement::ParseResponse(this, std::move(results));
}
@@ -509,9 +501,8 @@ arrow::Result<std::shared_ptr<PreparedStatement>>
FlightSqlClient::PrepareSubstr
request.set_transaction_id(transaction.transaction_id());
}
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedSubstraitPlan",
request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
return PreparedStatement::ParseResponse(this, std::move(results));
}
@@ -657,9 +648,8 @@ Status PreparedStatement::Close(const FlightCallOptions&
options) {
flight_sql_pb::ActionClosePreparedStatementRequest request;
request.set_prepared_statement_handle(handle_);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("ClosePreparedStatement",
request));
- ARROW_RETURN_NOT_OK(client_->DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, client_->DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());
is_closed_ = true;
@@ -670,9 +660,8 @@ Status PreparedStatement::Close(const FlightCallOptions&
options) {
const FlightCallOptions& options) {
flight_sql_pb::ActionBeginTransactionRequest request;
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginTransaction", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
flight_sql_pb::ActionBeginTransactionResult transaction;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &transaction));
@@ -695,9 +684,8 @@ Status PreparedStatement::Close(const FlightCallOptions&
options) {
request.set_transaction_id(transaction.transaction_id());
request.set_name(name);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginSavepoint", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
flight_sql_pb::ActionBeginSavepointResult savepoint;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &savepoint));
@@ -719,9 +707,8 @@ Status FlightSqlClient::Commit(const FlightCallOptions&
options,
request.set_transaction_id(transaction.transaction_id());
request.set_action(flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_COMMIT);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
@@ -737,9 +724,8 @@ Status FlightSqlClient::Release(const FlightCallOptions&
options,
request.set_savepoint_id(savepoint.savepoint_id());
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_RELEASE);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
@@ -756,9 +742,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions&
options,
request.set_action(
flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_ROLLBACK);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
@@ -774,9 +759,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions&
options,
request.set_savepoint_id(savepoint.savepoint_id());
request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_ROLLBACK);
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action));
ARROW_RETURN_NOT_OK(results->Drain());
return Status::OK();
@@ -788,9 +772,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions&
options,
ARROW_ASSIGN_OR_RAISE(auto serialized_info, info.SerializeToString());
request.set_info(std::move(serialized_info));
- std::unique_ptr<ResultStream> results;
ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CancelQuery", request));
- ARROW_RETURN_NOT_OK(DoAction(options, action, &results));
+ ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action))
flight_sql_pb::ActionCancelQueryResult result;
ARROW_RETURN_NOT_OK(ReadResult(results.get(), &result));
diff --git a/cpp/src/arrow/flight/sql/client.h b/cpp/src/arrow/flight/sql/client.h
index 812b5ffb58..5f3fc7d857 100644
--- a/cpp/src/arrow/flight/sql/client.h
+++ b/cpp/src/arrow/flight/sql/client.h
@@ -131,7 +131,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
/// \param[in] options Per-RPC options
/// \param[in] ticket The flight ticket to use
/// \return The returned RecordBatchReader
- arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
+ virtual arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(
const FlightCallOptions& options, const Ticket& ticket);
/// \brief Request a list of tables.
@@ -364,25 +364,15 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
Status Close();
protected:
- virtual Status DoPut(const FlightCallOptions& options,
- const FlightDescriptor& descriptor,
- const std::shared_ptr<Schema>& schema,
- std::unique_ptr<FlightStreamWriter>* writer,
- std::unique_ptr<FlightMetadataReader>* reader) {
- ARROW_ASSIGN_OR_RAISE(auto result, impl_->DoPut(options, descriptor,
schema));
- *writer = std::move(result.writer);
- *reader = std::move(result.reader);
- return Status::OK();
+ virtual ::arrow::Result<FlightClient::DoPutResult> DoPut(
+ const FlightCallOptions& options, const FlightDescriptor& descriptor,
+ const std::shared_ptr<Schema>& schema) {
+ return impl_->DoPut(options, descriptor, schema);
}
- virtual Status DoGet(const FlightCallOptions& options, const Ticket& ticket,
- std::unique_ptr<FlightStreamReader>* stream) {
- return impl_->DoGet(options, ticket).Value(stream);
- }
-
- virtual Status DoAction(const FlightCallOptions& options, const Action&
action,
- std::unique_ptr<ResultStream>* results) {
- return impl_->DoAction(options, action).Value(results);
+ virtual ::arrow::Result<std::unique_ptr<ResultStream>> DoAction(
+ const FlightCallOptions& options, const Action& action) {
+ return impl_->DoAction(options, action);
}
};