On 2020-Nov-02, Alvaro Herrera wrote:

> In v23 I've gone over docs; discovered that PQgetResults docs were
> missing the new values.  Added those.  No significant other changes yet.


From 3f305d05000981445fe4dbbb96c2e88ace89f439 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 26 Oct 2020 13:24:24 -0300
Subject: [PATCH v23] libpq batch

---
 doc/src/sgml/libpq.sgml                       |  505 ++++++++
 doc/src/sgml/lobj.sgml                        |    4 +
 .../libpqwalreceiver/libpqwalreceiver.c       |    3 +
 src/interfaces/libpq/exports.txt              |    4 +
 src/interfaces/libpq/fe-connect.c             |   27 +
 src/interfaces/libpq/fe-exec.c                |  630 +++++++++-
 src/interfaces/libpq/fe-protocol2.c           |    6 +
 src/interfaces/libpq/fe-protocol3.c           |   16 +-
 src/interfaces/libpq/libpq-fe.h               |   22 +-
 src/interfaces/libpq/libpq-int.h              |   46 +-
 src/test/modules/Makefile                     |    1 +
 src/test/modules/test_libpq/.gitignore        |    5 +
 src/test/modules/test_libpq/Makefile          |   25 +
 src/test/modules/test_libpq/README            |    1 +
 .../modules/test_libpq/t/001_libpq_async.pl   |   27 +
 src/test/modules/test_libpq/testlibpqbatch.c  | 1013 +++++++++++++++++
 src/tools/msvc/Mkvcbuild.pm                   |    3 +-
 src/tools/pgindent/typedefs.list              |    3 +
 18 files changed, 2295 insertions(+), 46 deletions(-)
 create mode 100644 src/test/modules/test_libpq/.gitignore
 create mode 100644 src/test/modules/test_libpq/Makefile
 create mode 100644 src/test/modules/test_libpq/README
 create mode 100644 src/test/modules/test_libpq/t/001_libpq_async.pl
 create mode 100644 src/test/modules/test_libpq/testlibpqbatch.c

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ce32fb39b..839cadfaf0 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res);
            </para>
           </listitem>
          </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-end">
+          <term><literal>PGRES_BATCH_END</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents the end of a batch.
+            This status occurs only when batch mode has been selected.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-aborted">
+          <term><literal>PGRES_BATCH_ABORTED</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a batch that has
+            received an error from the server.  <function>PQgetResult</function>
+            must be called repeatedly, and each call will return this status code
+            until the end of the current batch, at which point it will return
+            <literal>PGRES_BATCH_END</literal> and normal processing can resume.
+           </para>
+          </listitem>
+         </varlistentry>
+
         </variablelist>
 
         If the result status is <literal>PGRES_TUPLES_OK</literal> or
@@ -4819,6 +4843,478 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch Mode and Query Pipelining</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>pipelining</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> supports queueing up queries into
+   a pipeline to be executed as a batch on the server. Batching queries
+   allows applications to avoid a client/server round-trip after each
+   query to get the results before issuing the next query.
+  </para>
+
+  <sect2>
+   <title>When to Use Batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no meaningful performance
+    overhead to using batching and pipelining. It does increase client
+    application complexity, and extra caution is required to prevent
+    client/server deadlocks, but pipelining can sometimes offer considerable
+    performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e., network latency
+    (<quote>ping time</quote>) is high, and also when many small operations
+    are being performed in rapid sequence.  There is usually less benefit
+    in using batches when each query takes many multiples of the client/server
+    round-trip time to execute.  A 100-statement operation run on a server
+    300ms round-trip-time away would take 30 seconds in network latency alone
+    without batching; with batching it may spend as little as 0.3s waiting for
+    results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed
+    into operations on sets or into a <literal>COPY</literal> operation.
+   </para>
+
+   <para>
+    Batching is not useful when information from one operation is required by
+    the client to produce the next operation. In such cases, the client
+    must introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 14.0, but clients using
+     the PostgreSQL 14 version of <application>libpq</application> can use
+     batches on server versions 7.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using Batch Mode</title>
+
+   <para>
+    To issue batches the application must switch a connection into batch mode.
+    Enter batch mode with <xref linkend="libpq-PQenterBatchMode"/>
+    or test whether batch mode is active with
+    <xref linkend="libpq-PQbatchStatus"/>.
+    In batch mode only <link linkend="libpq-async">asynchronous operations</link>
+    are permitted, and <literal>COPY</literal> is not recommended as it
+    may trigger failure in batch processing.  Using any synchronous
+    command execution function, such as <function>PQfn</function>,
+    <function>PQexec</function>, or one of their sibling functions, is an
+    error condition.  Functions allowed in batch mode are described in
+    <xref linkend="libpq-batch-sending"/>.
+   </para>
+
+   <para>
+    The client uses libpq's asynchronous query functions to dispatch work,
+    marking the end of each batch with <function>PQbatchSendQueue</function>.
+    It retrieves results with <function>PQgetResult</function>, and may
+    eventually exit batch mode with <function>PQexitBatchMode</function>
+    once all results have been processed.
+   </para>
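+
+   <para>
+    For illustration only (return-value checks and the result contents are
+    omitted, and <literal>conn</literal> is assumed to be an established,
+    idle connection), a trivial batch of two queries could look like this:
+<programlisting>
+PGresult *res;
+
+/* switch the idle connection into batch mode */
+PQenterBatchMode(conn);
+
+/* queue work using the asynchronous functions only */
+PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0);
+PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0);
+
+/* mark the end of the batch; this sends a sync message and flushes the queue */
+PQbatchSendQueue(conn);
+
+/* first query: its result, then a null marking the end of that query */
+res = PQgetResult(conn);        /* PGRES_TUPLES_OK */
+PQclear(res);
+res = PQgetResult(conn);        /* null */
+
+/* second query, likewise */
+res = PQgetResult(conn);        /* PGRES_TUPLES_OK */
+PQclear(res);
+res = PQgetResult(conn);        /* null */
+
+/* finally the end-of-batch marker */
+res = PQgetResult(conn);        /* PGRES_BATCH_END */
+PQclear(res);
+
+/* all results consumed; return to normal mode */
+PQexitBatchMode(conn);
+</programlisting>
+   </para>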
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
+     in blocking mode it is possible for a client/server deadlock to occur.
+     The client will block trying to send queries to the server, but the server
+     will block trying to send results from queries it has already processed to
+     the client. This only occurs when the client sends enough queries to fill
+     its output buffer and the server's receive buffer before switching to
+     processing input from the server, but it's hard to predict exactly when
+     that'll happen so it's best to always use non-blocking mode.
+     Even in non-blocking mode, batch mode consumes more memory if sending
+     and receiving are not performed in a timely manner.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing Queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests using
+     normal asynchronous <application>libpq</application> functions such as
+     <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>,
+     or <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a
+     <xref linkend="libpq-PQbatchSendQueue"/>
+     call to mark the end of the batch. The client need not
+     call <function>PQgetResult</function> immediately after
+     dispatching each operation.
+     <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+
+    <para>
+     Batched operations will be executed by the server in the order the client
+     sends them. The server will send the results in the order the statements
+     were executed. The server may begin executing the batch before all commands
+     in the batch are queued and the end-of-batch command is sent. If any
+     statement encounters an error, the server aborts the current transaction and
+     skips processing the rest of the batch. Query processing resumes after the
+     end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared statement
+     then execute it with later statements in the same batch.
+    </para>
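+
+    <para>
+     As a sketch (the statement name and query here are made up for
+     illustration, and error checks are omitted), a statement can be
+     prepared and executed within one batch:
+<programlisting>
+const char *paramValues[1] = {"42"};
+
+/* queue a Parse for a named statement, then an execution of it */
+PQsendPrepare(conn, "my_stmt", "SELECT $1::int + 1", 1, NULL);
+PQsendQueryPrepared(conn, "my_stmt", 1, paramValues, NULL, NULL, 0);
+PQbatchSendQueue(conn);
+
+/*
+ * The results arrive in the same order: PGRES_COMMAND_OK for the Parse,
+ * PGRES_TUPLES_OK for the execution, and finally PGRES_BATCH_END.
+ */
+</programlisting>
+    </para>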
+
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing Results</title>
+
+    <para>
+     The client <link linkend="libpq-batch-interleave">interleaves result
+     processing</link> with sending batch queries, or for small batches may
+     process all results after sending the whole batch.
+    </para>
+
+    <para>
+     To process the result of the first batch entry the application calls
+     <function>PQgetResult</function> and handles each result until
+     <function>PQgetResult</function> returns null.
+     The result from the next batch entry may then be retrieved using
+     <function>PQgetResult</function> again and the cycle repeated.
+     The application handles individual statement results as normal.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     This mode selection is effective only for the query currently
+     being processed. For more information on the use of
+     <function>PQsetSingleRowMode</function>,
+     refer to <xref linkend="libpq-single-row-mode"/>.
+    </para>
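+
+    <para>
+     A minimal sketch (assuming the results of the current batch entry are
+     about to be retrieved, and that <literal>res</literal> is a
+     <type>PGresult</type> pointer):
+<programlisting>
+/* request single-row mode for the batch entry whose results come next */
+PQsetSingleRowMode(conn);
+
+while ((res = PQgetResult(conn)) != NULL)
+{
+    /* PGRES_SINGLE_TUPLE for each row, then PGRES_TUPLES_OK with zero rows */
+    PQclear(res);
+}
+</programlisting>
+    </para>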
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal
+     asynchronous processing except that it may return the new
+     <type>PGresult</type> status values <literal>PGRES_BATCH_END</literal>
+     and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSendQueue</function> call at the corresponding point in
+     the result stream and at no other time.
+     <literal>PGRES_BATCH_ABORTED</literal> is emitted during error handling;
+     see <link linkend="libpq-batch-errors">error handling</link>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (except that
+     <function>PQgetResult</function> returns null to indicate that the
+     results of the next query are about to be returned). The application
+     must keep track of the order in which it sent queries and the results
+     it expects.  Applications will typically use a state machine or a FIFO
+     queue for this, as sketched below.
+    </para>
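+
+    <para>
+     A minimal sketch of such bookkeeping, using just a counter of the
+     queries dispatched before <function>PQbatchSendQueue</function> was
+     called (<literal>n_sent</literal> is assumed to be maintained by the
+     application):
+<programlisting>
+PGresult   *res;
+int         remaining = n_sent;
+
+while (remaining > 0)
+{
+    res = PQgetResult(conn);
+    if (res == NULL)
+    {
+        /* a null marks the boundary between one query's results and the next */
+        remaining--;
+        continue;
+    }
+    /* handle res according to which query it belongs to, then discard it */
+    PQclear(res);
+}
+
+/* consume the end-of-batch marker */
+res = PQgetResult(conn);        /* PGRES_BATCH_END */
+PQclear(res);
+</programlisting>
+    </para>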
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error Handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report a
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSendQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
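+
+    <para>
+     A rough sketch of such recovery (assuming the batch was already ended
+     with <function>PQbatchSendQueue</function>) simply drains results until
+     the end of the aborted batch is reached:
+<programlisting>
+for (;;)
+{
+    PGresult       *res = PQgetResult(conn);
+    ExecStatusType  status;
+
+    if (res == NULL)
+        continue;               /* boundary between batch entries */
+
+    status = PQresultStatus(res);
+    PQclear(res);
+
+    if (status == PGRES_BATCH_END)
+        break;                  /* aborted batch is over; normal processing resumes */
+    /* PGRES_BATCH_ABORTED (or the original error) for the remaining entries */
+}
+</programlisting>
+    </para>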
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+    the failed operation are skipped entirely. The same behavior holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving Result Processing and Query Dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured
+     around a non-blocking event loop using operating system facilities
+     such as <function>select</function>, <function>poll</function>,
+     <function>WaitForMultipleObjectsEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work
+     still to be dispatched and a queue of work that has been dispatched
+     but not yet had its results processed. When the socket is writable
+     it should dispatch more work. When the socket is readable it should
+     read results and process them, matching them up to the next entry in
+     its expected results queue.  To limit memory use, results should be
+     read from the socket frequently; there is no need to wait until the
+     end of the batch to read them.  Batches should be scoped to logical
+     units of work, usually (but not necessarily) one transaction per batch.
+     There's no need to exit batch mode and re-enter it between batches
+     or to wait for one batch to finish before sending the next.
+    </para>
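+
+    <para>
+     The general shape of such a loop might be as follows.  This is only a
+     sketch: <function>have_work_to_send</function>,
+     <function>send_more_work</function>,
+     <function>expected_results_remain</function> and
+     <function>match_result</function> stand in for application-specific
+     logic, and error handling is omitted.
+<programlisting>
+for (;;)
+{
+    fd_set      input_mask;
+    fd_set      output_mask;
+    int         sock = PQsocket(conn);
+
+    FD_ZERO(&amp;input_mask);
+    FD_SET(sock, &amp;input_mask);
+    FD_ZERO(&amp;output_mask);
+    if (have_work_to_send())
+        FD_SET(sock, &amp;output_mask);
+
+    if (select(sock + 1, &amp;input_mask, &amp;output_mask, NULL, NULL) &lt; 0)
+        break;
+
+    if (FD_ISSET(sock, &amp;input_mask))
+    {
+        PQconsumeInput(conn);
+        while (expected_results_remain() &amp;&amp; !PQisBusy(conn))
+        {
+            PGresult *res = PQgetResult(conn);
+
+            if (res == NULL)
+                continue;           /* boundary between batch entries */
+            match_result(res);      /* pop the expected-results queue */
+            PQclear(res);
+        }
+    }
+
+    if (FD_ISSET(sock, &amp;output_mask))
+        send_more_work(conn);       /* PQsendQueryParams() etc., then PQflush() */
+}
+</programlisting>
+    </para>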
+
+    <para>
+     An example using <function>select()</function> and a simple state
+     machine to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename>
+     in the PostgreSQL source distribution.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending Batch Mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and
+     the end-of-batch result has been consumed, the application may return
+     to non-batched mode with <xref linkend="libpq-PQexitBatchMode"/>.
+    </para>
+   </sect3>
+
+  </sect2>
+
+  <sect2 id="libpq-funcs-batch">
+   <title>Functions Associated with Batch Mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns current batch mode status of the <application>libpq</application>
+      connection.
+<synopsis>
+int PQbatchStatus(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <function>PQbatchStatus</function> can return one of the following values:
+
+       <variablelist>
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ON</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in
+           batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_OFF</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is
+           <emphasis>not</emphasis> in batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ABORTED</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in aborted
+           batch state. The aborted flag is cleared as soon as the result
+           corresponding to the <function>PQbatchSendQueue</function> at the end
+           of the aborted batch is processed. Clients don't usually need this
+           function to check for the aborted state, as they can tell that the
+           batch is aborted from the <literal>PGRES_BATCH_ABORTED</literal>
+           result code.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+       </variablelist>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+      <para>
+       Returns 1 for success.
+       Returns 0 and has no effect if the connection is not currently
+       idle, i.e., it has a result ready, or it is waiting for more
+       input from the server, etc.
+       This function does not actually send anything to the server,
+       it just changes the <application>libpq</application> connection
+       state.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Causes a connection to exit batch mode if it is currently in batch mode
+       with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success.  Returns 1 and takes no action if not in
+       batch mode. If the current statement isn't finished processing 
+       or there are results pending for collection with
+       <function>PQgetResult</function>, returns 0 and does nothing.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSendQueue">
+     <term>
+      <function>PQbatchSendQueue</function>
+      <indexterm>
+       <primary>PQbatchSendQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Delimits the end of a set of batched commands by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       and flushing the send buffer. The end of a batch serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <link linkend="libpq-batch-errors">error handling</link>.
+
+<synopsis>
+int PQbatchSendQueue(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       batch mode or sending the
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+
+  </sect2>
+
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4859,6 +5355,15 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <note>
+   <para>
+    When using batch mode, activate single-row mode for the current batch
+    query by calling <function>PQsetSingleRowMode</function>
+    before retrieving results with <function>PQgetResult</function>.
+    See <xref linkend="libpq-batch-mode"/> for more information.
+   </para>
+  </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6329cf0796..49be8b1dbe 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while a libpq connection is in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 24f8b3e42e..9ae67387a5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->status = WALRCV_ERROR;
 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
 			break;
+		default:
+			/* This is just to keep the compiler quiet */
+			break;
 	}
 
 	PQclear(pgres);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..ca86f55652 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterBatchMode          180
+PQexitBatchMode           181
+PQbatchSendQueue          182
+PQbatchStatus             183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d010f..91f6b205e4 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -536,6 +536,24 @@ pqDropConnection(PGconn *conn, bool flushInput)
 	}
 }
 
+/*
+ * PQfreeCommandQueue
+ * Free all entries of the given PGcommandQueueEntry queue.
+ */
+static void
+PQfreeCommandQueue(PGcommandQueueEntry *queue)
+{
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+
+		queue = queue->next;
+		if (prev->query)
+			free(prev->query);
+		free(prev);
+	}
+}
 
 /*
  *		pqDropServerData
@@ -555,6 +573,7 @@ pqDropServerData(PGconn *conn)
 {
 	PGnotify   *notify;
 	pgParameterStatus *pstatus;
+	PGcommandQueueEntry *queue;
 
 	/* Forget pending notifies */
 	notify = conn->notifyHead;
@@ -567,6 +586,14 @@ pqDropServerData(PGconn *conn)
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
 
+	queue = conn->cmd_queue_head;
+	PQfreeCommandQueue(queue);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	queue = conn->cmd_queue_recycle;
+	PQfreeCommandQueue(queue);
+	conn->cmd_queue_recycle = NULL;
+
 	/* Reset ParameterStatus data, as well as variables deduced from it */
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..cf5d639c2f 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_END",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -70,7 +72,10 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int	PQsendDescribe(PGconn *conn, char desc_type,
 						   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
-
+static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry);
+static int	pqBatchFlush(PGconn *conn);
 
 /* ----------------
  * Space management for PGresult.
@@ -1210,7 +1215,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1233,6 +1238,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return false;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1331,6 +1343,10 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1389,31 +1405,51 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+	else
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1461,7 +1497,81 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQmakePipelinedCommand
+ *	Get a new command queue entry, allocating it if required. It is not added
+ *	to the tail of the queue yet; use PQappendPipelinedCommand for that once
+ *	the command has been written. If the command fails after calling this, use
+ *	PQrecyclePipelinedCommand to put the entry back on the freelist.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry *
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * PQappendPipelinedCommand
+ *	Append a precreated command queue entry to the queue after it's been
+ *	sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * PQrecyclePipelinedCommand
+ *	Push a command queue entry onto the freelist. It must be an entry
+ *	with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr,
+				libpq_gettext("tried to recycle non-dangling command queue entry"));
+		abort();
+	}
+	if (entry->query)
+		free(entry->query);
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/*
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1479,20 +1589,61 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		/*
+		 * When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when pqBatchProcessQueue()
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+								  libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+				break;
+			case PGASYNC_IDLE:
+				printfPQExpBuffer(&conn->errorMessage,
+								  libpq_gettext_noop("internal error, idle state in batch mode"));
+				break;
+		}
+	}
+	else
+	{
+		/*
+		 * This command's results will come in immediately. Initialize async
+		 * result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1516,6 +1667,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1525,6 +1680,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1637,35 +1809,42 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1766,14 +1945,307 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
+/*
+ * PQbatchStatus
+ * 	Returns current batch mode status
+ */
+int
+PQbatchStatus(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	return conn->batch_status;
+}
+
+/*
+ * PQenterBatchMode
+ * 	Put an idle connection in batch mode. Commands submitted after this
+ * 	can be pipelined on the connection, there's no requirement to wait for
+ * 	one to finish before the next is dispatched.
+ *
+ * 	Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * 	A set of commands is terminated by a PQbatchSendQueue call. Multiple sets
+ * 	of batched commands may be sent while in batch mode. Batch mode can be
+ * 	exited by calling PQexitBatchMode() once all results are processed.
+ *
+ * 	This doesn't actually send anything on the wire, it just puts libpq
+ * 	into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return 0;
+
+	conn->batch_status = PQBATCH_MODE_ON;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return 1;
+}
+
+/*
+ * PQexitBatchMode
+ * 	End batch mode and return to normal command mode.
+ *
+ * 	Has no effect unless the client has processed all results
+ * 	from all outstanding batches and the connection is idle.
+ *
+ * 	Returns 1 if batch mode ended.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+	if (!conn)
+		goto exitFailed;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			goto exitFailed;
+		default:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		goto exitFailed;
+
+	conn->batch_status = PQBATCH_MODE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		goto sendFailed;
+	return 1;
+
+sendFailed:
+	/* error message should be set up already */
+
+exitFailed:
+	printfPQExpBuffer(&conn->errorMessage,
+					  libpq_gettext_noop("internal error, Failed to exit batch mode"));
+	return 0;
+}
+
+
+/*
+ * internal function pqBatchProcessQueue
+ *
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+static int
+pqBatchProcessQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext_noop("internal error, COPY in batch mode"));
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return 0;
+			break;
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext_noop("internal error, IDLE in batch mode"));
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/*
+		 * In batch mode but nothing left on the queue; caller can submit more
+		 * work or call PQexitBatchMode() now.
+		 */
+		return 0;
+	}
+
+	/*
+	 * Pop the next query from the queue and set up the connection state as if
+	 * it'd just been dispatched from a non-batched call
+	 */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	if (conn->batch_status == PQBATCH_MODE_ABORTED &&
+		conn->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted batch we don't get anything from the server for each
+		 * result; we're just discarding input until we get to the next sync
+		 * from the server. The client needs to know its queries got aborted
+		 * so we create a fake PGresult to return immediately from
+		 * PQgetResult.
+		 */
+		conn->result =
+			PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED);
+		if (!conn->result)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			pqSaveErrorResult(conn);
+			return 0;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+
+	return 1;
+}
+
+/*
+ * PQbatchSendQueue
+ *
+ * End a batch submission.
+ *
+ * It's legal to start submitting another batch immediately, without
+ * waiting for the results of the current batch. There's no need to end batch
+ * mode and start it again.
+ *
+ * If a command in a batch fails, every subsequent command in that batch gets
+ * a PGRES_BATCH_ABORTED result, and the result for the PQbatchSendQueue call
+ * itself is reported as PGRES_BATCH_END. If the whole batch is processed
+ * without error, a PGresult with PGRES_BATCH_END is likewise produced at the
+ * end of the batch.
+ *
+ * Queries can already have been sent before PQbatchSendQueue is called, but
+ * PQbatchSendQueue needs to be called before retrieving command results.
+ *
+ * The connection will remain in batch mode and unavailable for new synchronous
+ * command execution functions until all results from the batch are processed
+ * by the client.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext_noop("internal error, IDLE in batch mode"));
+			return 0;
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext_noop("internal error, COPY in batch mode"));
+			return 0;
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	PQappendPipelinedCommand(conn, entry);
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Call pqBatchProcessQueue so the user can start calling PQgetResult.
+	 */
+	pqBatchProcessQueue(conn);
+
+	return 1;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
 
 /*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
+ *
+ *	  In batch mode, once all the results of a query have been returned,
+ *	  PQgetResult returns NULL to let the user know that the next batched
+ *	  query is being processed.  At the end of the batch, it returns an
+ *	  end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1842,9 +2314,38 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, we prepare the processing of the results of
+				 * the next query.
+				 */
+				pqBatchProcessQueue(conn);
+			}
 			break;
 		case PGASYNC_READY:
+			res = pqPrepareAsyncResult(conn);
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, query execution state cannot be IDLE as
+				 * there can be other queries or results waiting in the queue
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It isn't busy either, since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
 			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
@@ -2025,6 +2526,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2219,6 +2727,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2234,6 +2745,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2242,15 +2767,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
-	/* reset last_query string (not relevant now) */
-	if (conn->last_query)
+	/* reset last-query string (not relevant now) */
+	if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2260,14 +2788,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -2665,6 +3197,13 @@ PQfn(PGconn *conn,
 	/* clear the error string */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -3858,3 +4397,16 @@ PQunescapeBytea(const unsigned char *strtext, size_t *retbuflen)
 	*retbuflen = buflen;
 	return tmpbuf;
 }
+
+/* pqBatchFlush
+ * In batch mode, data is flushed only once the output buffer reaches the
+ * threshold value.  In non-batch mode, data is flushed immediately.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+	if ((conn->batch_status == PQBATCH_MODE_OFF) ||
+		(conn->outCount >= OUTBUFFER_THRESHOLD))
+		return (pqFlush(conn));
+	return 0;					/* in batch mode and below the threshold: nothing to flush yet */
+}
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 9360c541be..2ff3fa4883 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode\n");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..da38e6aed1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new
+								 * query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->batch_status != PQBATCH_MODE_OFF)
+					{
+						conn->batch_status = PQBATCH_MODE_ON;
+						conn->result = PQmakeEmptyPGresult(conn,
+														   PGRES_BATCH_END);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+		conn->batch_status = PQBATCH_MODE_ABORTED;
+
 	/*
 	 * If this is an error message, pre-emptively clear any incomplete query
 	 * result we may have.  We'd just throw it away below anyway, and
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fbce3..efe3437bd4 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -97,7 +97,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_END,			/* end of a batch of commands */
+	PGRES_BATCH_ABORTED			/* command didn't run because of an abort
+								 * earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -137,6 +140,17 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+	PQBATCH_MODE_OFF,
+	PQBATCH_MODE_ON,
+	PQBATCH_MODE_ABORTED
+} PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -435,6 +449,12 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int	PQbatchStatus(PGconn *conn);
+extern int	PQenterBatchMode(PGconn *conn);
+extern int	PQexitBatchMode(PGconn *conn);
+extern int	PQbatchSendQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..d607390118 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,15 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result; more results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +234,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the pqSetenv state machine */
@@ -301,6 +307,22 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/*
+ * An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+	char	   *query;			/* SQL command, or NULL if unknown */
+	struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -394,6 +416,7 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PQBatchStatus batch_status; /* batch (pipelining) mode status of connection */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -406,6 +429,16 @@ struct pg_conn
 	pg_conn_host *connhost;		/* details about each named host */
 	char	   *connip;			/* IP address for current network connection */
 
+	/*
+	 * The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
@@ -806,6 +844,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne
 #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p))
 #endif
 
+#define libpq_gettext_noop(x) (x)
+
 /*
  * These macros are needed to let error-handling code be portable between
  * Unix and Windows.  (ugh)
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index a6d2ffbf9e..287214c544 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
 		  test_extensions \
 		  test_ginpostinglist \
 		  test_integerset \
+		  test_libpq \
 		  test_misc \
 		  test_parser \
 		  test_pg_dump \
diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore
new file mode 100644
index 0000000000..11e8463984
--- /dev/null
+++ b/src/test/modules/test_libpq/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/testlibpqbatch
diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile
new file mode 100644
index 0000000000..d907063f65
--- /dev/null
+++ b/src/test/modules/test_libpq/Makefile
@@ -0,0 +1,25 @@
+# src/test/modules/test_libpq/Makefile
+
+OBJS = testlibpqbatch.o
+PROGRAM = testlibpqbatch
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS += $(libpq)
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_libpq
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+testlibpqbatch.o: testlibpqbatch.c
+testlibpqbatch: testlibpqbatch.o
+check: testlibpqbatch prove-check
+
+prove-check:
+	$(prove_check)
diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/test_libpq/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl
new file mode 100644
index 0000000000..2c361880fa
--- /dev/null
+++ b/src/test/modules/test_libpq/t/001_libpq_async.pl
@@ -0,0 +1,27 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $port = $node->port;
+
+my $numrows = 10000;
+my @tests =
+  qw(disallowed_in_batch simple_batch multi_batch batch_abort timings singlerowmode);
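+# make sure the just-built testlibpqbatch binary is found via PATH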
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+for my $testname (@tests)
+{
+	$node->command_ok(
+		[ 'testlibpqbatch', 'dbname=postgres', "$numrows", "$testname" ],
+		"testlibpqbatch $testname");
+}
+
+$node->stop('fast');
diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c
new file mode 100644
index 0000000000..3fff765ada
--- /dev/null
+++ b/src/test/modules/test_libpq/testlibpqbatch.c
@@ -0,0 +1,1013 @@
+/*
+ * src/test/modules/test_libpq/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ *		Test of batch execution functionality
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+
+#include "libpq-fe.h"
+
+#define EXPECT(condition, ...) \
+	if (0 == (condition)) \
+	{ \
+		fprintf(stderr, __VA_ARGS__); \
+		goto fail; \
+	}
+
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+static void batch_insert_copy(PGconn *conn, int n_rows);
+static void test_batch_abort(PGconn *conn);
+static void test_singlerowmode(PGconn *conn);
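+/* hardwired OID of the int4 type, to avoid a catalog lookup */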
+static const Oid INT4OID = 23;
+
+static const char *const drop_table_sql
+= "DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql
+= "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql
+= "INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple batch... ");
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our out buffer will give us enough slush to
+	 * work with and we won't block on sending. So blocking mode is fine.
+	 */
+	EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode\n");
+
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0),
+		   "dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode with work in progress should fail, but succeeded\n");
+
+	EXPECT(PQbatchSendQueue(conn), "Ending a batch failed: %s\n", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first query result.\n");
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit batch mode yet
+	 */
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after end batch call\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+
+	EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode: %u\n", __LINE__);
+
+	EXPECT(PQenterBatchMode(conn), "Unable to enter batch mode\n");
+
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Batch mode not activated properly\n");
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	EXPECT(PQresultStatus(res) == PGRES_FATAL_ERROR, "PQexec should fail in batch mode but succeeded\n");
+
+	/* So should PQsendQuery */
+	EXPECT(PQsendQuery(conn, "SELECT 1") == 0, "PQsendQuery should fail in batch mode but succeeded\n");
+
+	/* Entering batch mode when already in batch mode is OK */
+	EXPECT(PQenterBatchMode(conn), "re-entering batch mode should be a no-op but failed\n");
+
+	EXPECT(!PQisBusy(conn), "PQisBusy should return false when idle in batch, returned true\n");
+
+	/* ok, back to normal command mode */
+	EXPECT(PQexitBatchMode(conn), "couldn't exit idle empty batch mode\n");
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "Batch mode not terminated properly\n");
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	EXPECT(PQexitBatchMode(conn), "batch mode exit when not in batch mode should succeed but failed\n");
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "PQexec should succeed after exiting batch mode but failed with: %s\n", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+multi_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi batch... ");
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first.
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching first SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching second SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/* OK, start processing the batch results */
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first result\n");
+
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s (line %u)\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn), __LINE__);
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* second batch */
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from second batch item\n", PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second end batch\n", PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+/*
+ * When an operation in a batch fails, the remainder of the batch is skipped.
+ * We still have to get a result for each batch item, but those results will
+ * just carry a PGRES_BATCH_ABORTED status code.
+ *
+ * This intentionally doesn't wrap the batch in a transaction.  You should
+ * usually use an explicit transaction, but here we want to observe the
+ * effects of each statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+
+	fprintf(stderr, "aborted batch... ");
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * batch ERRORs.
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "1";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching first INSERT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT no_such_function($1)", 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching error select failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "2";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching second insert failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "3";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+							 dummy_params, NULL, NULL, 0), "dispatching second-batch insert failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/*
+	 * OK, start processing the batch results.
+	 *
+	 * We should get a command-ok for the first insert, a fatal error for the
+	 * bogus SELECT, a batch-aborted result for the second insert, then a
+	 * batch-end for the first batch's sync, and finally a command-ok and a
+	 * batch-end for the second batch.
+	 */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_COMMAND_OK,
+		   "Unexpected result code %s from first batch item, error='%s'\n",
+		   res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)),
+		   res == NULL ? PQerrorMessage(conn) : PQresultErrorMessage(res));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* second query, caused error */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_FATAL_ERROR,
+		   "Unexpected result code from second batch item. Wanted PGRES_FATAL_ERROR, got %s\n",
+		   res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/*
+	 * batch should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third batch since we've already sent a
+	 * second. The aborted flag relates only to the batch being received.
+	 */
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n");
+
+	/* third query in batch, the second insert */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_ABORTED,
+		   "Unexpected result code from third batch item. Wanted PGRES_BATCH_ABORTED, got %s\n",
+		   res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/*
+	 * The end of a failed batch is still a PGRES_BATCH_END so clients know to
+	 * start processing results normally again and can tell the difference
+	 * between skipped commands and the sync.
+	 */
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END,
+		   "Unexpected result code from first batch sync. Wanted PGRES_BATCH_END, got %s\n",
+		   res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_ABORTED, "sync should've cleared the aborted flag but didn't\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* the insert from the second batch */
+	EXPECT(((res = PQgetResult(conn)) != NULL) &&
+		   PQresultStatus(res) == PGRES_COMMAND_OK,
+		   "Unexpected result code %s from first item in second batch\n",
+		   res ? PQresStatus(PQresultStatus(res)) : "NULL");
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* the second batch sync */
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second batch sync\n", res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	/*
+	 * Since we fired the batches off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - implicit xact started by the server around the 1st batch
+	 * - first insert applied
+	 * - second statement aborted the xact
+	 * - third insert skipped
+	 * - sync rolled back the first implicit xact
+	 * - implicit xact created by the server around the 2nd batch
+	 * - insert applied from the 2nd batch
+	 * - sync committed the 2nd xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Expected tuples, got %s: %s\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		EXPECT(strcmp(val, "3") == 0, "expected only insert with value 3, got %s\n", val);
+	}
+
+	EXPECT(PQntuples(res) == 1, "expected 1 result, got %d\n", PQntuples(res));
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
+/* State machine enums for batch insert */
+typedef enum BatchInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+} BatchInsertStep;
+
+static void
+batch_insert_pipelined(PGconn *conn, int n_rows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	BatchInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
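+	/*
+	 * Sends and receives are tracked separately, since results are consumed
+	 * while later inserts are still being queued.
+	 */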
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a batched insert into a table created at the start of the batch
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "BEGIN",
+							 0, NULL, NULL, NULL, NULL, 0), "xact start failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent BEGIN\n");
+
+	send_step = BI_DROP_TABLE;
+
+	EXPECT(PQsendQueryParams(conn, drop_table_sql,
+							 0, NULL, NULL, NULL, NULL, 0), "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent DROP\n");
+
+	send_step = BI_CREATE_TABLE;
+
+	EXPECT(PQsendQueryParams(conn, create_table_sql,
+							 0, NULL, NULL, NULL, NULL, 0), "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent CREATE\n");
+
+	send_step = BI_PREPARE;
+
+	EXPECT(PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids), "dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent PREPARE\n");
+
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our out buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the batch before processing results.
+	 */
+	EXPECT(PQsetnonblocking(conn, 1) == 0, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any results, so we keep the server's out buffer free
+		 * flowing and it can continue to process input
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				const char *cmdtag;
+				const char *description = NULL;
+				int			status;
+				BatchInsertStep next_step;
+
+
+				res = PQgetResult(conn);
+
+				if (res == NULL)
+				{
+					/*
+					 * No more results from this query, advance to the next
+					 * result
+					 */
+
+					fprintf(stdout, "next query!\n");
+					continue;
+				}
+
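+				/*
+				 * By default, expect a plain command-OK result and advance to
+				 * the next step; the switch below overrides these where
+				 * needed.
+				 */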
+				status = PGRES_COMMAND_OK;
+				next_step = recv_step + 1;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive > 0)
+							next_step = BI_INSERT_ROWS;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_BATCH_END;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						abort();
+				}
+				if (description == NULL)
+					description = cmdtag;
+
+				fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n",
+						recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step);
+
+				EXPECT(PQresultStatus(res) == status, "%s reported status %s, expected %s. Error msg is [%s]\n",
+					   description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn));
+
+				EXPECT(strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) == 0, "%s expected command tag '%s', got '%s'\n",
+					   description, cmdtag, PQcmdStatus(res));
+
+				fprintf(stdout, "Got %s OK\n", cmdtag);
+
+				recv_step = next_step;
+
+				PQclear(res);
+				res = NULL;
+			}
+		}
+
+		/* Write more rows and/or the end batch message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+				insert_param_0[MAXINTLEN - 1] = '\0';
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0))
+				{
+					fprintf(stdout, "sent row %d\n", rows_to_send);
+
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0))
+				{
+					fprintf(stdout, "sent COMMIT\n");
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQbatchSendQueue(conn))
+				{
+					fprintf(stdout, "Dispatched end batch message\n");
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+
+	}
+
+	/* We've got the sync message and the batch should be done */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsetnonblocking(conn, 0) == 0, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
+static void
+batch_insert_sequential(PGconn *conn, int nrows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+
+	insert_params[0] = &insert_param_0[0];
+
+	res = PQexec(conn, "BEGIN");
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "prepare failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	while (nrows > 0)
+	{
+		snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows);
+		insert_param_0[MAXINTLEN - 1] = '\0';
+
+		res = PQexecPrepared(conn, "my_insert2",
+							 1, insert_params, NULL, NULL, 0);
+		EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT failed: %s\n", PQerrorMessage(conn));
+
+		PQclear(res);
+		nrows--;
+	}
+
+	res = PQexec(conn, "COMMIT");
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+batch_insert_copy(PGconn *conn, int nrows)
+{
+	PGresult   *res = NULL;
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	res = PQexec(conn, "COPY batch_demo(itemno) FROM stdin");
+	EXPECT(PQresultStatus(res) == PGRES_COPY_IN, "COPY: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	while (nrows > 0)
+	{
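+		/* room for sign, 10 digits, newline and the NUL terminator */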
+		char		buf[12 + 2];
+		int			formatted = snprintf(&buf[0], 12 + 1, "%d\n", nrows);
+
+		EXPECT(formatted < 12 + 1, "Buffer write truncated somehow\n");
+
+		EXPECT(PQputCopyData(conn, buf, formatted) == 1, "Write of COPY data failed: %s\n",
+			   PQerrorMessage(conn));
+
+		nrows--;
+	}
+
+	EXPECT(PQputCopyEnd(conn, NULL) == 1, "Finishing COPY failed: %s\n",
+		   PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COPY finished with %s: %s\n",
+		   PQresStatus(PQresultStatus(res)),
+		   PQresultErrorMessage(res));
+	PQclear(res);
+	res = NULL;
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_timings(PGconn *conn, int number_of_rows)
+{
+	instr_time	start_time,
+				end_time;
+
+	fprintf(stderr, "inserting %d rows batched then unbatched\n", number_of_rows);
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_pipelined(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+
+	printf("batch insert elapsed:      %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_sequential(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+	printf("sequential insert elapsed: %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_copy(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+	printf("COPY elapsed:              %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	fprintf(stderr, "Done.\n");
+}
+
+static void
+usage_exit(const char *progname)
+{
+	fprintf(stderr, "Usage: %s ['connstring' [number_of_rows [test_to_run]]]\n", progname);
+	fprintf(stderr, "  tests: all|disallowed_in_batch|simple_batch|multi_batch|batch_abort|timings|singlerowmode\n");
+	exit(1);
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+	PGresult   *res;
+	int			i,
+				r;
+
+	/* XXX this one is broken */
+	return;
+
+	/* 1 batch, 3 queries in it */
+	r = PQenterBatchMode(conn);
+
+	for (i = 0; i < 3; i++)
+	{
+		r = PQsendQueryParams(conn,
+							  "SELECT 1",
+							  0,
+							  NULL,
+							  NULL,
+							  NULL,
+							  NULL,
+							  0);
+	}
+	PQbatchSendQueue(conn);
+
+	i = 0;
+	for (;;)
+	{
+		int			isSingleTuple = 0;
+		ExecStatusType last_est = 0;
+
+		/* Set single row mode for only first 3 SELECT queries */
+		if (i < 3)
+		{
+			r = PQsetSingleRowMode(conn);
+			if (r != 1)
+			{
+				fprintf(stderr, "PQsetSingleRowMode() failed for i=%d\n", i);
+			}
+		}
+		while ((res = PQgetResult(conn)) != NULL)
+		{
+			ExecStatusType est = PQresultStatus(res);
+
+			fprintf(stderr, "Result status: %d (%s) for i=%d", est, PQresStatus(est), i);
+			if (est == PGRES_TUPLES_OK)
+			{
+				fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+				EXPECT(isSingleTuple, " Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead\n");
+				isSingleTuple = 0;
+			}
+			else if (est == PGRES_SINGLE_TUPLE)
+			{
+				isSingleTuple = 1;
+				fprintf(stderr, ", single tuple: %d\n", PQntuples(res));
+			}
+			else if (est == PGRES_BATCH_END)
+			{
+				fprintf(stderr, ", end of batch reached\n");
+			}
+			else if (est != PGRES_COMMAND_OK)
+			{
+				fprintf(stderr, ", error: %s\n", PQresultErrorMessage(res));
+			}
+			PQclear(res);
+			last_est = est;
+		}
+		i++;
+		if (last_est == PGRES_BATCH_END)
+			break;
+	}
+	PQexitBatchMode(conn);
+	PQclear(res);
+	res = NULL;
+	return;
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+int
+main(int argc, char **argv)
+{
+	const char *conninfo;
+	PGconn	   *conn;
+	int			number_of_rows = 10000;
+
+	int			run_disallowed_in_batch = 1,
+				run_simple_batch = 1,
+				run_multi_batch = 1,
+				run_batch_abort = 1,
+				run_timings = 1,
+				run_singlerowmode = 1;
+
+	/*
+	 * If the user supplies a parameter on the command line, use it as the
+	 * conninfo string; otherwise default to setting dbname=postgres and using
+	 * environment variables or defaults for all other connection parameters.
+	 */
+	if (argc > 4)
+	{
+		usage_exit(argv[0]);
+	}
+	if (argc > 3)
+	{
+		if (strcmp(argv[3], "all") != 0)
+		{
+			run_disallowed_in_batch = 0;
+			run_simple_batch = 0;
+			run_multi_batch = 0;
+			run_batch_abort = 0;
+			run_timings = 0;
+			run_singlerowmode = 0;
+			if (strcmp(argv[3], "disallowed_in_batch") == 0)
+				run_disallowed_in_batch = 1;
+			else if (strcmp(argv[3], "simple_batch") == 0)
+				run_simple_batch = 1;
+			else if (strcmp(argv[3], "multi_batch") == 0)
+				run_multi_batch = 1;
+			else if (strcmp(argv[3], "batch_abort") == 0)
+				run_batch_abort = 1;
+			else if (strcmp(argv[3], "timings") == 0)
+				run_timings = 1;
+			else if (strcmp(argv[3], "singlerowmode") == 0)
+				run_singlerowmode = 1;
+			else
+			{
+				fprintf(stderr, "%s is not a recognized test name\n", argv[3]);
+				usage_exit(argv[0]);
+			}
+		}
+	}
+	if (argc > 2)
+	{
+		errno = 0;
+		number_of_rows = strtol(argv[2], NULL, 10);
+		if (errno)
+		{
+			fprintf(stderr, "couldn't parse '%s' as an integer or zero rows supplied: %s", argv[2], strerror(errno));
+			usage_exit(argv[0]);
+		}
+		if (number_of_rows <= 0)
+		{
+			fprintf(stderr, "number_of_rows must be positive");
+			usage_exit(argv[0]);
+		}
+	}
+	if (argc > 1)
+	{
+		conninfo = argv[1];
+	}
+	else
+	{
+		conninfo = "dbname = postgres";
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+
+	/* Check to see that the backend connection was successfully made */
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+
+	if (run_disallowed_in_batch)
+		test_disallowed_in_batch(conn);
+
+	if (run_simple_batch)
+		simple_batch(conn);
+
+	if (run_multi_batch)
+		multi_batch(conn);
+
+	if (run_batch_abort)
+		test_batch_abort(conn);
+
+	if (run_timings)
+		test_timings(conn, number_of_rows);
+
+	if (run_singlerowmode)
+		test_singlerowmode(conn);
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+
+	return 0;
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd41b..634e48ec85 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -50,7 +50,8 @@ my @contrib_excludes = (
 	'pgcrypto',         'sepgsql',
 	'brin',             'test_extensions',
 	'test_misc',        'test_pg_dump',
-	'snapshot_too_old', 'unsafe_tests');
+	'snapshot_too_old', 'unsafe_tests',
+	'test_libpq');
 
 # Set of variables for frontend modules
 my $frontend_defines = { 'initdb' => 'FRONTEND' };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 03c4e0fe5b..9b75db962b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -215,6 +215,7 @@ BackgroundWorkerHandle
 BackgroundWorkerSlot
 Barrier
 BaseBackupCmd
+BatchInsertStep
 BeginDirectModify_function
 BeginForeignInsert_function
 BeginForeignModify_function
@@ -1544,6 +1545,7 @@ PG_Locale_Strategy
 PG_Lock_Status
 PG_init_t
 PGcancel
+PGcommandQueueEntry
 PGconn
 PGdataValue
 PGlobjfuncs
@@ -1656,6 +1658,7 @@ PMSignalReason
 PMState
 POLYGON
 PQArgBlock
+PQBatchStatus
 PQEnvironmentOption
 PQExpBuffer
 PQExpBufferData
-- 
2.20.1
