This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 05fa60d64 feat(c/driver/postgresql): Implement consuming a PGresult 
via the copy reader (#2029)
05fa60d64 is described below

commit 05fa60d643c66b572d426ab28aa78fc52e9520e8
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Aug 6 21:52:23 2024 -0300

    feat(c/driver/postgresql): Implement consuming a PGresult via the copy 
reader (#2029)
    
    I started this PR wanting to get queries with parameters able to return
    their results; however, this turned into a PR leaning in to the
    `PqResultHelper` because it was helpful to export arrays from the
    `PGresult*` but wasn't quite general enough. I did a second bit of
    shuffling to make it (possibly, or maybe just for me) easier to
    understand what path gets taken on `ExecuteQuery()`.
    
    Some side effects of these changes are that we can now support multiple
    statements in the same query (by using `PQexec()` instead of
    `PQexecParams()` when there is no output requested) and that we can
    `ExecuteSchema()` for all parameterized queries.
    
    The actual feature is that a user can set `adbc.postgresql.use_copy =
    FALSE` to force a non-COPY path for queries that aren't supported there.
    Because we request binary data, we can use all the same infrastructure
    for converting the results! I have only one test for this although I did
    run the whole test suite in C++ and Python...there are still a few
    missing features (batch size hint, large string overflow, error detail,
    cancel) but most tests pass using either path.
    
    I'm happy to split this up if that is easier! I'm also planning to
    document the helper (but wanted a first round of review before
    documenting the behaviour to make sure it's behaviour we actually want).
    
    Closes #855, Closes #2035.
    
    ``` r
    library(adbcdrivermanager)
    #> Warning: package 'adbcdrivermanager' was built under R version 4.3.3
    
    con <- adbc_database_init(
      adbcpostgresql::adbcpostgresql(),
      uri = 
"postgresql://localhost:5432/postgres?user=postgres&password=password"
    ) |>
      adbc_connection_init()
    
    nycflights13::flights |>
      write_adbc(con, "flights")
    
    stream <- nanoarrow::nanoarrow_allocate_array_stream()
    rows <- con |>
      adbc_statement_init(adbc.postgresql.use_copy = FALSE) |>
      adbc_statement_set_sql_query(
        "SELECT * from flights where month = 1 AND day = 1"
      ) |>
      adbc_statement_prepare() |>
      adbc_statement_execute_query(stream)
    
    rows
    #> [1] 842
    
    tibble::as_tibble(stream)
    #> # A tibble: 842 × 19
    #>     year month   day dep_time sched_dep_time dep_delay arr_time 
sched_arr_time
    #>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          
<int>
    #>  1  2013     1     1      517            515         2      830          
  819
    #>  2  2013     1     1      533            529         4      850          
  830
    #>  3  2013     1     1      542            540         2      923          
  850
    #>  4  2013     1     1      544            545        -1     1004          
 1022
    #>  5  2013     1     1      554            600        -6      812          
  837
    #>  6  2013     1     1      554            558        -4      740          
  728
    #>  7  2013     1     1      555            600        -5      913          
  854
    #>  8  2013     1     1      557            600        -3      709          
  723
    #>  9  2013     1     1      557            600        -3      838          
  846
    #> 10  2013     1     1      558            600        -2      753          
  745
    #> # ℹ 832 more rows
    #> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
    #> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance 
<dbl>,
    #> #   hour <dbl>, minute <dbl>, time_hour <dttm>
    
    con |>
      execute_adbc("DROP TABLE flights")
    ```
    
    <sup>Created on 2024-07-25 with [reprex
    v2.1.0](https://reprex.tidyverse.org)</sup>
---
 c/driver/postgresql/connection.cc                 |  55 ++--
 c/driver/postgresql/postgresql_test.cc            | 130 ++++++++-
 c/driver/postgresql/result_helper.cc              | 338 ++++++++++++++++++++--
 c/driver/postgresql/result_helper.h               | 110 ++++++-
 c/driver/postgresql/statement.cc                  | 298 ++++++++-----------
 c/driver/postgresql/statement.h                   |  21 +-
 c/validation/adbc_validation_util.cc              |  14 +
 c/validation/adbc_validation_util.h               |   4 +
 python/adbc_driver_postgresql/tests/test_dbapi.py |   2 +-
 9 files changed, 707 insertions(+), 265 deletions(-)

diff --git a/c/driver/postgresql/connection.cc 
b/c/driver/postgresql/connection.cc
index a918188bf..b5c0ef161 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -145,12 +145,10 @@ class PqGetObjectsHelper {
         params.push_back(db_schema_);
       }
 
-      auto result_helper =
-          PqResultHelper{conn_, std::string(query.buffer), params, error_};
+      auto result_helper = PqResultHelper{conn_, std::string(query.buffer)};
       StringBuilderReset(&query);
 
-      RAISE_ADBC(result_helper.Prepare());
-      RAISE_ADBC(result_helper.Execute());
+      RAISE_ADBC(result_helper.Execute(error_, params));
 
       for (PqResultRow row : result_helper) {
         const char* schema_name = row[0].data;
@@ -188,12 +186,10 @@ class PqGetObjectsHelper {
       params.push_back(catalog_);
     }
 
-    PqResultHelper result_helper =
-        PqResultHelper{conn_, std::string(query.buffer), params, error_};
+    PqResultHelper result_helper = PqResultHelper{conn_, 
std::string(query.buffer)};
     StringBuilderReset(&query);
 
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    RAISE_ADBC(result_helper.Execute(error_, params));
 
     for (PqResultRow row : result_helper) {
       const char* db_name = row[0].data;
@@ -280,11 +276,10 @@ class PqGetObjectsHelper {
       }
     }
 
-    auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+    auto result_helper = PqResultHelper{conn_, query.buffer};
     StringBuilderReset(&query);
 
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    RAISE_ADBC(result_helper.Execute(error_, params));
     for (PqResultRow row : result_helper) {
       const char* table_name = row[0].data;
       const char* table_type = row[1].data;
@@ -341,11 +336,10 @@ class PqGetObjectsHelper {
       params.push_back(std::string(column_name_));
     }
 
-    auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+    auto result_helper = PqResultHelper{conn_, query.buffer};
     StringBuilderReset(&query);
 
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    RAISE_ADBC(result_helper.Execute(error_, params));
 
     for (PqResultRow row : result_helper) {
       const char* column_name = row[0].data;
@@ -493,11 +487,10 @@ class PqGetObjectsHelper {
       params.push_back(std::string(column_name_));
     }
 
-    auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
+    auto result_helper = PqResultHelper{conn_, query.buffer};
     StringBuilderReset(&query);
 
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    RAISE_ADBC(result_helper.Execute(error_, params));
 
     for (PqResultRow row : result_helper) {
       const char* constraint_name = row[0].data;
@@ -655,9 +648,8 @@ AdbcStatusCode 
PostgresConnection::PostgresConnectionGetInfoImpl(
         break;
       case ADBC_INFO_VENDOR_VERSION: {
         const char* stmt = "SHOW server_version_num";
-        auto result_helper = PqResultHelper{conn_, std::string(stmt), error};
-        RAISE_ADBC(result_helper.Prepare());
-        RAISE_ADBC(result_helper.Execute());
+        auto result_helper = PqResultHelper{conn_, std::string(stmt)};
+        RAISE_ADBC(result_helper.Execute(error));
         auto it = result_helper.begin();
         if (it == result_helper.end()) {
           SetError(error, "[libpq] PostgreSQL returned no rows for '%s'", 
stmt);
@@ -760,9 +752,8 @@ AdbcStatusCode PostgresConnection::GetOption(const char* 
option, char* value,
   if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) {
     output = PQdb(conn_);
   } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 
0) {
-    PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error};
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"};
+    RAISE_ADBC(result_helper.Execute(error));
     auto it = result_helper.begin();
     if (it == result_helper.end()) {
       SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT 
CURRENT_SCHEMA'");
@@ -931,10 +922,8 @@ AdbcStatusCode PostgresConnectionGetStatisticsImpl(PGconn* 
conn, const char* db_
   std::string prev_table;
 
   {
-    PqResultHelper result_helper{
-        conn, query, {db_schema, table_name ? table_name : "%"}, error};
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    PqResultHelper result_helper{conn, query};
+    RAISE_ADBC(result_helper.Execute(error, {db_schema, table_name ? 
table_name : "%"}));
 
     for (PqResultRow row : result_helper) {
       auto reltuples = row[5].ParseDouble();
@@ -1166,11 +1155,9 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const 
char* catalog,
 
   std::vector<std::string> params = {table_name_str};
 
-  PqResultHelper result_helper =
-      PqResultHelper{conn_, std::string(query.c_str()), params, error};
+  PqResultHelper result_helper = PqResultHelper{conn_, 
std::string(query.c_str())};
 
-  RAISE_ADBC(result_helper.Prepare());
-  auto result = result_helper.Execute();
+  auto result = result_helper.Execute(error, params);
   if (result != ADBC_STATUS_OK) {
     auto error_code = std::string(error->sqlstate, 5);
     if ((error_code == "42P01") || (error_code == "42602")) {
@@ -1337,10 +1324,8 @@ AdbcStatusCode PostgresConnection::SetOption(const char* 
key, const char* value,
     return ADBC_STATUS_OK;
   } else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) {
     // PostgreSQL doesn't accept a parameter here
-    PqResultHelper result_helper{
-        conn_, std::string("SET search_path TO ") + value, {}, error};
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    PqResultHelper result_helper{conn_, std::string("SET search_path TO ") + 
value};
+    RAISE_ADBC(result_helper.Execute(error));
     return ADBC_STATUS_OK;
   }
   SetError(error, "%s%s", "[libpq] Unknown option ", key);
diff --git a/c/driver/postgresql/postgresql_test.cc 
b/c/driver/postgresql/postgresql_test.cc
index a6d4f7704..ff3dc0b70 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -1247,7 +1247,7 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
     ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
                                           &reader.rows_affected, &error),
                 IsOkStatus(&error));
-    ASSERT_EQ(reader.rows_affected, -1);
+    ASSERT_EQ(reader.rows_affected, 2);
     ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
     ASSERT_NO_FATAL_FAILURE(reader.Next());
     ASSERT_EQ(reader.array->release, nullptr);
@@ -1276,6 +1276,32 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) {
   }
 }
 
+TEST_F(PostgresStatementTest, ExecuteSchemaParameterizedQuery) {
+  nanoarrow::UniqueSchema schema_bind;
+  ArrowSchemaInit(schema_bind.get());
+  ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1),
+              adbc_validation::IsOkErrno());
+  ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0], 
NANOARROW_TYPE_STRING),
+              adbc_validation::IsOkErrno());
+
+  nanoarrow::UniqueArrayStream bind;
+  nanoarrow::EmptyArrayStream(schema_bind.get()).ToArrayStream(bind.get());
+
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT $1", &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBindStream(&statement, bind.get(), &error), 
IsOkStatus());
+
+  nanoarrow::UniqueSchema schema;
+  ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
+              IsOkStatus(&error));
+
+  ASSERT_EQ(1, schema->n_children);
+  ASSERT_STREQ("u", schema->children[0]->format);
+
+  ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+}
+
 TEST_F(PostgresStatementTest, BatchSizeHint) {
   ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "batch_size_hint_test", 
&error),
               IsOkStatus(&error));
@@ -1345,16 +1371,13 @@ TEST_F(PostgresStatementTest, 
AdbcErrorBackwardsCompatibility) {
 TEST_F(PostgresStatementTest, Cancel) {
   ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
 
-  for (const char* query : {
-           "DROP TABLE IF EXISTS test_cancel",
-           "CREATE TABLE test_cancel (ints INT)",
-           R"(INSERT INTO test_cancel (ints)
-              SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g))",
-       }) {
-    ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), 
IsOkStatus(&error));
-    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, 
&error),
-                IsOkStatus(&error));
-  }
+  const char* query = R"(DROP TABLE IF EXISTS test_cancel;
+            CREATE TABLE test_cancel (ints INT);
+            INSERT INTO test_cancel (ints)
+            SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g);)";
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), 
IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+              IsOkStatus(&error));
 
   ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM 
