lidavidm commented on a change in pull request #11507:
URL: https://github.com/apache/arrow/pull/11507#discussion_r735819181



##########
File path: cpp/src/arrow/flight/flight-sql/client_impl.h
##########
@@ -0,0 +1,463 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/flight/flight-sql/FlightSql.pb.h>
+#include <arrow/flight/types.h>
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/testing/gtest_util.h>
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/message_lite.h>
+
+#include <utility>
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+namespace internal {
+
+template <class T>
+FlightSqlClientT<T>::FlightSqlClientT(std::unique_ptr<T>& client) {
+  this->client = std::move(client);
+}
+
+template <class T>
+PreparedStatementT<T>::PreparedStatementT(
+    T* client_, const std::string& query,
+    pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result_,
+    const FlightCallOptions& options_)
+    : client(client_),
+      prepared_statement_result(std::move(prepared_statement_result_)),
+      options(options_) {
+  is_closed = false;
+}
+
+template <class T>
+FlightSqlClientT<T>::~FlightSqlClientT() = default;
+
+template <class T>
+PreparedStatementT<T>::~PreparedStatementT<T>() {
+  Close();
+}
+
+inline FlightDescriptor GetFlightDescriptorForCommand(
+    const google::protobuf::Message& command) {
+  google::protobuf::Any any;
+  any.PackFrom(command);
+
+  const std::string& string = any.SerializeAsString();
+  return FlightDescriptor::Command(string);
+}
+
+template <class T>
+Status GetFlightInfoForCommand(const std::unique_ptr<T>& client,
+                               const FlightCallOptions& options,
+                               std::unique_ptr<FlightInfo>* flight_info,
+                               const google::protobuf::Message& command) {
+  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+
+  return client->GetFlightInfo(options, descriptor, flight_info);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::Execute(const FlightCallOptions& options,
+                                    const std::string& query,
+                                    std::unique_ptr<FlightInfo>* flight_info) 
const {
+  pb::sql::CommandStatementQuery command;
+  command.set_query(query);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::ExecuteUpdate(const FlightCallOptions& options,
+                                          const std::string& query, int64_t* 
rows) const {
+  pb::sql::CommandStatementUpdate command;
+  command.set_query(query);
+
+  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+
+  std::unique_ptr<FlightStreamWriter> writer;
+  std::unique_ptr<FlightMetadataReader> reader;
+
+  ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, &writer, 
&reader));
+
+  std::shared_ptr<Buffer> metadata;
+
+  const Status& status = reader->ReadMetadata(&metadata);
+
+  pb::sql::DoPutUpdateResult doPutUpdateResult;
+
+  Buffer* pBuffer = metadata.get();
+
+  const std::string& string = pBuffer->ToString();
+
+  doPutUpdateResult.ParseFrom<google::protobuf::MessageLite::kParse>(string);
+  *rows = doPutUpdateResult.record_count();
+
+  return Status::OK();
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetCatalogs(const FlightCallOptions& options,
+                                        std::unique_ptr<FlightInfo>* 
flight_info) const {
+  pb::sql::CommandGetCatalogs command;
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetSchemas(const FlightCallOptions& options,
+                                       const std::string* catalog,
+                                       const std::string* 
schema_filter_pattern,
+                                       std::unique_ptr<FlightInfo>* 
flight_info) const {
+  pb::sql::CommandGetSchemas command;
+  if (catalog != NULLPTR) {
+    command.set_catalog(*catalog);
+  }
+  if (schema_filter_pattern != NULLPTR) {
+    command.set_schema_filter_pattern(*schema_filter_pattern);
+  }
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetTables(const FlightCallOptions& options,
+                                      const std::string* catalog,
+                                      const std::string* schema_filter_pattern,
+                                      const std::string* table_filter_pattern,
+                                      bool include_schema,
+                                      std::vector<std::string>& table_types,
+                                      std::unique_ptr<FlightInfo>* 
flight_info) const {
+  pb::sql::CommandGetTables command;
+
+  if (catalog != NULLPTR) {
+    command.set_catalog(*catalog);
+  }
+
+  if (schema_filter_pattern != NULLPTR) {
+    command.set_schema_filter_pattern(*schema_filter_pattern);
+  }
+
+  if (table_filter_pattern != NULLPTR) {
+    command.set_table_name_filter_pattern(*table_filter_pattern);
+  }
+
+  command.set_include_schema(include_schema);
+
+  for (const std::string& table_type : table_types) {
+    command.add_table_types(table_type);
+  }
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetPrimaryKeys(
+    const FlightCallOptions& options, const std::string* catalog,
+    const std::string* schema, const std::string& table,
+    std::unique_ptr<FlightInfo>* flight_info) const {
+  pb::sql::CommandGetPrimaryKeys command;
+
+  if (catalog != NULLPTR) {
+    command.set_catalog(*catalog);
+  }
+
+  if (schema != NULLPTR) {
+    command.set_schema(*schema);
+  }
+
+  command.set_table(table);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetExportedKeys(
+    const FlightCallOptions& options, const std::string* catalog,
+    const std::string* schema, const std::string& table,
+    std::unique_ptr<FlightInfo>* flight_info) const {
+  pb::sql::CommandGetExportedKeys command;
+
+  if (catalog != NULLPTR) {
+    command.set_catalog(*catalog);
+  }
+
+  if (schema != NULLPTR) {
+    command.set_schema(*schema);
+  }
+
+  command.set_table(table);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetImportedKeys(
+    const FlightCallOptions& options, const std::string* catalog,
+    const std::string* schema, const std::string& table,
+    std::unique_ptr<FlightInfo>* flight_info) const {
+  pb::sql::CommandGetImportedKeys command;
+
+  if (catalog != NULLPTR) {
+    command.set_catalog(*catalog);
+  }
+
+  if (schema != NULLPTR) {
+    command.set_schema(*schema);
+  }
+
+  command.set_table(table);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetCrossReference(
+    const FlightCallOptions& options, const std::string* pk_catalog,
+    const std::string* pk_schema, const std::string& pk_table,
+    const std::string* fk_catalog, const std::string* fk_schema,
+    const std::string& fk_table, std::unique_ptr<FlightInfo>* flight_info) 
const {
+  pb::sql::CommandGetCrossReference command;
+
+  if (pk_catalog != NULLPTR) {
+    command.set_pk_catalog(*pk_catalog);
+  }
+  if (pk_schema != NULLPTR) {
+    command.set_pk_schema(*pk_schema);
+  }
+  command.set_pk_table(pk_table);
+
+  if (fk_catalog != NULLPTR) {
+    command.set_fk_catalog(*fk_catalog);
+  }
+  if (fk_schema != NULLPTR) {
+    command.set_fk_schema(*fk_schema);
+  }
+  command.set_fk_table(fk_table);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::GetTableTypes(
+    const FlightCallOptions& options, std::unique_ptr<FlightInfo>* 
flight_info) const {
+  pb::sql::CommandGetTableTypes command;
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::DoGet(const FlightCallOptions& options, const 
Ticket& ticket,
+                                  std::unique_ptr<FlightStreamReader>* stream) 
const {
+  return client->DoGet(options, ticket, stream);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::Prepare(
+    const FlightCallOptions& options, const std::string& query,
+    std::shared_ptr<PreparedStatementT<T>>* prepared_statement) {
+  google::protobuf::Any command;
+  pb::sql::ActionCreatePreparedStatementRequest request;
+  request.set_query(query);
+  command.PackFrom(request);
+
+  Action action;
+  action.type = "CreatePreparedStatement";
+  action.body = Buffer::FromString(command.SerializeAsString());
+
+  std::unique_ptr<ResultStream> results;
+
+  ARROW_RETURN_NOT_OK(client->DoAction(options, action, &results));
+
+  std::unique_ptr<Result> result;
+  ARROW_RETURN_NOT_OK(results->Next(&result));
+
+  google::protobuf::Any prepared_result;
+
+  std::shared_ptr<Buffer> message = std::move(result->body);
+
+  prepared_result.ParseFromArray(message->data(), 
static_cast<int>(message->size()));
+
+  pb::sql::ActionCreatePreparedStatementResult prepared_statement_result;
+
+  prepared_result.UnpackTo(&prepared_statement_result);
+
+  prepared_statement->reset(
+      new PreparedStatementT<T>(client.get(), query, 
prepared_statement_result, options));
+
+  return Status::OK();
+}
+
+template <class T>
+Status PreparedStatementT<T>::Execute(std::unique_ptr<FlightInfo>* info) {
+  if (is_closed) {
+    return Status::Invalid("Statement already closed.");
+  }
+
+  pb::sql::CommandPreparedStatementQuery execute_query_command;
+
+  execute_query_command.set_prepared_statement_handle(
+      prepared_statement_result.prepared_statement_handle());
+
+  google::protobuf::Any any;
+  any.PackFrom(execute_query_command);
+
+  const std::string& string = any.SerializeAsString();
+  const FlightDescriptor descriptor = FlightDescriptor::Command(string);
+
+  if (parameter_binding && parameter_binding->num_rows() > 0) {
+    std::unique_ptr<FlightStreamWriter> writer;
+    std::unique_ptr<FlightMetadataReader> reader;
+    client->DoPut(options, descriptor, parameter_binding->schema(), &writer, 
&reader);

Review comment:
       Missing RETURN_NOT_OK.

##########
File path: cpp/src/arrow/flight/flight-sql/server_test.cc
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/flight/api.h>
+#include <arrow/flight/flight-sql/api.h>
+#include <arrow/flight/flight-sql/example/sqlite_server.h>
+#include <arrow/flight/flight-sql/server.h>
+#include <arrow/flight/test_util.h>
+#include <arrow/flight/types.h>
+#include <arrow/testing/gtest_util.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/any.pb.h>
+#include <gtest/gtest.h>
+
+#define unparen(...) __VA_ARGS__
+#define DECLARE_ARRAY(ARRAY_NAME, TYPE_CLASS, DATA)     \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME; \
+  {                                                     \
+    arrow::TYPE_CLASS##Builder builder;                 \
+    auto data = unparen DATA;                           \
+    for (const auto& item : data) {                     \
+      ASSERT_OK(builder.Append(item));                  \
+    }                                                   \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));           \
+  }
+
+#define DECLARE_BINARY_ARRAY(ARRAY_NAME, DATA, LENGTH) \
+  std::shared_ptr<arrow::BinaryArray> ARRAY_NAME;      \
+  {                                                    \
+    arrow::Binary##Builder builder;                    \
+    auto data = unparen DATA;                          \
+    for (const auto& item : data) {                    \
+      ASSERT_OK(builder.Append(item, LENGTH));         \
+    }                                                  \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));          \
+  }
+
+#define DECLARE_NULL_ARRAY(ARRAY_NAME, TYPE_CLASS, LENGTH) \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME;    \
+  {                                                        \
+    arrow::TYPE_CLASS##Builder builder;                    \
+    for (int i = 0; i < LENGTH; i++) {                     \
+      ASSERT_OK(builder.AppendNull());                     \
+    }                                                      \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));              \
+  }
+
+using ::testing::_;
+using ::testing::Ref;
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+
+TestServer* server;
+FlightSqlClient* sql_client;
+
+class TestFlightSqlServer : public ::testing::Environment {
+ protected:
+  void SetUp() override {
+    server = new TestServer("flight_sql_test_server");
+    server->Start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    ASSERT_TRUE(server->IsRunning());
+
+    std::stringstream ss;
+    ss << "grpc://localhost:" << server->port();
+    std::string uri = ss.str();
+
+    std::unique_ptr<FlightClient> client;
+    Location location;
+    ASSERT_OK(Location::Parse(uri, &location));
+    ASSERT_OK(FlightClient::Connect(location, &client));
+
+    sql_client = new FlightSqlClient(client);
+  }
+
+  void TearDown() override {
+    server->Stop();
+
+    free(server);
+    free(sql_client);
+  }
+};
+
+TEST(TestFlightSqlServer, TestCommandStatementQuery) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->Execute({}, "SELECT * FROM intTable", &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema =
+      arrow::schema({arrow::field("id", int64()), arrow::field("keyName", 
utf8()),
+                     arrow::field("value", int64()), arrow::field("foreignId", 
int64())});
+
+  DECLARE_ARRAY(id_array, Int64, ({1, 2, 3}));
+  DECLARE_ARRAY(keyname_array, String, ({"one", "zero", "negative one"}));
+  DECLARE_ARRAY(value_array, Int64, ({1, 0, -1}));
+  DECLARE_ARRAY(foreignId_array, Int64, ({1, 1, 1}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      expected_schema, {id_array, keyname_array, value_array, 
foreignId_array});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTables) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types;
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, nullptr, false, 
table_types,
+                                  &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_NULL_ARRAY(catalog_name, String, 3);
+  DECLARE_NULL_ARRAY(schema_name, String, 3);
+  DECLARE_ARRAY(table_name, String,
+                ({
+                    "foreignTable",
+                    "intTable",
+                    "sqlite_sequence",
+                }));
+  DECLARE_ARRAY(table_type, String, ({"table", "table", "table"}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      SqlSchema::GetTablesSchema(), {catalog_name, schema_name, table_name, 
table_type});
+
+  ASSERT_TRUE(expected_table->Equals(*table));

Review comment:
       I'm seeing this fail for me.

##########
File path: cpp/src/arrow/flight/flight-sql/client_impl.h
##########
@@ -0,0 +1,463 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/flight/flight-sql/FlightSql.pb.h>
+#include <arrow/flight/types.h>
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/testing/gtest_util.h>
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/message_lite.h>
+
+#include <utility>
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+namespace internal {
+
+template <class T>
+FlightSqlClientT<T>::FlightSqlClientT(std::unique_ptr<T>& client) {
+  this->client = std::move(client);
+}
+
+template <class T>
+PreparedStatementT<T>::PreparedStatementT(
+    T* client_, const std::string& query,
+    pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result_,
+    const FlightCallOptions& options_)
+    : client(client_),
+      prepared_statement_result(std::move(prepared_statement_result_)),
+      options(options_) {

Review comment:
       I'm getting -Wreorder here.

##########
File path: cpp/src/arrow/flight/flight-sql/client_impl.h
##########
@@ -0,0 +1,463 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/flight/flight-sql/FlightSql.pb.h>
+#include <arrow/flight/types.h>
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/testing/gtest_util.h>
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/message_lite.h>
+
+#include <utility>
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+namespace internal {
+
+template <class T>
+FlightSqlClientT<T>::FlightSqlClientT(std::unique_ptr<T>& client) {
+  this->client = std::move(client);
+}
+
+template <class T>
+PreparedStatementT<T>::PreparedStatementT(
+    T* client_, const std::string& query,
+    pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result_,
+    const FlightCallOptions& options_)
+    : client(client_),
+      prepared_statement_result(std::move(prepared_statement_result_)),
+      options(options_) {
+  is_closed = false;
+}
+
+template <class T>
+FlightSqlClientT<T>::~FlightSqlClientT() = default;
+
+template <class T>
+PreparedStatementT<T>::~PreparedStatementT<T>() {
+  Close();

Review comment:
       Missing RETURN_NOT_OK - though we can't do that here. How do we want to 
deal with this?

##########
File path: cpp/src/arrow/flight/flight-sql/CMakeLists.txt
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(arrow_flight_sql)
+
+arrow_install_all_headers("arrow/flight/flight-sql")
+
+set(FLIGHT_SQL_PROTO_PATH "${ARROW_SOURCE_DIR}/../format")
+set(FLIGHT_SQL_PROTO ${ARROW_SOURCE_DIR}/../format/FlightSql.proto)
+
+set(FLIGHT_SQL_GENERATED_PROTO_FILES 
"${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc"
+                                     
"${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.h")
+
+set(PROTO_DEPENDS ${FLIGHT_SQL_PROTO} ${ARROW_PROTOBUF_LIBPROTOBUF} 
gRPC::grpc_cpp_plugin)
+
+add_custom_command(OUTPUT ${FLIGHT_SQL_GENERATED_PROTO_FILES}
+                   COMMAND ${ARROW_PROTOBUF_PROTOC} 
"-I${FLIGHT_SQL_PROTO_PATH}"
+                           "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" 
"${FLIGHT_SQL_PROTO}"
+                   DEPENDS ${PROTO_DEPENDS} ARGS
+                   COMMAND ${ARROW_PROTOBUF_PROTOC} 
"-I${FLIGHT_SQL_PROTO_PATH}"
+                           "--grpc_out=${CMAKE_CURRENT_BINARY_DIR}"
+                           
"--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
+                           "${FLIGHT_SQL_PROTO}")
+
+set_source_files_properties(${FLIGHT_SQL_GENERATED_PROTO_FILES} PROPERTIES 
GENERATED TRUE)
+
+add_custom_target(flight_sql_grpc_gen ALL DEPENDS 
${FLIGHT_SQL_GENERATED_PROTO_FILES})
+
+# <KLUDGE> -Werror / /WX cause try_compile to fail because there seems to be no
+# way to pass -isystem $GRPC_INCLUDE_DIR instead of -I$GRPC_INCLUDE_DIR
+set(CMAKE_CXX_FLAGS_BACKUP "${CMAKE_CXX_FLAGS}")
+string(REPLACE "/WX" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
+string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
+
+set(ARROW_FLIGHT_SQL_SRCS server.cc)
+
+add_arrow_lib(arrow_flight_sql
+              CMAKE_PACKAGE_NAME
+              ArrowFlightSql
+              PKG_CONFIG_NAME
+              arrow-flight-sql
+              OUTPUTS
+              ARROW_FLIGHT_SQL_LIBRARIES
+              SOURCES
+              ${ARROW_FLIGHT_SQL_SRCS}
+              PRECOMPILED_HEADERS
+              "$<$<COMPILE_LANGUAGE:CXX>:arrow/flight/flight-sql/pch.h>"
+              DEPENDENCIES
+              flight_sql_grpc_gen
+              SHARED_LINK_FLAGS
+              ${ARROW_VERSION_SCRIPT_FLAGS} # Defined in 
cpp/arrow/CMakeLists.txt
+              SHARED_LINK_LIBS
+              arrow_flight_shared
+              ${ARROW_FLIGHT_SQL_LINK_LIBS}
+              STATIC_LINK_LIBS
+              arrow_flight_static
+              ${ARROW_FLIGHT_SQL_LINK_LIBS})
+
+if(ARROW_TEST_LINKAGE STREQUAL "static")
+  set(ARROW_FLIGHT_SQL_TEST_LINK_LIBS
+      arrow_flight_sql_static arrow_flight_testing_static
+      ${ARROW_FLIGHT_STATIC_LINK_LIBS} ${ARROW_TEST_LINK_LIBS})
+else()
+  set(ARROW_FLIGHT_SQL_TEST_LINK_LIBS arrow_flight_sql_shared 
arrow_flight_testing_shared
+                                      ${ARROW_TEST_LINK_LIBS})
+endif()
+
+add_arrow_test(flight_sql_test
+               SOURCES
+               client_test.cc
+               server_test.cc
+               "${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc"
+               STATIC_LINK_LIBS
+               ${ARROW_FLIGHT_SQL_TEST_LINK_LIBS}
+               GTest::gtest
+               GTest::gmock
+               LABELS
+               "arrow_flight_sql")
+
+add_executable(flight_sql_test_app test_app_cli.cc
+                                   ${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc)
+target_link_libraries(flight_sql_test_app PRIVATE arrow_flight_sql_static
+                                                  ${GFLAGS_LIBRARIES})
+
+find_package(SQLite3)
+include_directories(${SQLite3_INCLUDE_DIRS})
+
+add_executable(flight_sql_test_server
+               server.cc
+               test_server_cli.cc
+               example/sqlite_statement.cc
+               example/sqlite_statement_batch_reader.cc
+               example/sqlite_server.cc
+               example/sqlite_tables_schema_batch_reader.cpp
+               "${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc")
+target_link_libraries(flight_sql_test_server
+                      PRIVATE arrow_flight_sql_static ${GFLAGS_LIBRARIES}
+                              ${SQLite3_LIBRARIES})

Review comment:
       We need something like `add_dependencies(arrow-flight-sql-test 
flight_sql_test_app flight_sql_test_server)` so that a partial build of just 
`arrow-flight-sql-test` works (which speeds up development).

##########
File path: cpp/src/arrow/flight/flight-sql/server_test.cc
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/flight/api.h>
+#include <arrow/flight/flight-sql/api.h>
+#include <arrow/flight/flight-sql/example/sqlite_server.h>
+#include <arrow/flight/flight-sql/server.h>
+#include <arrow/flight/test_util.h>
+#include <arrow/flight/types.h>
+#include <arrow/testing/gtest_util.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/any.pb.h>
+#include <gtest/gtest.h>
+
+#define unparen(...) __VA_ARGS__
+#define DECLARE_ARRAY(ARRAY_NAME, TYPE_CLASS, DATA)     \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME; \
+  {                                                     \
+    arrow::TYPE_CLASS##Builder builder;                 \
+    auto data = unparen DATA;                           \
+    for (const auto& item : data) {                     \
+      ASSERT_OK(builder.Append(item));                  \
+    }                                                   \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));           \
+  }
+
+#define DECLARE_BINARY_ARRAY(ARRAY_NAME, DATA, LENGTH) \
+  std::shared_ptr<arrow::BinaryArray> ARRAY_NAME;      \
+  {                                                    \
+    arrow::Binary##Builder builder;                    \
+    auto data = unparen DATA;                          \
+    for (const auto& item : data) {                    \
+      ASSERT_OK(builder.Append(item, LENGTH));         \
+    }                                                  \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));          \
+  }
+
+#define DECLARE_NULL_ARRAY(ARRAY_NAME, TYPE_CLASS, LENGTH) \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME;    \
+  {                                                        \
+    arrow::TYPE_CLASS##Builder builder;                    \
+    for (int i = 0; i < LENGTH; i++) {                     \
+      ASSERT_OK(builder.AppendNull());                     \
+    }                                                      \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));              \
+  }
+
+using ::testing::_;
+using ::testing::Ref;
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+
+TestServer* server;
+FlightSqlClient* sql_client;
+
+class TestFlightSqlServer : public ::testing::Environment {
+ protected:
+  void SetUp() override {
+    server = new TestServer("flight_sql_test_server");
+    server->Start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    ASSERT_TRUE(server->IsRunning());
+
+    std::stringstream ss;
+    ss << "grpc://localhost:" << server->port();
+    std::string uri = ss.str();
+
+    std::unique_ptr<FlightClient> client;
+    Location location;
+    ASSERT_OK(Location::Parse(uri, &location));
+    ASSERT_OK(FlightClient::Connect(location, &client));
+
+    sql_client = new FlightSqlClient(client);
+  }
+
+  void TearDown() override {
+    server->Stop();
+
+    free(server);
+    free(sql_client);
+  }
+};
+
+TEST(TestFlightSqlServer, TestCommandStatementQuery) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->Execute({}, "SELECT * FROM intTable", &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema =
+      arrow::schema({arrow::field("id", int64()), arrow::field("keyName", 
utf8()),
+                     arrow::field("value", int64()), arrow::field("foreignId", 
int64())});
+
+  DECLARE_ARRAY(id_array, Int64, ({1, 2, 3}));
+  DECLARE_ARRAY(keyname_array, String, ({"one", "zero", "negative one"}));
+  DECLARE_ARRAY(value_array, Int64, ({1, 0, -1}));
+  DECLARE_ARRAY(foreignId_array, Int64, ({1, 1, 1}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      expected_schema, {id_array, keyname_array, value_array, 
foreignId_array});
+
+  ASSERT_TRUE(expected_table->Equals(*table));

Review comment:
       We should use AssertTablesEqual which also shows the diff (with 
verbose=true).

##########
File path: cpp/src/arrow/flight/flight-sql/CMakeLists.txt
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_custom_target(arrow_flight_sql)
+
+arrow_install_all_headers("arrow/flight/flight-sql")
+
+set(FLIGHT_SQL_PROTO_PATH "${ARROW_SOURCE_DIR}/../format")
+set(FLIGHT_SQL_PROTO ${ARROW_SOURCE_DIR}/../format/FlightSql.proto)
+
+set(FLIGHT_SQL_GENERATED_PROTO_FILES 
"${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc"
+                                     
"${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.h")
+
+set(PROTO_DEPENDS ${FLIGHT_SQL_PROTO} ${ARROW_PROTOBUF_LIBPROTOBUF} 
gRPC::grpc_cpp_plugin)
+
+add_custom_command(OUTPUT ${FLIGHT_SQL_GENERATED_PROTO_FILES}
+                   COMMAND ${ARROW_PROTOBUF_PROTOC} 
"-I${FLIGHT_SQL_PROTO_PATH}"
+                           "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" 
"${FLIGHT_SQL_PROTO}"
+                   DEPENDS ${PROTO_DEPENDS} ARGS
+                   COMMAND ${ARROW_PROTOBUF_PROTOC} 
"-I${FLIGHT_SQL_PROTO_PATH}"
+                           "--grpc_out=${CMAKE_CURRENT_BINARY_DIR}"
+                           
"--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
+                           "${FLIGHT_SQL_PROTO}")
+
+set_source_files_properties(${FLIGHT_SQL_GENERATED_PROTO_FILES} PROPERTIES 
GENERATED TRUE)
+
+add_custom_target(flight_sql_grpc_gen ALL DEPENDS 
${FLIGHT_SQL_GENERATED_PROTO_FILES})
+
+# <KLUDGE> -Werror / /WX cause try_compile to fail because there seems to be no
+# way to pass -isystem $GRPC_INCLUDE_DIR instead of -I$GRPC_INCLUDE_DIR
+set(CMAKE_CXX_FLAGS_BACKUP "${CMAKE_CXX_FLAGS}")
+string(REPLACE "/WX" "" CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
+string(REPLACE "-Werror " " " CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
+
+set(ARROW_FLIGHT_SQL_SRCS server.cc)
+
+add_arrow_lib(arrow_flight_sql
+              CMAKE_PACKAGE_NAME
+              ArrowFlightSql
+              PKG_CONFIG_NAME
+              arrow-flight-sql
+              OUTPUTS
+              ARROW_FLIGHT_SQL_LIBRARIES
+              SOURCES
+              ${ARROW_FLIGHT_SQL_SRCS}
+              PRECOMPILED_HEADERS
+              "$<$<COMPILE_LANGUAGE:CXX>:arrow/flight/flight-sql/pch.h>"
+              DEPENDENCIES
+              flight_sql_grpc_gen
+              SHARED_LINK_FLAGS
+              ${ARROW_VERSION_SCRIPT_FLAGS} # Defined in 
cpp/arrow/CMakeLists.txt
+              SHARED_LINK_LIBS
+              arrow_flight_shared
+              ${ARROW_FLIGHT_SQL_LINK_LIBS}
+              STATIC_LINK_LIBS
+              arrow_flight_static
+              ${ARROW_FLIGHT_SQL_LINK_LIBS})
+
+if(ARROW_TEST_LINKAGE STREQUAL "static")
+  set(ARROW_FLIGHT_SQL_TEST_LINK_LIBS
+      arrow_flight_sql_static arrow_flight_testing_static
+      ${ARROW_FLIGHT_STATIC_LINK_LIBS} ${ARROW_TEST_LINK_LIBS})
+else()
+  set(ARROW_FLIGHT_SQL_TEST_LINK_LIBS arrow_flight_sql_shared 
arrow_flight_testing_shared
+                                      ${ARROW_TEST_LINK_LIBS})
+endif()
+
+add_arrow_test(flight_sql_test
+               SOURCES
+               client_test.cc
+               server_test.cc
+               "${CMAKE_CURRENT_BINARY_DIR}/FlightSql.pb.cc"
+               STATIC_LINK_LIBS
+               ${ARROW_FLIGHT_SQL_TEST_LINK_LIBS}
+               GTest::gtest
+               GTest::gmock
+               LABELS
+               "arrow_flight_sql")
+
+add_executable(flight_sql_test_app test_app_cli.cc

Review comment:
       We also need to guard these definitions because flight_sql_test_app uses 
Googletest.

##########
File path: cpp/src/arrow/flight/flight-sql/server_test.cc
##########
@@ -0,0 +1,653 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <arrow/flight/api.h>
+#include <arrow/flight/flight-sql/api.h>
+#include <arrow/flight/flight-sql/example/sqlite_server.h>
+#include <arrow/flight/flight-sql/server.h>
+#include <arrow/flight/test_util.h>
+#include <arrow/flight/types.h>
+#include <arrow/testing/gtest_util.h>
+#include <gmock/gmock.h>
+#include <google/protobuf/any.pb.h>
+#include <gtest/gtest.h>
+
+#define unparen(...) __VA_ARGS__
+#define DECLARE_ARRAY(ARRAY_NAME, TYPE_CLASS, DATA)     \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME; \
+  {                                                     \
+    arrow::TYPE_CLASS##Builder builder;                 \
+    auto data = unparen DATA;                           \
+    for (const auto& item : data) {                     \
+      ASSERT_OK(builder.Append(item));                  \
+    }                                                   \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));           \
+  }
+
+#define DECLARE_BINARY_ARRAY(ARRAY_NAME, DATA, LENGTH) \
+  std::shared_ptr<arrow::BinaryArray> ARRAY_NAME;      \
+  {                                                    \
+    arrow::Binary##Builder builder;                    \
+    auto data = unparen DATA;                          \
+    for (const auto& item : data) {                    \
+      ASSERT_OK(builder.Append(item, LENGTH));         \
+    }                                                  \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));          \
+  }
+
+#define DECLARE_NULL_ARRAY(ARRAY_NAME, TYPE_CLASS, LENGTH) \
+  std::shared_ptr<arrow::TYPE_CLASS##Array> ARRAY_NAME;    \
+  {                                                        \
+    arrow::TYPE_CLASS##Builder builder;                    \
+    for (int i = 0; i < LENGTH; i++) {                     \
+      ASSERT_OK(builder.AppendNull());                     \
+    }                                                      \
+    ASSERT_OK(builder.Finish(&(ARRAY_NAME)));              \
+  }
+
+using ::testing::_;
+using ::testing::Ref;
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+
+TestServer* server;
+FlightSqlClient* sql_client;
+
+class TestFlightSqlServer : public ::testing::Environment {
+ protected:
+  void SetUp() override {
+    server = new TestServer("flight_sql_test_server");
+    server->Start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    ASSERT_TRUE(server->IsRunning());
+
+    std::stringstream ss;
+    ss << "grpc://localhost:" << server->port();
+    std::string uri = ss.str();
+
+    std::unique_ptr<FlightClient> client;
+    Location location;
+    ASSERT_OK(Location::Parse(uri, &location));
+    ASSERT_OK(FlightClient::Connect(location, &client));
+
+    sql_client = new FlightSqlClient(client);
+  }
+
+  void TearDown() override {
+    server->Stop();
+
+    free(server);
+    free(sql_client);
+  }
+};
+
+TEST(TestFlightSqlServer, TestCommandStatementQuery) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->Execute({}, "SELECT * FROM intTable", &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema =
+      arrow::schema({arrow::field("id", int64()), arrow::field("keyName", 
utf8()),
+                     arrow::field("value", int64()), arrow::field("foreignId", 
int64())});
+
+  DECLARE_ARRAY(id_array, Int64, ({1, 2, 3}));
+  DECLARE_ARRAY(keyname_array, String, ({"one", "zero", "negative one"}));
+  DECLARE_ARRAY(value_array, Int64, ({1, 0, -1}));
+  DECLARE_ARRAY(foreignId_array, Int64, ({1, 1, 1}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      expected_schema, {id_array, keyname_array, value_array, 
foreignId_array});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTables) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types;
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, nullptr, false, 
table_types,
+                                  &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_NULL_ARRAY(catalog_name, String, 3);
+  DECLARE_NULL_ARRAY(schema_name, String, 3);
+  DECLARE_ARRAY(table_name, String,
+                ({
+                    "foreignTable",
+                    "intTable",
+                    "sqlite_sequence",
+                }));
+  DECLARE_ARRAY(table_type, String, ({"table", "table", "table"}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      SqlSchema::GetTablesSchema(), {catalog_name, schema_name, table_name, 
table_type});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTablesWithTableFilter) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types;
+
+  std::string table_filter_pattern = "int%";
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, &table_filter_pattern, 
false,
+                                  table_types, &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_NULL_ARRAY(catalog_name, String, 1);
+  DECLARE_NULL_ARRAY(schema_name, String, 1);
+  DECLARE_ARRAY(table_name, String, ({"intTable"}));
+  DECLARE_ARRAY(table_type, String, ({"table"}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      SqlSchema::GetTablesSchema(), {catalog_name, schema_name, table_name, 
table_type});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTablesWithTableTypesFilter) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types{"index"};
+
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, nullptr, false, 
table_types,
+                                  &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  ASSERT_TRUE(table->schema()->Equals(SqlSchema::GetTablesSchema()));
+
+  ASSERT_EQ(table->num_rows(), 0);
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTablesWithUnexistenceTableTypeFilter) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types{"table"};
+
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, nullptr, false, 
table_types,
+                                  &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_NULL_ARRAY(catalog_name, String, 3);
+  DECLARE_NULL_ARRAY(schema_name, String, 3);
+  DECLARE_ARRAY(table_name, String,
+                ({
+                    "foreignTable",
+                    "intTable",
+                    "sqlite_sequence",
+                }));
+  DECLARE_ARRAY(table_type, String, ({"table", "table", "table"}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      SqlSchema::GetTablesSchema(), {catalog_name, schema_name, table_name, 
table_type});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTablesWithIncludedSchemas) {
+  std::unique_ptr<FlightInfo> flight_info;
+  std::vector<std::string> table_types;
+
+  std::string table_filter_pattern = "int%";
+  ASSERT_OK(sql_client->GetTables({}, nullptr, nullptr, &table_filter_pattern, 
true,
+                                  table_types, &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_NULL_ARRAY(catalog_name, String, 1);
+  DECLARE_NULL_ARRAY(schema_name, String, 1);
+  DECLARE_ARRAY(table_name, String, ({"intTable"}));
+  DECLARE_ARRAY(table_type, String, ({"table"}));
+
+  const std::shared_ptr<Schema> schema_table =
+      arrow::schema({arrow::field("id", int64(), true, NULL),
+                     arrow::field("keyName", utf8(), true, NULL),
+                     arrow::field("value", int64(), true, NULL),
+                     arrow::field("foreignId", int64(), true, NULL)});
+
+  const arrow::Result<std::shared_ptr<Buffer>>& value =
+      ipc::SerializeSchema(*schema_table);
+  value.ValueOrDie()->data(), value.ValueOrDie()->size();
+
+  DECLARE_BINARY_ARRAY(table_schema, ({value.ValueOrDie()->data()}),
+                       value.ValueOrDie()->size());
+
+  const std::shared_ptr<Table>& expected_table =
+      Table::Make(SqlSchema::GetTablesSchemaWithIncludedSchema(),
+                  {catalog_name, schema_name, table_name, table_type, 
table_schema});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandGetCatalogs) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->GetCatalogs({}, &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema = 
SqlSchema::GetCatalogsSchema();
+
+  ASSERT_TRUE(table->schema()->Equals(*expected_schema));
+  ASSERT_EQ(0, table->num_rows());
+}
+
+TEST(TestFlightSqlServer, TestCommandGetSchemas) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->GetSchemas({}, NULLPTR, NULLPTR, &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema = 
SqlSchema::GetSchemasSchema();
+
+  ASSERT_TRUE(table->schema()->Equals(*expected_schema));
+  ASSERT_EQ(0, table->num_rows());
+}
+
+TEST(TestFlightSqlServer, TestCommandGetTableTypes) {
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(sql_client->GetTableTypes({}, &flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  DECLARE_ARRAY(table_type, String, ({"table"}));
+
+  const std::shared_ptr<Table>& expected_table =
+      Table::Make(SqlSchema::GetTableTypesSchema(), {table_type});
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, TestCommandStatementUpdate) {
+  int64_t result;
+  ASSERT_OK(sql_client->ExecuteUpdate(
+      {},
+      "INSERT INTO intTable (keyName, value) VALUES "
+      "('KEYNAME1', 1001), ('KEYNAME2', 1002), ('KEYNAME3', 1003)",
+      &result));
+  ASSERT_EQ(3, result);
+
+  ASSERT_OK(
+      sql_client->ExecuteUpdate({},
+                                "UPDATE intTable SET keyName = 'KEYNAME1' "
+                                "WHERE keyName = 'KEYNAME2' OR keyName = 
'KEYNAME3'",
+                                &result));
+  ASSERT_EQ(2, result);
+
+  ASSERT_OK(sql_client->ExecuteUpdate(
+      {}, "DELETE FROM intTable WHERE keyName = 'KEYNAME1'", &result));
+  ASSERT_EQ(3, result);
+}
+
+TEST(TestFlightSqlServer, TestCommandPreparedStatementQuery) {
+  std::shared_ptr<PreparedStatement> prepared_statement;
+  ASSERT_OK(sql_client->Prepare({}, "SELECT * FROM intTable", 
&prepared_statement));
+
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(prepared_statement->Execute(&flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema =
+      arrow::schema({arrow::field("id", int64()), arrow::field("keyName", 
utf8()),
+                     arrow::field("value", int64()), arrow::field("foreignId", 
int64())});
+
+  DECLARE_ARRAY(id_array, Int64, ({1, 2, 3}));
+  DECLARE_ARRAY(keyname_array, String, ({"one", "zero", "negative one"}));
+  DECLARE_ARRAY(value_array, Int64, ({1, 0, -1}));
+  DECLARE_ARRAY(foreignId_array, Int64, ({1, 1, 1}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      expected_schema, {id_array, keyname_array, value_array, 
foreignId_array});
+
+  ASSERT_TRUE(expected_table->Equals(*table));
+}
+
+TEST(TestFlightSqlServer, 
TestCommandPreparedStatementQueryWithParameterBinding) {
+  std::shared_ptr<PreparedStatement> prepared_statement;
+  ASSERT_OK(sql_client->Prepare({}, "SELECT * FROM intTable WHERE keyName LIKE 
?",
+                                &prepared_statement));
+
+  std::shared_ptr<Schema> parameter_schema;
+  ASSERT_OK(prepared_statement->GetParameterSchema(&parameter_schema));
+
+  const std::shared_ptr<Schema>& expected_parameter_schema =
+      arrow::schema({arrow::field("parameter_1", 
example::GetUnknownColumnDataType())});
+
+  ASSERT_TRUE(expected_parameter_schema->Equals(*parameter_schema));
+
+  std::shared_ptr<Array> type_ids;
+  ArrayFromVector<Int8Type>({0}, &type_ids);
+  std::shared_ptr<Array> offsets;
+  ArrayFromVector<Int32Type>({0}, &offsets);
+
+  std::shared_ptr<Array> string_array;
+  ArrayFromVector<StringType, std::string>({"%one"}, &string_array);
+  std::shared_ptr<Array> bytes_array;
+  ArrayFromVector<BinaryType, std::string>({}, &bytes_array);
+  std::shared_ptr<Array> bigint_array;
+  ArrayFromVector<Int64Type>({}, &bigint_array);
+  std::shared_ptr<Array> double_array;
+  ArrayFromVector<FloatType>({}, &double_array);
+
+  ASSERT_OK_AND_ASSIGN(
+      auto parameter_1_array,
+      DenseUnionArray::Make(*type_ids, *offsets,
+                            {string_array, bytes_array, bigint_array, 
double_array},
+                            {"string", "bytes", "bigint", "double"}, {0, 1, 2, 
3}));
+
+  const std::shared_ptr<RecordBatch>& record_batch =
+      RecordBatch::Make(parameter_schema, 1, {parameter_1_array});
+
+  ASSERT_OK(prepared_statement->SetParameters(record_batch));
+
+  std::unique_ptr<FlightInfo> flight_info;
+  ASSERT_OK(prepared_statement->Execute(&flight_info));
+
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(sql_client->DoGet({}, flight_info->endpoints()[0].ticket, 
&stream));
+
+  std::shared_ptr<Table> table;
+  ASSERT_OK(stream->ReadAll(&table));
+
+  const std::shared_ptr<Schema>& expected_schema =
+      arrow::schema({arrow::field("id", int64()), arrow::field("keyName", 
utf8()),
+                     arrow::field("value", int64()), arrow::field("foreignId", 
int64())});
+
+  DECLARE_ARRAY(id_array, Int64, ({1, 3}));
+  DECLARE_ARRAY(keyname_array, String, ({"one", "negative one"}));
+  DECLARE_ARRAY(value_array, Int64, ({1, -1}));
+  DECLARE_ARRAY(foreignId_array, Int64, ({1, 1}));
+
+  const std::shared_ptr<Table>& expected_table = Table::Make(
+      expected_schema, {id_array, keyname_array, value_array, 
foreignId_array});
+
+  ASSERT_TRUE(expected_table->Equals(*table));

Review comment:
       I'm seeing this fail for me.

##########
File path: cpp/src/arrow/flight/flight-sql/client_impl.h
##########
@@ -0,0 +1,463 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/buffer.h>
+#include <arrow/flight/flight-sql/FlightSql.pb.h>
+#include <arrow/flight/types.h>
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
+#include <arrow/testing/gtest_util.h>
+#include <google/protobuf/any.pb.h>
+#include <google/protobuf/message_lite.h>
+
+#include <utility>
+
+namespace pb = arrow::flight::protocol;
+
+namespace arrow {
+namespace flight {
+namespace sql {
+namespace internal {
+
+template <class T>
+FlightSqlClientT<T>::FlightSqlClientT(std::unique_ptr<T>& client) {
+  this->client = std::move(client);
+}
+
+template <class T>
+PreparedStatementT<T>::PreparedStatementT(
+    T* client_, const std::string& query,
+    pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result_,
+    const FlightCallOptions& options_)
+    : client(client_),
+      prepared_statement_result(std::move(prepared_statement_result_)),
+      options(options_) {
+  is_closed = false;
+}
+
+template <class T>
+FlightSqlClientT<T>::~FlightSqlClientT() = default;
+
+template <class T>
+PreparedStatementT<T>::~PreparedStatementT<T>() {
+  Close();
+}
+
+inline FlightDescriptor GetFlightDescriptorForCommand(
+    const google::protobuf::Message& command) {
+  google::protobuf::Any any;
+  any.PackFrom(command);
+
+  const std::string& string = any.SerializeAsString();
+  return FlightDescriptor::Command(string);
+}
+
+template <class T>
+Status GetFlightInfoForCommand(const std::unique_ptr<T>& client,
+                               const FlightCallOptions& options,
+                               std::unique_ptr<FlightInfo>* flight_info,
+                               const google::protobuf::Message& command) {
+  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+
+  return client->GetFlightInfo(options, descriptor, flight_info);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::Execute(const FlightCallOptions& options,
+                                    const std::string& query,
+                                    std::unique_ptr<FlightInfo>* flight_info) 
const {
+  pb::sql::CommandStatementQuery command;
+  command.set_query(query);
+
+  return GetFlightInfoForCommand(client, options, flight_info, command);
+}
+
+template <class T>
+Status FlightSqlClientT<T>::ExecuteUpdate(const FlightCallOptions& options,
+                                          const std::string& query, int64_t* 
rows) const {
+  pb::sql::CommandStatementUpdate command;
+  command.set_query(query);
+
+  const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
+
+  std::unique_ptr<FlightStreamWriter> writer;
+  std::unique_ptr<FlightMetadataReader> reader;
+
+  ARROW_RETURN_NOT_OK(client->DoPut(options, descriptor, NULLPTR, &writer, 
&reader));
+
+  std::shared_ptr<Buffer> metadata;
+
+  const Status& status = reader->ReadMetadata(&metadata);

Review comment:
       Status is ignored here - should be RETURN_NOT_OK.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to