test_cancel", &error),
               IsOkStatus(&error));
@@ -1381,6 +1404,91 @@ TEST_F(PostgresStatementTest, Cancel) {
   ASSERT_NE(0, AdbcErrorGetDetailCount(detail));
 }
 
+TEST_F(PostgresStatementTest, MultipleStatementsSingleQuery) {
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
+
+  const char* query = R"(DROP TABLE IF EXISTS test_query_statements;
+            CREATE TABLE test_query_statements (ints INT);
+            INSERT INTO test_query_statements VALUES((1));
+            INSERT INTO test_query_statements VALUES((2));
+            INSERT INTO test_query_statements VALUES((3));)";
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), 
IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+              IsOkStatus(&error));
+
+  ASSERT_THAT(
+      AdbcStatementSetSqlQuery(&statement, "SELECT * FROM 
test_query_statements", &error),
+      IsOkStatus(&error));
+
+  adbc_validation::StreamReader reader;
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                        &reader.rows_affected, &error),
+              IsOkStatus(&error));
+  reader.GetSchema();
+  ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+  ASSERT_EQ(reader.array->length, 3);
+}
+
+TEST_F(PostgresStatementTest, SetUseCopyFalse) {
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), 
IsOkStatus(&error));
+
+  const char* query = R"(DROP TABLE IF EXISTS test_query_set_copy_false;
+            CREATE TABLE test_query_set_copy_false (ints INT);
+            INSERT INTO test_query_set_copy_false VALUES((1));
+            INSERT INTO test_query_set_copy_false VALUES((NULL));
+            INSERT INTO test_query_set_copy_false VALUES((3));)";
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), 
IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
+              IsOkStatus(&error));
+
+  // Check option setting/getting
+  ASSERT_EQ(
+      adbc_validation::StatementGetOption(&statement, 
"adbc.postgresql.use_copy", &error),
+      "true");
+
+  ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+                                     "not true or false", &error),
+              IsStatus(ADBC_STATUS_INVALID_ARGUMENT));
+
+  ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+                                     ADBC_OPTION_VALUE_ENABLED, &error),
+              IsOkStatus(&error));
+  ASSERT_EQ(
+      adbc_validation::StatementGetOption(&statement, 
"adbc.postgresql.use_copy", &error),
+      "true");
+
+  ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy",
+                                     ADBC_OPTION_VALUE_DISABLED, &error),
+              IsOkStatus(&error));
+  ASSERT_EQ(
+      adbc_validation::StatementGetOption(&statement, 
"adbc.postgresql.use_copy", &error),
+      "false");
+
+  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
+                                       "SELECT * FROM 
test_query_set_copy_false", &error),
+              IsOkStatus(&error));
+
+  adbc_validation::StreamReader reader;
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                        &reader.rows_affected, &error),
+              IsOkStatus(&error));
+
+  ASSERT_EQ(reader.rows_affected, 3);
+
+  reader.GetSchema();
+  ASSERT_EQ(reader.schema->n_children, 1);
+  ASSERT_STREQ(reader.schema->children[0]->format, "i");
+  ASSERT_STREQ(reader.schema->children[0]->name, "ints");
+
+  ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+  ASSERT_EQ(reader.array->length, 3);
+  ASSERT_EQ(reader.array->n_children, 1);
+  ASSERT_EQ(reader.array->children[0]->null_count, 1);
+
+  ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno());
+  ASSERT_EQ(reader.array->release, nullptr);
+}
+
 struct TypeTestCase {
   std::string name;
   std::string sql_type;
diff --git a/c/driver/postgresql/result_helper.cc 
b/c/driver/postgresql/result_helper.cc
index ad5a54e00..df890a7c5 100644
--- a/c/driver/postgresql/result_helper.cc
+++ b/c/driver/postgresql/result_helper.cc
@@ -17,24 +17,25 @@
 
 #include "result_helper.h"
 
+#include <charconv>
+#include <memory>
+
+#include "copy/reader.h"
 #include "driver/common/utils.h"
 #include "error.h"
 
 namespace adbcpq {
 
-PqResultHelper::~PqResultHelper() {
-  if (result_ != nullptr) {
-    PQclear(result_);
-  }
-}
+PqResultHelper::~PqResultHelper() { ClearResult(); }
 
-AdbcStatusCode PqResultHelper::Prepare() {
+AdbcStatusCode PqResultHelper::PrepareInternal(int n_params, const Oid* 
param_oids,
+                                               struct AdbcError* error) {
   // TODO: make stmtName a unique identifier?
   PGresult* result =
-      PQprepare(conn_, /*stmtName=*/"", query_.c_str(), param_values_.size(), 
NULL);
+      PQprepare(conn_, /*stmtName=*/"", query_.c_str(), n_params, param_oids);
   if (PQresultStatus(result) != PGRES_COMMAND_OK) {
     AdbcStatusCode code =
-        SetError(error_, result, "[libpq] Failed to prepare query: %s\nQuery 
was:%s",
+        SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery 
was:%s",
                  PQerrorMessage(conn_), query_.c_str());
     PQclear(result);
     return code;
@@ -44,24 +45,327 @@ AdbcStatusCode PqResultHelper::Prepare() {
   return ADBC_STATUS_OK;
 }
 
-AdbcStatusCode PqResultHelper::Execute() {
-  std::vector<const char*> param_c_strs;
+AdbcStatusCode PqResultHelper::Prepare(struct AdbcError* error) {
+  return PrepareInternal(0, nullptr, error);
+}
 
-  for (size_t index = 0; index < param_values_.size(); index++) {
-    param_c_strs.push_back(param_values_[index].c_str());
+AdbcStatusCode PqResultHelper::Prepare(const std::vector<Oid>& param_oids,
+                                       struct AdbcError* error) {
+  return PrepareInternal(param_oids.size(), param_oids.data(), error);
+}
+
+AdbcStatusCode PqResultHelper::DescribePrepared(struct AdbcError* error) {
+  ClearResult();
+  result_ = PQdescribePrepared(conn_, /*stmtName=*/"");
+  if (PQresultStatus(result_) != PGRES_COMMAND_OK) {
+    AdbcStatusCode code = SetError(
+        error, result_, "[libpq] Failed to describe prepared statement: 
%s\nQuery was:%s",
+        PQerrorMessage(conn_), query_.c_str());
+    ClearResult();
+    return code;
   }
 
-  result_ =
-      PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), 
NULL, NULL, 0);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::Execute(struct AdbcError* error,
+                                       const std::vector<std::string>& params,
+                                       PostgresType* param_types) {
+  if (params.size() == 0 && param_types == nullptr && output_format_ == 
Format::kText) {
+    ClearResult();
+    result_ = PQexec(conn_, query_.c_str());
+  } else {
+    std::vector<const char*> param_values;
+    std::vector<int> param_lengths;
+    std::vector<int> param_formats;
+
+    for (const auto& param : params) {
+      param_values.push_back(param.data());
+      param_lengths.push_back(static_cast<int>(param.size()));
+      param_formats.push_back(static_cast<int>(param_format_));
+    }
+
+    std::vector<Oid> param_oids;
+    const Oid* param_oids_ptr = nullptr;
+    if (param_types != nullptr) {
+      param_oids.resize(params.size());
+      for (size_t i = 0; i < params.size(); i++) {
+        param_oids[i] = param_types->child(i).oid();
+      }
+      param_oids_ptr = param_oids.data();
+    }
+
+    ClearResult();
+    result_ = PQexecParams(conn_, query_.c_str(), param_values.size(), 
param_oids_ptr,
+                           param_values.data(), param_lengths.data(),
+                           param_formats.data(), 
static_cast<int>(output_format_));
+  }
 
   ExecStatusType status = PQresultStatus(result_);
   if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) {
-    AdbcStatusCode error =
-        SetError(error_, result_, "[libpq] Failed to execute query '%s': %s",
+    AdbcStatusCode status =
+        SetError(error, result_, "[libpq] Failed to execute query '%s': %s",
                  query_.c_str(), PQerrorMessage(conn_));
-    return error;
+    return status;
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ExecuteCopy(struct AdbcError* error) {
+  // Remove trailing semicolon(s) from the query before feeding it into COPY
+  while (!query_.empty() && query_.back() == ';') {
+    query_.pop_back();
   }
 
+  std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
+  ClearResult();
+  result_ = PQexecParams(conn_, copy_query.c_str(), /*nParams=*/0,
+                         /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
+                         /*paramLengths=*/nullptr, /*paramFormats=*/nullptr,
+                         static_cast<int>(Format::kBinary));
+
+  if (PQresultStatus(result_) != PGRES_COPY_OUT) {
+    AdbcStatusCode code = SetError(
+        error, result_,
+        "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was: 
%s",
+        PQerrorMessage(conn_), copy_query.c_str());
+    ClearResult();
+    return code;
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ResolveParamTypes(PostgresTypeResolver& 
type_resolver,
+                                                 PostgresType* param_types,
+                                                 struct AdbcError* error) {
+  struct ArrowError na_error;
+  ArrowErrorInit(&na_error);
+
+  const int num_params = PQnparams(result_);
+  PostgresType root_type(PostgresTypeId::kRecord);
+
+  for (int i = 0; i < num_params; i++) {
+    const Oid pg_oid = PQparamtype(result_, i);
+    PostgresType pg_type;
+    if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+      SetError(error, "%s%d%s%s%s%d", "[libpq] Parameter #", i + 1, " (\"",
+               PQfname(result_, i), "\") has unknown type code ", pg_oid);
+      ClearResult();
+      return ADBC_STATUS_NOT_IMPLEMENTED;
+    }
+
+    root_type.AppendChild(PQfname(result_, i), pg_type);
+  }
+
+  *param_types = root_type;
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultHelper::ResolveOutputTypes(PostgresTypeResolver& 
type_resolver,
+                                                  PostgresType* result_types,
+                                                  struct AdbcError* error) {
+  struct ArrowError na_error;
+  ArrowErrorInit(&na_error);
+
+  const int num_fields = PQnfields(result_);
+  PostgresType root_type(PostgresTypeId::kRecord);
+
+  for (int i = 0; i < num_fields; i++) {
+    const Oid pg_oid = PQftype(result_, i);
+    PostgresType pg_type;
+    if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+      SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"",
+               PQfname(result_, i), "\") has unknown type code ", pg_oid);
+      ClearResult();
+      return ADBC_STATUS_NOT_IMPLEMENTED;
+    }
+
+    root_type.AppendChild(PQfname(result_, i), pg_type);
+  }
+
+  *result_types = root_type;
+  return ADBC_STATUS_OK;
+}
+
+PGresult* PqResultHelper::ReleaseResult() {
+  PGresult* out = result_;
+  result_ = nullptr;
+  return out;
+}
+
+int64_t PqResultHelper::AffectedRows() {
+  if (result_ == nullptr) {
+    return -1;
+  }
+
+  char* first = PQcmdTuples(result_);
+  char* last = first + strlen(first);
+  if ((last - first) == 0) {
+    return -1;
+  }
+
+  int64_t out;
+  auto result = std::from_chars(first, last, out);
+
+  if (result.ec == std::errc() && result.ptr == last) {
+    return out;
+  } else {
+    return -1;
+  }
+}
+
+int PqResultArrayReader::GetSchema(struct ArrowSchema* out) {
+  ResetErrors();
+
+  if (schema_->release == nullptr) {
+    AdbcStatusCode status = Initialize(&error_);
+    if (status != ADBC_STATUS_OK) {
+      return EINVAL;
+    }
+  }
+
+  return ArrowSchemaDeepCopy(schema_.get(), out);
+}
+
+int PqResultArrayReader::GetNext(struct ArrowArray* out) {
+  ResetErrors();
+
+  if (schema_->release == nullptr) {
+    AdbcStatusCode status = Initialize(&error_);
+    if (status != ADBC_STATUS_OK) {
+      return EINVAL;
+    }
+  }
+
+  if (!helper_.HasResult()) {
+    out->release = nullptr;
+    return NANOARROW_OK;
+  }
+
+  nanoarrow::UniqueArray tmp;
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(tmp.get(), schema_.get(), 
&na_error_));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(tmp.get()));
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    NANOARROW_RETURN_NOT_OK(field_readers_[i]->InitArray(tmp->children[i]));
+  }
+
+  // TODO: If we get an EOVERFLOW here (e.g., big string data), we
+  // would need to keep track of what row number we're on and start
+  // from there instead of begin() on the next call. We could also
+  // respect the size hint here to chunk the batches.
+  struct ArrowBufferView item;
+  for (auto it = helper_.begin(); it != helper_.end(); it++) {
+    auto row = *it;
+    for (int i = 0; i < helper_.NumColumns(); i++) {
+      auto pg_item = row[i];
+      item.data.data = pg_item.data;
+
+      if (pg_item.is_null) {
+        item.size_bytes = -1;
+      } else {
+        item.size_bytes = pg_item.len;
+      }
+
+      NANOARROW_RETURN_NOT_OK(
+          field_readers_[i]->Read(&item, item.size_bytes, tmp->children[i], 
&na_error_));
+    }
+  }
+
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    NANOARROW_RETURN_NOT_OK(field_readers_[i]->FinishArray(tmp->children[i], 
&na_error_));
+  }
+
+  tmp->length = helper_.NumRows();
+  tmp->null_count = 0;
+  NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(tmp.get(), 
&na_error_));
+
+  // Ensure that the next call to GetNext() will signal the end of the stream
+  helper_.ClearResult();
+
+  // Canonically return zero-size results as an empty stream
+  if (tmp->length == 0) {
+    out->release = nullptr;
+    return NANOARROW_OK;
+  }
+
+  ArrowArrayMove(tmp.get(), out);
+  return NANOARROW_OK;
+}
+
+const char* PqResultArrayReader::GetLastError() {
+  if (error_.message != nullptr) {
+    return error_.message;
+  } else {
+    return na_error_.message;
+  }
+}
+
+AdbcStatusCode PqResultArrayReader::Initialize(struct AdbcError* error) {
+  helper_.set_output_format(PqResultHelper::Format::kBinary);
+  RAISE_ADBC(helper_.Execute(error));
+
+  ArrowSchemaInit(schema_.get());
+  CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(), 
helper_.NumColumns()),
+                  &na_error_, error);
+
+  for (int i = 0; i < helper_.NumColumns(); i++) {
+    PostgresType child_type;
+    CHECK_NA_DETAIL(INTERNAL,
+                    type_resolver_->Find(helper_.FieldType(i), &child_type, 
&na_error_),
+                    &na_error_, error);
+
+    CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error);
+    CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i], 
helper_.FieldName(i)),
+             error);
+
+    std::unique_ptr<PostgresCopyFieldReader> child_reader;
+    CHECK_NA_DETAIL(
+        INTERNAL,
+        MakeCopyFieldReader(child_type, schema_->children[i], &child_reader, 
&na_error_),
+        &na_error_, error);
+
+    child_reader->Init(child_type);
+    CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]), 
&na_error_,
+                    error);
+
+    field_readers_.push_back(std::move(child_reader));
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows,
+                                                  struct ArrowArrayStream* out,
+                                                  struct AdbcError* error) {
+  if (out == nullptr) {
+    // If there is no output requested, we still need to execute and set
+    // affected_rows if needed. We don't need an output schema or to set
+    // up a copy reader, so we can skip those steps by going straight
+    // to Execute(). This also enables us to support queries with multiple
+    // statements because we can call PQexec() instead of PQexecParams().
+    RAISE_ADBC(helper_.Execute(error));
+
+    if (affected_rows != nullptr) {
+      *affected_rows = helper_.AffectedRows();
+    }
+
+    return ADBC_STATUS_OK;
+  }
+
+  // Execute eagerly. We need this to provide row counts for DELETE and
+  // CREATE TABLE queries as well as to provide more informative errors
+  // until this reader class is wired up to provide extended AdbcError
+  // information.
+  RAISE_ADBC(Initialize(error));
+  if (affected_rows != nullptr) {
+    *affected_rows = helper_.AffectedRows();
+  }
+
+  nanoarrow::ArrayStreamFactory<PqResultArrayReader>::InitArrayStream(
+      new PqResultArrayReader(this), out);
+
   return ADBC_STATUS_OK;
 }
 
diff --git a/c/driver/postgresql/result_helper.h 
b/c/driver/postgresql/result_helper.h
index 25a79fad8..43083b8bc 100644
--- a/c/driver/postgresql/result_helper.h
+++ b/c/driver/postgresql/result_helper.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <cassert>
+#include <memory>
 #include <optional>
 #include <string>
 #include <utility>
@@ -26,6 +27,8 @@
 #include <arrow-adbc/adbc.h>
 #include <libpq-fe.h>
 
+#include "copy/reader.h"
+
 namespace adbcpq {
 
 /// \brief A single column in a single row of a result set.
@@ -73,25 +76,57 @@ class PqResultRow {
 // prior to iterating
 class PqResultHelper {
  public:
-  explicit PqResultHelper(PGconn* conn, std::string query, struct AdbcError* 
error)
-      : conn_(conn), query_(std::move(query)), error_(error) {}
+  enum class Format {
+    kText = 0,
+    kBinary = 1,
+  };
 
-  explicit PqResultHelper(PGconn* conn, std::string query,
-                          std::vector<std::string> param_values, struct 
AdbcError* error)
-      : conn_(conn),
-        query_(std::move(query)),
-        param_values_(std::move(param_values)),
-        error_(error) {}
+  explicit PqResultHelper(PGconn* conn, std::string query)
+      : conn_(conn), query_(std::move(query)) {}
+
+  PqResultHelper(PqResultHelper&& other)
+      : PqResultHelper(other.conn_, std::move(other.query_)) {
+    result_ = other.result_;
+    other.result_ = nullptr;
+  }
 
   ~PqResultHelper();
 
-  AdbcStatusCode Prepare();
-  AdbcStatusCode Execute();
+  void set_param_format(Format format) { param_format_ = format; }
+  void set_output_format(Format format) { output_format_ = format; }
+
+  AdbcStatusCode Prepare(struct AdbcError* error);
+  AdbcStatusCode Prepare(const std::vector<Oid>& param_oids, struct AdbcError* 
error);
+  AdbcStatusCode DescribePrepared(struct AdbcError* error);
+  AdbcStatusCode Execute(struct AdbcError* error,
+                         const std::vector<std::string>& params = {},
+                         PostgresType* param_types = nullptr);
+  AdbcStatusCode ExecuteCopy(struct AdbcError* error);
+  AdbcStatusCode ResolveParamTypes(PostgresTypeResolver& type_resolver,
+                                   PostgresType* param_types, struct 
AdbcError* error);
+  AdbcStatusCode ResolveOutputTypes(PostgresTypeResolver& type_resolver,
+                                    PostgresType* result_types, struct 
AdbcError* error);
+
+  bool HasResult() { return result_ != nullptr; }
+
+  PGresult* ReleaseResult();
+
+  void ClearResult() {
+    PQclear(result_);
+    result_ = nullptr;
+  }
+
+  int64_t AffectedRows();
 
   int NumRows() const { return PQntuples(result_); }
 
   int NumColumns() const { return PQnfields(result_); }
 
+  const char* FieldName(int column_number) const {
+    return PQfname(result_, column_number);
+  }
+  Oid FieldType(int column_number) const { return PQftype(result_, 
column_number); }
+
   class iterator {
     const PqResultHelper& outer_;
     int curr_row_ = 0;
@@ -127,7 +162,58 @@ class PqResultHelper {
   PGresult* result_ = nullptr;
   PGconn* conn_;
   std::string query_;
-  std::vector<std::string> param_values_;
-  struct AdbcError* error_;
+  Format param_format_ = Format::kText;
+  Format output_format_ = Format::kText;
+
+  AdbcStatusCode PrepareInternal(int n_params, const Oid* param_oids,
+                                 struct AdbcError* error);
 };
+
+class PqResultArrayReader {
+ public:
+  PqResultArrayReader(PGconn* conn, std::shared_ptr<PostgresTypeResolver> 
type_resolver,
+                      std::string query)
+      : helper_(conn, std::move(query)), type_resolver_(type_resolver) {
+    ArrowErrorInit(&na_error_);
+    error_ = ADBC_ERROR_INIT;
+  }
+
+  ~PqResultArrayReader() { ResetErrors(); }
+
+  int GetSchema(struct ArrowSchema* out);
+  int GetNext(struct ArrowArray* out);
+  const char* GetLastError();
+
+  AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct 
ArrowArrayStream* out,
+                               struct AdbcError* error);
+
+  AdbcStatusCode Initialize(struct AdbcError* error);
+
+ private:
+  PqResultHelper helper_;
+  std::shared_ptr<PostgresTypeResolver> type_resolver_;
+  std::vector<std::unique_ptr<PostgresCopyFieldReader>> field_readers_;
+  nanoarrow::UniqueSchema schema_;
+  struct AdbcError error_;
+  struct ArrowError na_error_;
+
+  explicit PqResultArrayReader(PqResultArrayReader* other)
+      : helper_(std::move(other->helper_)),
+        type_resolver_(std::move(other->type_resolver_)),
+        field_readers_(std::move(other->field_readers_)),
+        schema_(std::move(other->schema_)) {
+    ArrowErrorInit(&na_error_);
+    error_ = ADBC_ERROR_INIT;
+  }
+
+  void ResetErrors() {
+    ArrowErrorInit(&na_error_);
+
+    if (error_.private_data != nullptr) {
+      error_.release(&error_);
+    }
+    error_ = ADBC_ERROR_INIT;
+  }
+};
+
 }  // namespace adbcpq
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 444355123..0fa8a79b9 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -82,30 +82,6 @@ struct OneValueStream {
   }
 };
 
-/// Build an PostgresType object from a PGresult*
-AdbcStatusCode ResolvePostgresType(const PostgresTypeResolver& type_resolver,
-                                   PGresult* result, PostgresType* out,
-                                   struct AdbcError* error) {
-  ArrowError na_error;
-  const int num_fields = PQnfields(result);
-  PostgresType root_type(PostgresTypeId::kRecord);
-
-  for (int i = 0; i < num_fields; i++) {
-    const Oid pg_oid = PQftype(result, i);
-    PostgresType pg_type;
-    if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
-      SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"",
-               PQfname(result, i), "\") has unknown type code ", pg_oid);
-      return ADBC_STATUS_NOT_IMPLEMENTED;
-    }
-
-    root_type.AppendChild(PQfname(result, i), pg_type);
-  }
-
-  *out = root_type;
-  return ADBC_STATUS_OK;
-}
-
 /// Helper to manage bind parameters with a prepared statement
 struct BindStream {
   Handle<struct ArrowArrayStream> bind;
@@ -1148,18 +1124,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
   return ADBC_STATUS_OK;
 }
 
-AdbcStatusCode PostgresStatement::ExecutePreparedStatement(
-    struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError* 
error) {
-  if (!bind_.release) {
-    // TODO: set an empty stream just to unify the code paths
-    SetError(error, "%s",
-             "[libpq] Prepared statements without parameters are not 
implemented");
-    return ADBC_STATUS_NOT_IMPLEMENTED;
-  }
+AdbcStatusCode PostgresStatement::ExecuteBind(struct ArrowArrayStream* stream,
+                                              int64_t* rows_affected,
+                                              struct AdbcError* error) {
   if (stream) {
     // TODO:
     SetError(error, "%s",
-             "[libpq] Prepared statements returning result sets are not 
implemented");
+             "[libpq] Prepared statements with parameters returning result 
sets are not "
+             "implemented");
+
     return ADBC_STATUS_NOT_IMPLEMENTED;
   }
 
@@ -1178,27 +1151,10 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct 
ArrowArrayStream* stream,
                                                int64_t* rows_affected,
                                                struct AdbcError* error) {
   ClearResult();
-  if (prepared_) {
-    if (bind_.release || !stream) {
-      return ExecutePreparedStatement(stream, rows_affected, error);
-    }
-    // XXX: don't use a prepared statement to execute a no-parameter
-    // result-set-returning query for now, since we can't easily get
-    // access to COPY there. (This might have to become sequential
-    // executions of COPY (EXECUTE ($n, ...)) TO STDOUT which won't
-    // get all the benefits of a prepared statement.) At preparation
-    // time we don't know whether the query will be used with a result
-    // set or not without analyzing the query (we could prepare both?)
-    // and https://stackoverflow.com/questions/69233792 suggests that
-    // you can't PREPARE a query containing COPY.
-  }
-  if (!stream && !ingest_.target.empty()) {
-    return ExecuteUpdateBulk(rows_affected, error);
-  }
 
-  // Remove trailing semicolon(s) from the query before feeding it into COPY
-  while (!query_.empty() && query_.back() == ';') {
-    query_.pop_back();
+  // Use a dedicated path to handle bulk ingest
+  if (!ingest_.target.empty()) {
+    return ExecuteIngest(stream, rows_affected, error);
   }
 
   if (query_.empty()) {
@@ -1206,53 +1162,53 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct 
ArrowArrayStream* stream,
     return ADBC_STATUS_INVALID_STATE;
   }
 
-  // 1. Prepare the query to get the schema
-  {
-    RAISE_ADBC(SetupReader(error));
-
-    // If the caller did not request a result set or if there are no
-    // inferred output columns (e.g. a CREATE or UPDATE), then don't
-    // use COPY (which would fail anyways)
-    if (!stream || reader_.copy_reader_->pg_type().n_children() == 0) {
-      RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error));
-      if (stream) {
-        struct ArrowSchema schema;
-        std::memset(&schema, 0, sizeof(schema));
-        RAISE_NA(reader_.copy_reader_->GetSchema(&schema));
-        nanoarrow::EmptyArrayStream::MakeUnique(&schema).move(stream);
-      }
-      return ADBC_STATUS_OK;
-    }
+  // Use a dedicated path to handle parameter binding
+  if (bind_.release != nullptr) {
+    return ExecuteBind(stream, rows_affected, error);
+  }
 
-    // This resolves the reader specific to each PostgresType -> ArrowSchema
-    // conversion. It is unlikely that this will fail given that we have just
-    // inferred these conversions ourselves.
-    struct ArrowError na_error;
-    int na_res = reader_.copy_reader_->InitFieldReaders(&na_error);
-    if (na_res != NANOARROW_OK) {
-      SetError(error, "[libpq] Failed to initialize field readers: %s", 
na_error.message);
-      return na_res;
-    }
+  // If we have been requested to avoid COPY or there is no output requested,
+  // execute using the PqResultArrayReader.
+  if (!stream || !use_copy_) {
+    PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+    RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+    return ADBC_STATUS_OK;
   }
 
-  // 2. Execute the query with COPY to get binary tuples
-  {
-    std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)";
-    reader_.result_ =
-        PQexecParams(connection_->conn(), copy_query.c_str(), /*nParams=*/0,
-                     /*paramTypes=*/nullptr, /*paramValues=*/nullptr,
-                     /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, 
kPgBinaryFormat);
-    if (PQresultStatus(reader_.result_) != PGRES_COPY_OUT) {
-      AdbcStatusCode code = SetError(
-          error, reader_.result_,
-          "[libpq] Failed to execute query: could not begin COPY: %s\nQuery 
was: %s",
-          PQerrorMessage(connection_->conn()), copy_query.c_str());
-      ClearResult();
-      return code;
-    }
-    // Result is read from the connection, not the result, but we won't clear 
it here
+  PqResultHelper helper(connection_->conn(), query_);
+  RAISE_ADBC(helper.Prepare(error));
+  RAISE_ADBC(helper.DescribePrepared(error));
+
+  // Initialize the copy reader and infer the output schema (i.e., error for
+  // unsupported types before issuing the COPY query). This could be lazier
+  // (i.e., executed on the first call to GetSchema() or GetNext()).
+  PostgresType root_type;
+  RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &root_type, error));
+
+  // If there will be no columns in the result, we can also avoid COPY
+  if (root_type.n_children() == 0) {
+    // Could/should move the helper into the reader instead of repreparing
+    PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
+    RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error));
+    return ADBC_STATUS_OK;
   }
 
+  struct ArrowError na_error;
+  reader_.copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
+  CHECK_NA(INTERNAL, reader_.copy_reader_->Init(root_type), error);
+  CHECK_NA_DETAIL(INTERNAL, 
reader_.copy_reader_->InferOutputSchema(&na_error), &na_error,
+                  error);
+
+  CHECK_NA_DETAIL(INTERNAL, reader_.copy_reader_->InitFieldReaders(&na_error), 
&na_error,
+                  error);
+
+  // Execute the COPY query
+  RAISE_ADBC(helper.ExecuteCopy(error));
+
+  // We need the PQresult back for the reader
+  reader_.result_ = helper.ReleaseResult();
+
+  // Export to stream
   reader_.ExportTo(stream);
   if (rows_affected) *rows_affected = -1;
   return ADBC_STATUS_OK;
@@ -1264,31 +1220,69 @@ AdbcStatusCode PostgresStatement::ExecuteSchema(struct 
ArrowSchema* schema,
   if (query_.empty()) {
     SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery");
     return ADBC_STATUS_INVALID_STATE;
-  } else if (bind_.release) {
-    // TODO: if we have parameters, bind them (since they can affect the 
output schema)
-    SetError(error, "[libpq] ExecuteSchema with parameters is not 
implemented");
-    return ADBC_STATUS_NOT_IMPLEMENTED;
   }
 
-  RAISE_ADBC(SetupReader(error));
-  CHECK_NA(INTERNAL, reader_.copy_reader_->GetSchema(schema), error);
+  PqResultHelper helper(connection_->conn(), query_);
+
+  if (bind_.release) {
+    nanoarrow::UniqueSchema schema;
+    struct ArrowError na_error;
+    ArrowErrorInit(&na_error);
+    CHECK_NA_DETAIL(INTERNAL, ArrowArrayStreamGetSchema(&bind_, schema.get(), 
&na_error),
+                    &na_error, error);
+
+    if (std::string(schema->format) != "+s") {
+      SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+
+    std::vector<Oid> param_oids(schema->n_children);
+    for (int64_t i = 0; i < schema->n_children; i++) {
+      PostgresType pg_type;
+      CHECK_NA_DETAIL(INTERNAL,
+                      PostgresType::FromSchema(*type_resolver_, 
schema->children[i],
+                                               &pg_type, &na_error),
+                      &na_error, error);
+      param_oids[i] = pg_type.oid();
+    }
+
+    RAISE_ADBC(helper.Prepare(param_oids, error));
+  } else {
+    RAISE_ADBC(helper.Prepare(error));
+  }
+
+  RAISE_ADBC(helper.DescribePrepared(error));
+
+  PostgresType output_type;
+  RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &output_type, error));
+
+  nanoarrow::UniqueSchema tmp;
+  ArrowSchemaInit(tmp.get());
+  CHECK_NA(INTERNAL, output_type.SetSchema(tmp.get()), error);
+
+  tmp.move(schema);
   return ADBC_STATUS_OK;
 }
 
-AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
-                                                    struct AdbcError* error) {
+AdbcStatusCode PostgresStatement::ExecuteIngest(struct ArrowArrayStream* 
stream,
+                                                int64_t* rows_affected,
+                                                struct AdbcError* error) {
   if (!bind_.release) {
     SetError(error, "%s", "[libpq] Must Bind() before Execute() for bulk 
ingestion");
     return ADBC_STATUS_INVALID_STATE;
   }
 
+  if (stream != nullptr) {
+    SetError(error, "%s", "[libpq] Bulk ingest with result set is not 
supported");
+    return ADBC_STATUS_NOT_IMPLEMENTED;
+  }
+
   // Need the current schema to avoid being shadowed by temp tables
   // This is a little unfortunate; we need another DB roundtrip
   std::string current_schema;
   {
-    PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA", 
{}, error};
-    RAISE_ADBC(result_helper.Prepare());
-    RAISE_ADBC(result_helper.Execute());
+    PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA"};
+    RAISE_ADBC(result_helper.Execute(error));
     auto it = result_helper.begin();
     if (it == result_helper.end()) {
       SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT 
CURRENT_SCHEMA'");
@@ -1329,37 +1323,6 @@ AdbcStatusCode 
PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected,
   return ADBC_STATUS_OK;
 }
 
-AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected,
-                                                     struct AdbcError* error) {
-  // NOTE: must prepare first (used in ExecuteQuery)
-  PGresult* result =
-      PQexecPrepared(connection_->conn(), /*stmtName=*/"", /*nParams=*/0,
-                     /*paramValues=*/nullptr, /*paramLengths=*/nullptr,
-                     /*paramFormats=*/nullptr, 
/*resultFormat=*/kPgBinaryFormat);
-  ExecStatusType status = PQresultStatus(result);
-  if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) {
-    AdbcStatusCode code =
-        SetError(error, result, "[libpq] Failed to execute query: %s\nQuery 
was:%s",
-                 PQerrorMessage(connection_->conn()), query_.c_str());
-    PQclear(result);
-    return code;
-  }
-  if (rows_affected) {
-    if (status == PGRES_TUPLES_OK) {
-      *rows_affected = PQntuples(reader_.result_);
-    } else {
-      // In theory, PQcmdTuples would work here, but experimentally it gives
-      // an empty string even for a DELETE.  (Also, why does it return a
-      // string...)  Possibly, it doesn't work because we use PQexecPrepared
-      // but the docstring is careful to specify it works on an EXECUTE of a
-      // prepared statement.
-      *rows_affected = -1;
-    }
-  }
-  PQclear(result);
-  return ADBC_STATUS_OK;
-}
-
 AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, 
size_t* length,
                                             struct AdbcError* error) {
   std::string result;
@@ -1384,6 +1347,12 @@ AdbcStatusCode PostgresStatement::GetOption(const char* 
key, char* value, size_t
     }
   } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 
0) {
     result = std::to_string(reader_.batch_size_hint_bytes_);
+  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
+    if (use_copy_) {
+      result = "true";
+    } else {
+      result = "false";
+    }
   } else {
     SetError(error, "[libpq] Unknown statement option '%s'", key);
     return ADBC_STATUS_NOT_FOUND;
@@ -1503,6 +1472,15 @@ AdbcStatusCode PostgresStatement::SetOption(const char* 
key, const char* value,
     }
 
     this->reader_.batch_size_hint_bytes_ = int_value;
+  } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
+    if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+      use_copy_ = true;
+    } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+      use_copy_ = false;
+    } else {
+      SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, 
key);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
   } else {
     SetError(error, "[libpq] Unknown statement option '%s'", key);
     return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -1537,53 +1515,9 @@ AdbcStatusCode PostgresStatement::SetOptionInt(const 
char* key, int64_t value,
   return ADBC_STATUS_NOT_IMPLEMENTED;
 }
 
-AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) {
-  // TODO: we should pipeline here and assume this will succeed
-  PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", 
query_.c_str(),
-                               /*nParams=*/0, nullptr);
-  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
-    AdbcStatusCode code =
-        SetError(error, result,
-                 "[libpq] Failed to execute query: could not infer schema: 
failed to "
-                 "prepare query: %s\nQuery was:%s",
-                 PQerrorMessage(connection_->conn()), query_.c_str());
-    PQclear(result);
-    return code;
-  }
-  PQclear(result);
-  result = PQdescribePrepared(connection_->conn(), /*stmtName=*/"");
-  if (PQresultStatus(result) != PGRES_COMMAND_OK) {
-    AdbcStatusCode code =
-        SetError(error, result,
-                 "[libpq] Failed to execute query: could not infer schema: 
failed to "
-                 "describe prepared statement: %s\nQuery was:%s",
-                 PQerrorMessage(connection_->conn()), query_.c_str());
-    PQclear(result);
-    return code;
-  }
-
-  // Resolve the information from the PGresult into a PostgresType
-  PostgresType root_type;
-  AdbcStatusCode status = ResolvePostgresType(*type_resolver_, result, 
&root_type, error);
-  PQclear(result);
-  if (status != ADBC_STATUS_OK) return status;
-
-  // Initialize the copy reader and infer the output schema (i.e., error for
-  // unsupported types before issuing the COPY query)
-  reader_.copy_reader_ = std::make_unique<PostgresCopyStreamReader>();
-  reader_.copy_reader_->Init(root_type);
-  struct ArrowError na_error;
-  int na_res = reader_.copy_reader_->InferOutputSchema(&na_error);
-  if (na_res != NANOARROW_OK) {
-    SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s", 
na_res,
-             std::strerror(na_res), na_error.message);
-    return ADBC_STATUS_INTERNAL;
-  }
-  return ADBC_STATUS_OK;
-}
-
 void PostgresStatement::ClearResult() {
   // TODO: we may want to synchronize here for safety
   reader_.Release();
 }
+
 }  // namespace adbcpq
diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h
index f2387a3ac..1cd60bff5 100644
--- a/c/driver/postgresql/statement.h
+++ b/c/driver/postgresql/statement.h
@@ -33,6 +33,8 @@
 #define ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES \
   "adbc.postgresql.batch_size_hint_bytes"
 
+#define ADBC_POSTGRESQL_OPTION_USE_COPY "adbc.postgresql.use_copy"
+
 namespace adbcpq {
 class PostgresConnection;
 class PostgresStatement;
@@ -90,7 +92,11 @@ class TupleReader final {
 class PostgresStatement {
  public:
   PostgresStatement()
-      : connection_(nullptr), query_(), prepared_(false), reader_(nullptr) {
+      : connection_(nullptr),
+        query_(),
+        prepared_(false),
+        use_copy_(true),
+        reader_(nullptr) {
     std::memset(&bind_, 0, sizeof(bind_));
   }
 
@@ -130,12 +136,10 @@ class PostgresStatement {
       const std::vector<struct ArrowSchemaView>& source_schema_fields,
       std::string* escaped_table, std::string* escaped_field_list,
       struct AdbcError* error);
-  AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError* 
error);
-  AdbcStatusCode ExecuteUpdateQuery(int64_t* rows_affected, struct AdbcError* 
error);
-  AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream,
-                                          int64_t* rows_affected,
-                                          struct AdbcError* error);
-  AdbcStatusCode SetupReader(struct AdbcError* error);
+  AdbcStatusCode ExecuteIngest(struct ArrowArrayStream* stream, int64_t* 
rows_affected,
+                               struct AdbcError* error);
+  AdbcStatusCode ExecuteBind(struct ArrowArrayStream* stream, int64_t* 
rows_affected,
+                             struct AdbcError* error);
 
  private:
   std::shared_ptr<PostgresTypeResolver> type_resolver_;
@@ -154,6 +158,9 @@ class PostgresStatement {
     kCreateAppend,
   };
 
+  // Options
+  bool use_copy_;
+
   struct {
     std::string db_schema;
     std::string target;
diff --git a/c/validation/adbc_validation_util.cc 
b/c/validation/adbc_validation_util.cc
index b319e5495..54c18cce7 100644
--- a/c/validation/adbc_validation_util.cc
+++ b/c/validation/adbc_validation_util.cc
@@ -36,6 +36,20 @@ std::optional<std::string> ConnectionGetOption(struct 
AdbcConnection* connection
   return std::string(buffer, buffer_size - 1);
 }
 
+std::optional<std::string> StatementGetOption(struct AdbcStatement* statement,
+                                              std::string_view option,
+                                              struct AdbcError* error) {
+  char buffer[128];
+  size_t buffer_size = sizeof(buffer);
+  AdbcStatusCode status =
+      AdbcStatementGetOption(statement, option.data(), buffer, &buffer_size, 
error);
+  EXPECT_THAT(status, IsOkStatus(error));
+  if (status != ADBC_STATUS_OK) return std::nullopt;
+  EXPECT_GT(buffer_size, 0);
+  if (buffer_size == 0) return std::nullopt;
+  return std::string(buffer, buffer_size - 1);
+}
+
 std::string StatusCodeToString(AdbcStatusCode code) {
 #define CASE(CONSTANT)         \
   case ADBC_STATUS_##CONSTANT: \
diff --git a/c/validation/adbc_validation_util.h 
b/c/validation/adbc_validation_util.h
index 0dba6dafb..21eca52de 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -43,6 +43,10 @@ std::optional<std::string> ConnectionGetOption(struct 
AdbcConnection* connection
                                                std::string_view option,
                                                struct AdbcError* error);
 
+std::optional<std::string> StatementGetOption(struct AdbcStatement* statement,
+                                              std::string_view option,
+                                              struct AdbcError* error);
+
 // ------------------------------------------------------------
 // Helpers to print values
 
diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py 
b/python/adbc_driver_postgresql/tests/test_dbapi.py
index 94cd9f82d..9e2661d54 100644
--- a/python/adbc_driver_postgresql/tests/test_dbapi.py
+++ b/python/adbc_driver_postgresql/tests/test_dbapi.py
@@ -148,7 +148,7 @@ def test_query_execute_schema(postgres: dbapi.Connection) 
-> None:
 def test_query_invalid(postgres: dbapi.Connection) -> None:
     with postgres.cursor() as cur:
         with pytest.raises(
-            postgres.ProgrammingError, match="failed to prepare query"
+            postgres.ProgrammingError, match="Failed to prepare query"
         ) as excinfo:
             cur.execute("SELECT * FROM tabledoesnotexist")
 

Reply via email to