Hi,

Here's a new version with the pgbench support included.

Best regards,
-- 
Daniel Vérité
PostgreSQL-powered mailer: https://www.manitou-mail.org
Twitter: @DanielVerite
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ce32fb39b..2a94f8f6b9 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res);
            </para>
           </listitem>
          </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-end">
+          <term><literal>PGRES_BATCH_END</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents the end of a 
batch.
+            This status occurs only when batch mode has been selected.
+           </para>
+          </listitem>
+         </varlistentry>
+
+         <varlistentry id="libpq-pgres-batch-aborted">
+          <term><literal>PGRES_BATCH_ABORTED</literal></term>
+          <listitem>
+           <para>
+            The <structname>PGresult</structname> represents a batch that's
+            received an error from the server.  
<function>PQgetResult</function>
+            must be called repeatedly, and it will return this status code,
+            until the end of the current batch, at which point it will return
+            <literal>PGRES_BATCH_END</literal> and normal processing can 
resume.
+           </para>
+          </listitem>
+         </varlistentry>
+
         </variablelist>
 
         If the result status is <literal>PGRES_TUPLES_OK</literal> or
@@ -4819,6 +4843,482 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch Mode</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>pipelining</primary>
+   <secondary>in libpq</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> batch mode allows applications to
+   send a query without having to read the result of the previously
+   sent query.  Taking advantage of the batch mode, a client will wait
+   less for the server, since multiple queries/results can be sent/
+   received in a single network transaction.
+  </para>
+
+  <para>
+   While batch mode provides a significant performance boost, writing
+   clients using the batch mode is more complex because it involves
+   managing a queue of pending queries and finding which result
+   corresponds to which query in the queue.
+  </para>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using Batch Mode</title>
+
+   <para>
+    To issue batches the application must switch a connection into batch mode.
+    Enter batch mode with <xref linkend="libpq-PQenterBatchMode"/>
+    or test whether batch mode is active with
+    <xref linkend="libpq-PQbatchStatus"/>.
+    In batch mode, only <link linkend="libpq-async">asynchronous 
operations</link>
+    are permitted, and <literal>COPY</literal> is not recommended as it
+    may trigger failure in batch processing.  Using any synchronous
+    command execution functions such as <function>PQfn</function>,
+    <function>PQexec</function> or one of its sibling functions are error
+    conditions.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used
+     in blocking mode it is possible for a client/server deadlock to occur.
+      <footnote>
+       <para>
+        The client will block trying to send queries to the server, but the
+        server will block trying to send results to the client from queries
+        it has already processed. This only occurs when the client sends
+        enough queries to fill its output buffer and the server's receive
+        buffer before switching to processing input from the server,
+        but it's hard to predict exactly when that will happen.
+       </para>
+      </footnote>
+    </para>
+    <para>
+     Batch mode consumes more memory when send/receive is not done as required,
+     even in non-blocking mode.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing Queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests using
+     normal asynchronous <application>libpq</application> functions such as
+     <function>PQsendQueryParams</function>, 
<function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, 
<function>PQsendDescribePortal</function>,
+     <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a
+     <xref linkend="libpq-PQbatchSendQueue"/>
+     call to mark the end of the batch. The client needs not
+     call <function>PQgetResult</function> immediately after
+     dispatching each operation.
+     <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+
+    <para>
+     The server executes statements, and returns results, in the order the
+     client sends them.  The server may begin executing the batch before all
+     commands in the batch are queued and the end of batch command is sent.
+     If any statement encounters an error the server aborts the current
+     transaction and skips processing the rest of the batch.
+     Query processing resumes after the end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared 
statement
+     then execute it with later statements in the same batch.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing Results</title>
+
+    <para>
+     To process the result of one batch query, the application calls
+     <function>PQgetResult</function> repeatedly and handles each result
+     until <function>PQgetResult</function> returns null.
+     The result from the next batch query may then be retrieved using
+     <function>PQgetResult</function> again and the cycle repeated.
+     The application handles individual statement results as normal.
+     When the results of all the queries in the batch have been
+     returned, <function>PQgetResult</function> returns a result
+     containing the status value <literal>PGRES_BATCH_END</literal>.
+    </para>
+
+    <para>
+     The client may choose to defer result processing until the complete
+     batch has been sent, or interleave that with sending further batch
+     queries; see <xref linkend="libpq-batch-interleave" />.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     This mode selection is effective only for the query currently
+     being processed. For more information on the use of
+     <function>PQsetSingleRowMode</function>,
+     refer to <xref linkend="libpq-single-row-mode"/>.
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal
+     asynchronous processing except that it may contain the new
+     <type>PGresult</type> types <literal>PGRES_BATCH_END</literal>
+     and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSendQueue</function> call at the corresponding point in
+     the result stream and at no other time.
+     <literal>PGRES_BATCH_ABORTED</literal> is emitted during error handling;
+     see <xref linkend="libpq-batch-errors"/>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (except that
+     <function>PQgetResult</function> returns null to indicate that we start
+     returning the results of next query). The application must keep track
+     of the order in which it sent queries and the expected results.
+     Applications will typically use a state machine or a FIFO queue for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error Handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSendQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving Result Processing and Query Dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured
+     around a non-blocking event loop using operating system facilities
+     such as <function>select</function>, <function>poll</function>,
+     <function>WaitForMultipleObjectEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work
+     still to be dispatched and a queue of work that has been dispatched
+     but not yet had its results processed. When the socket is writable
+     it should dispatch more work. When the socket is readable it should
+     read results and process them, matching them up to the next entry in
+     its expected results queue.  Based on available memory, results from
+     socket should be read frequently and there's no need to wait till the
+     batch end to read the results.  Batches should be scoped to logical
+     units of work, usually (but not necessarily) one transaction per batch.
+     There's no need to exit batch mode and re-enter it between batches
+     or to wait for one batch to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state
+     machine to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename>
+     in the PostgreSQL source distribution.
+    </para>
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending Batch Mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and
+     the end batch result has been consumed the application may return
+     to non-batched mode with <xref linkend="libpq-PQexitBatchMode"/>.
+    </para>
+   </sect3>
+  </sect2>
+
+  <sect2 id="libpq-batch-functions">
+   <title>Functions Associated with Batch Mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns current batch mode status of the <application>libpq</application>
+      connection.
+<synopsis>
+int PQbatchStatus(const PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <function>PQbatchStatus</function> can return one of the following 
values:
+
+       <variablelist>
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ON</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in
+           batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_OFF</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is
+           <emphasis>not</emphasis> in batch mode.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+        <varlistentry>
+         <term>
+          <literal>PQBATCH_MODE_ABORTED</literal>
+         </term>
+         <listitem>
+          <para>
+           The <application>libpq</application> connection is in aborted
+           batch status. The aborted flag is cleared as soon as the result
+           of the <function>PQbatchSendQueue</function> at the end of the 
aborted
+           batch is processed. Clients don't usually need this function to
+           verify aborted status, as they can tell that the batch is aborted
+           from the <literal>PGRES_BATCH_ABORTED</literal> result code.
+          </para>
+         </listitem>
+        </varlistentry>
+ 
+       </variablelist>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+      </para>
+      <para>
+       Returns 1 for success.
+       Returns 0 and has no effect if the connection is not currently
+       idle, i.e., it has a result ready, or it is waiting for more
+       input from the server, etc.
+       This function does not actually send anything to the server,
+       it just changes the <application>libpq</application> connection
+       state.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Causes a connection to exit batch mode if it is currently in batch mode
+       with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success.  Returns 1 and takes no action if not in
+       batch mode. If the current statement isn't finished processing 
+       or there are results pending for collection with
+       <function>PQgetResult</function>, returns 0 and does nothing.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSendQueue">
+     <term>
+      <function>PQbatchSendQueue</function>
+      <indexterm>
+       <primary>PQbatchSendQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+       Delimits the end of a set of a batched commands by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       and flushing the send buffer. The end of a batch serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-batch-errors"/>.
+
+<synopsis>
+int PQbatchSendQueue(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       batch mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       is failed.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2 id="libpq-batch-tips">
+   <title>When to Use Batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It increases client application complexity
+    and extra caution is required to prevent client/server deadlocks, but
+    pipelining can sometimes offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e., network latency
+    (<quote>ping time</quote>) is high, and also when many small operations
+    are being performed in rapid sequence.  There is usually less benefit
+    in using batches when each query takes many multiples of the client/server
+    round-trip time to execute.  A 100-statement operation run on a server
+    300ms round-trip-time away would take 30 seconds in network latency alone
+    without batching; with batching it may spend as little as 0.3s waiting for
+    results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed
+    into operations on sets or into a <literal>COPY</literal> operation.
+   </para>
+
+   <para>
+    Batching is not useful when information from one operation is required by
+    the client to produce the next operation. In such cases, the client
+    must introduce a synchronization point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <para>
+    Batching is less useful, and more complex, when a single batch contains
+    multiple transactions (see <xref linkend="libpq-batch-errors"/>).
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 14.0, but clients using
+     the PostgreSQL 14 version of <application>libpq</application> can use
+     batches on server versions 7.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+  </sect2>
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4859,6 +5359,17 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <!-- XXX Isn't this note merely repeating what was said above?
+       I think it should be removed. -->
+  <note>
+    <para>
+     When using batch mode, activate the single-row mode on the current
+     batch query by calling <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     See <xref linkend="libpq-batch-mode"/> for more information.
+    </para>
+   </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6329cf0796..49be8b1dbe 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while libpq connection is 
in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 7180fedd65..921e5dccd9 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1058,6 +1058,20 @@ pgbench <optional> <replaceable>options</replaceable> 
</optional> <replaceable>d
   </para>
 
   <variablelist>
+   <varlistentry id='pgbench-metacommand-batch'>
+    <term><literal>\beginbatch</literal></term>
+    <term><literal>\endbatch</literal></term>
+
+    <listitem>
+      <para>
+        These commands delimit the start and end of a batch of SQL statements.
+        In a batch, statements are sent to server without waiting for the 
results
+        of previous statements (see <xref linkend="libpq-batch-mode"/>).
+        Batching requires the extended query protocol.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id='pgbench-metacommand-gset'>
     <term>
      <literal>\gset [<replaceable>prefix</replaceable>]</literal>
@@ -1086,6 +1100,12 @@ pgbench <optional> <replaceable>options</replaceable> 
</optional> <replaceable>d
       row, the last value is kept.
      </para>
 
+     <para>
+      <literal>\gset</literal> and <literal>\aset</literal> cannot be used
+      inside a batch section, since query results are not immediately
+      fetched in this mode.
+     </para>
+
      <para>
       The following example puts the final account balance from the first query
       into variable <replaceable>abalance</replaceable>, and fills variables
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 24f8b3e42e..9ae67387a5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
                        walres->status = WALRCV_ERROR;
                        walres->err = pchomp(PQerrorMessage(conn->streamConn));
                        break;
+               default:
+               /* This is just to keep compiler quiet */
+                       break;
        }
 
        PQclear(pgres);
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 3057665bbe..5846e9153e 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -351,7 +351,8 @@ typedef enum
         *
         * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
         * command, the command is sent to the server, and we move to
-        * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is 
set,
+        * CSTATE_WAIT_RESULT state unless in batch mode.
+        * On a \sleep meta-command, the timer is set,
         * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
         * meta-commands are executed immediately.  If the command about to 
start
         * is actually beyond the end of the script, advance to CSTATE_END_TX.
@@ -485,7 +486,9 @@ typedef enum MetaCommand
        META_IF,                                        /* \if */
        META_ELIF,                                      /* \elif */
        META_ELSE,                                      /* \else */
-       META_ENDIF                                      /* \endif */
+       META_ENDIF,                                     /* \endif */
+       META_BEGINBATCH,                        /* \beginbatch */
+       META_ENDBATCH                           /* \endbatch */
 } MetaCommand;
 
 typedef enum QueryMode
@@ -2492,6 +2495,10 @@ getMetaCommand(const char *cmd)
                mc = META_GSET;
        else if (pg_strcasecmp(cmd, "aset") == 0)
                mc = META_ASET;
+       else if (pg_strcasecmp(cmd, "beginbatch") == 0)
+               mc = META_BEGINBATCH;
+       else if (pg_strcasecmp(cmd, "endbatch") == 0)
+               mc = META_ENDBATCH;
        else
                mc = META_NONE;
        return mc;
@@ -2681,11 +2688,24 @@ sendCommand(CState *st, Command *command)
                                if (commands[j]->type != SQL_COMMAND)
                                        continue;
                                preparedStatementName(name, st->use_file, j);
-                               res = PQprepare(st->con, name,
-                                                               
commands[j]->argv[0], commands[j]->argc - 1, NULL);
-                               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                                       pg_log_error("%s", 
PQerrorMessage(st->con));
-                               PQclear(res);
+                               if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF)
+                               {
+                                       res = PQprepare(st->con, name,
+                                                                       
commands[j]->argv[0], commands[j]->argc - 1, NULL);
+                                       if (PQresultStatus(res) != 
PGRES_COMMAND_OK)
+                                               pg_log_error("%s", 
PQerrorMessage(st->con));
+                                       PQclear(res);
+                               }
+                               else
+                               {
+                                       /*
+                                        * In batch mode, we use asynchronous 
functions. If a server-side
+                                        * error occurs, it will be processed 
later among the other results.
+                                        */
+                                       if (!PQsendPrepare(st->con, name,
+                                                                          
commands[j]->argv[0], commands[j]->argc - 1, NULL))
+                                               pg_log_error("%s", 
PQerrorMessage(st->con));
+                               }
                        }
                        st->prepared[st->use_file] = true;
                }
@@ -2799,6 +2819,12 @@ readCommandResponse(CState *st, MetaCommand meta, char 
*varprefix)
                                /* otherwise the result is simply thrown away 
by PQclear below */
                                break;
 
+                       case PGRES_BATCH_END:
+                               pg_log_debug("client %d batch ending", st->id);
+                               if (PQexitBatchMode(st->con) != 1)
+                                       pg_log_error("client %d failed to exit 
batch mode", st->id);
+                               break;
+
                        default:
                                /* anything else is unexpected */
                                pg_log_error("client %d script %d aborted in 
command %d query %d: %s",
@@ -3057,13 +3083,27 @@ advanceConnectionState(TState *thread, CState *st, 
StatsData *agg)
                                /* Execute the command */
                                if (command->type == SQL_COMMAND)
                                {
+                                       if ((command->meta == META_GSET || 
command->meta == META_ASET)
+                                               && PQbatchStatus(st->con) != 
PQBATCH_MODE_OFF)
+                                       {
+                                               commandFailed(st, "SQL", 
"\\gset and \\aset are not allowed in a batch section");
+                                               st->state = CSTATE_ABORTED;
+                                               break;
+                                       }
+
                                        if (!sendCommand(st, command))
                                        {
                                                commandFailed(st, "SQL", "SQL 
command send failed");
                                                st->state = CSTATE_ABORTED;
                                        }
                                        else
-                                               st->state = CSTATE_WAIT_RESULT;
+                                       {
+                                               /* Wait for results, unless in 
batch mode */
+                                               if (PQbatchStatus(st->con) == 
PQBATCH_MODE_OFF)
+                                                       st->state = 
CSTATE_WAIT_RESULT;
+                                               else
+                                                       st->state = 
CSTATE_END_COMMAND;
+                                       }
                                }
                                else if (command->type == META_COMMAND)
                                {
@@ -3184,6 +3224,7 @@ advanceConnectionState(TState *thread, CState *st, 
StatsData *agg)
                                }
                                break;
 
+
                                /*
                                 * Wait for the current SQL command to complete
                                 */
@@ -3203,7 +3244,14 @@ advanceConnectionState(TState *thread, CState *st, 
StatsData *agg)
                                if (readCommandResponse(st,
                                                                                
sql_script[st->use_file].commands[st->command]->meta,
                                                                                
sql_script[st->use_file].commands[st->command]->varprefix))
-                                       st->state = CSTATE_END_COMMAND;
+                               {
+                                       /*
+                                        * outside of batch mode: stop reading 
results.
+                                        * batch mode: continue reading results 
until an end of batch response.
+                                        */
+                                       if (PQbatchStatus(st->con) != 
PQBATCH_MODE_ON)
+                                               st->state = CSTATE_END_COMMAND;
+                               }
                                else
                                        st->state = CSTATE_ABORTED;
                                break;
@@ -3447,6 +3495,45 @@ executeMetaCommand(CState *st, instr_time *now)
                        return CSTATE_ABORTED;
                }
        }
+       else if (command->meta == META_BEGINBATCH)
+       {
+               /*
+                * In batch mode, we use a workflow based on libpq batch
+                * functions.
+                */
+               if (querymode == QUERY_SIMPLE)
+               {
+                       commandFailed(st, "beginbatch", "cannot use batch mode 
with the simple query protocol");
+                       st->state = CSTATE_ABORTED;
+                       return CSTATE_ABORTED;
+               }
+
+               if (PQbatchStatus(st->con) != PQBATCH_MODE_OFF)
+               {
+                       commandFailed(st, "beginbatch", "already in batch 
mode");
+                       return CSTATE_ABORTED;
+               }
+               if (PQenterBatchMode(st->con) == 0)
+               {
+                       commandFailed(st, "beginbatch", "failed to start a 
batch");
+                       return CSTATE_ABORTED;
+               }
+       }
+       else if (command->meta == META_ENDBATCH)
+       {
+               if (PQbatchStatus(st->con) != PQBATCH_MODE_ON)
+               {
+                       commandFailed(st, "endbatch", "not in batch mode");
+                       return CSTATE_ABORTED;
+               }
+               if (!PQbatchSendQueue(st->con))
+               {
+                       commandFailed(st, "endbatch", "failed to end the 
batch");
+                       return CSTATE_ABORTED;
+               }
+               /* collect pending results before getting out of batch mode */
+               return CSTATE_WAIT_RESULT;
+       }
 
        /*
         * executing the expression or shell command might have taken a
@@ -4686,6 +4773,12 @@ process_backslash_command(PsqlScanState sstate, const 
char *source)
                        syntax_error(source, lineno, my_command->first_line, 
my_command->argv[0],
                                                 "too many arguments", NULL, 
-1);
        }
+       else if (my_command->meta == META_BEGINBATCH || my_command->meta == 
META_ENDBATCH)
+       {
+               if (my_command->argc != 1)
+                       syntax_error(source, lineno, my_command->first_line, 
my_command->argv[0],
+                                                "unexpected argument", NULL, 
-1);
+       }
        else
        {
                /* my_command->meta == META_NONE */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..ca86f55652 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterBatchMode          180
+PQexitBatchMode           181
+PQbatchSendQueue          182
+PQbatchStatus             183
diff --git a/src/interfaces/libpq/fe-connect.c 
b/src/interfaces/libpq/fe-connect.c
index e7781d010f..c673bc405f 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -536,6 +536,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
        }
 }
 
+/*
+ * pqFreeCommandQueue
+ * Free all the entries of PGcommandQueueEntry queue passed.
+ */
+static void
+pqFreeCommandQueue(PGcommandQueueEntry *queue)
+{
+       while (queue != NULL)
+       {
+               PGcommandQueueEntry *cur = queue;
+
+               queue = cur->next;
+               if (cur->query)
+                       free(cur->query);
+               free(cur);
+       }
+}
 
 /*
  *             pqDropServerData
@@ -555,6 +572,7 @@ pqDropServerData(PGconn *conn)
 {
        PGnotify   *notify;
        pgParameterStatus *pstatus;
+       PGcommandQueueEntry *queue;
 
        /* Forget pending notifies */
        notify = conn->notifyHead;
@@ -567,6 +585,14 @@ pqDropServerData(PGconn *conn)
        }
        conn->notifyHead = conn->notifyTail = NULL;
 
+       queue = conn->cmd_queue_head;
+       pqFreeCommandQueue(queue);
+       conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+       queue = conn->cmd_queue_recycle;
+       pqFreeCommandQueue(queue);
+       conn->cmd_queue_recycle = NULL;
+
        /* Reset ParameterStatus data, as well as variables deduced from it */
        pstatus = conn->pstatus;
        while (pstatus != NULL)
@@ -6699,6 +6725,15 @@ PQbackendPID(const PGconn *conn)
        return conn->be_pid;
 }
 
+int
+PQbatchStatus(const PGconn *conn)
+{
+       if (!conn)
+               return false;
+
+       return conn->batch_status;
+}
+
 int
 PQconnectionNeedsPassword(const PGconn *conn)
 {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..93971857d5 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char     *const pgresStatus[] = {
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR",
        "PGRES_COPY_BOTH",
-       "PGRES_SINGLE_TUPLE"
+       "PGRES_SINGLE_TUPLE",
+       "PGRES_BATCH_END",
+       "PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -70,6 +72,11 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int     PQsendDescribe(PGconn *conn, char desc_type,
                                                   const char *desc_target);
 static int     check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn);
+static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static int pqBatchProcessQueue(PGconn *conn);
+static int     pqBatchFlush(PGconn *conn);
 
 
 /* ----------------
@@ -1210,7 +1217,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
                conn->next_result = conn->result;
                conn->result = res;
                /* And mark the result ready to return */
-               conn->asyncStatus = PGASYNC_READY;
+               conn->asyncStatus = PGASYNC_READY_MORE;
        }
 
        return 1;
@@ -1233,6 +1240,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("cannot 
PQsendQuery in batch mode, use PQsendQueryParams\n"));
+               return 0;
+       }
+
        if (!PQsendQueryStart(conn))
                return 0;
 
@@ -1263,10 +1277,11 @@ PQsendQuery(PGconn *conn, const char *query)
        conn->last_query = strdup(query);
 
        /*
-        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
-        * to send it all; PQgetResult() will do any additional flushing needed.
+        * Give the data a push (in batch mode, only if we're past the size
+        * threshold).  In nonblock mode, don't complain if we're unable to send
+        * it all; PQgetResult() will do any additional flushing needed.
         */
-       if (pqFlush(conn) < 0)
+       if (pqBatchFlush(conn) < 0)
        {
                /* error message should be set up already */
                return 0;
@@ -1331,6 +1346,10 @@ PQsendPrepare(PGconn *conn,
                          const char *stmtName, const char *query,
                          int nParams, const Oid *paramTypes)
 {
+       PGcommandQueueEntry *pipeCmd = NULL;
+       char      **last_query;
+       PGQueryClass *queryclass;
+
        if (!PQsendQueryStart(conn))
                return 0;
 
@@ -1362,6 +1381,15 @@ PQsendPrepare(PGconn *conn,
                return 0;
        }
 
+       /* Alloc batch memory before doing anything */
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               pipeCmd = pqMakePipelineCmd(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+       }
+
        /* construct the Parse message */
        if (pqPutMsgStart('P', false, conn) < 0 ||
                pqPuts(stmtName, conn) < 0 ||
@@ -1389,31 +1417,47 @@ PQsendPrepare(PGconn *conn,
                goto sendFailed;
 
        /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+
+               last_query = &conn->last_query;
+               queryclass = &conn->queryclass;
+       }
+       else
+       {
+               last_query = &pipeCmd->query;
+               queryclass = &pipeCmd->queryclass;
+       }
 
        /* remember we are doing just a Parse */
-       conn->queryclass = PGQUERY_PREPARE;
+       *queryclass = PGQUERY_PREPARE;
 
        /* and remember the query text too, if possible */
        /* if insufficient memory, last_query just winds up NULL */
-       if (conn->last_query)
-               free(conn->last_query);
-       conn->last_query = strdup(query);
+       if (*last_query)
+               free(*last_query);
+       *last_query = strdup(query);
 
        /*
-        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
-        * to send it all; PQgetResult() will do any additional flushing needed.
+        * Give the data a push (in batch mode, only if we're past the size
+        * threshold).  In nonblock mode, don't complain if we're unable to send
+        * it all; PQgetResult() will do any additional flushing needed.
         */
-       if (pqFlush(conn) < 0)
+       if (pqBatchFlush(conn) < 0)
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               pqAppendPipelineCmd(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       pqRecyclePipelineCmd(conn, pipeCmd);
        /* error message should be set up already */
        return 0;
 }
@@ -1461,7 +1505,8 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ *     Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1479,20 +1524,62 @@ PQsendQueryStart(PGconn *conn)
                                                  libpq_gettext("no connection 
to the server\n"));
                return false;
        }
-       /* Can't send while already busy, either. */
-       if (conn->asyncStatus != PGASYNC_IDLE)
+
+       /* Can't send while already busy, either, unless enqueuing for later */
+       if (conn->asyncStatus != PGASYNC_IDLE &&
+               conn->batch_status == PQBATCH_MODE_OFF)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("another 
command is already in progress\n"));
                return false;
        }
 
-       /* initialize async result-accumulation state */
-       pqClearAsyncResult(conn);
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               /*
+                * When enqueuing a message we don't change much of the 
connection
+                * state since it's already in use for the current command. The
+                * connection state will get updated when 
PQbatchQueueProcess(...)
+                * advances to start processing the queued message.
+                *
+                * Just make sure we can safely enqueue given the current 
connection
+                * state. We can enqueue behind another queue item, or behind a
+                * non-queue command (one that sends its own sync), but we can't
+                * enqueue if the connection is in a copy state.
+                */
+               switch (conn->asyncStatus)
+               {
+                       case PGASYNC_QUEUED:
+                       case PGASYNC_READY:
+                       case PGASYNC_READY_MORE:
+                       case PGASYNC_BUSY:
+                               /* ok to queue */
+                               break;
+                       case PGASYNC_COPY_IN:
+                       case PGASYNC_COPY_OUT:
+                       case PGASYNC_COPY_BOTH:
+                               printfPQExpBuffer(&conn->errorMessage,
+                                                                 
libpq_gettext("cannot queue commands during COPY\n"));
+                               return false;
+                               break;
+                       case PGASYNC_IDLE:
+                               printfPQExpBuffer(&conn->errorMessage,
+                                                                 
libpq_gettext("internal error, idle state in batch mode"));
+                               break;
+               }
+       }
+       else
+       {
+               /*
+                * This command's results will come in immediately. Initialize 
async
+                * result-accumulation state
+                */
+               pqClearAsyncResult(conn);
 
-       /* reset single-row processing mode */
-       conn->singleRowMode = false;
+               /* reset single-row processing mode */
+               conn->singleRowMode = false;
 
+       }
        /* ready to send command message */
        return true;
 }
@@ -1516,6 +1603,10 @@ PQsendQueryGuts(PGconn *conn,
                                int resultFormat)
 {
        int                     i;
+       PGcommandQueueEntry *pipeCmd = NULL;
+       char      **last_query;
+       PGQueryClass *queryclass;
+
 
        /* This isn't gonna work on a 2.0 server */
        if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1525,6 +1616,23 @@ PQsendQueryGuts(PGconn *conn,
                return 0;
        }
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               pipeCmd = pqMakePipelineCmd(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+
+               last_query = &pipeCmd->query;
+               queryclass = &pipeCmd->queryclass;
+       }
+       else
+       {
+               last_query = &conn->last_query;
+               queryclass = &conn->queryclass;
+       }
+
+
        /*
         * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
         * using specified statement name and the unnamed portal.
@@ -1637,35 +1745,43 @@ PQsendQueryGuts(PGconn *conn,
                pqPutMsgEnd(conn) < 0)
                goto sendFailed;
 
-       /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               /* construct the Sync message */
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+       }
 
        /* remember we are using extended query protocol */
-       conn->queryclass = PGQUERY_EXTENDED;
+       *queryclass = PGQUERY_EXTENDED;
 
        /* and remember the query text too, if possible */
        /* if insufficient memory, last_query just winds up NULL */
-       if (conn->last_query)
-               free(conn->last_query);
+       if (*last_query)
+               free(*last_query);
        if (command)
-               conn->last_query = strdup(command);
+               *last_query = strdup(command);
        else
-               conn->last_query = NULL;
+               *last_query = NULL;
 
        /*
-        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
-        * to send it all; PQgetResult() will do any additional flushing needed.
+        * Give the data a push (in batch mode, only if we're past the size
+        * threshold).  In nonblock mode, don't complain if we're unable to send
+        * it all; PQgetResult() will do any additional flushing needed.
         */
-       if (pqFlush(conn) < 0)
+       if (pqBatchFlush(conn) < 0)
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               pqAppendPipelineCmd(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       pqRecyclePipelineCmd(conn, pipeCmd);
        /* error message should be set up already */
        return 0;
 }
@@ -1766,14 +1882,17 @@ PQisBusy(PGconn *conn)
        return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
-
 /*
  * PQgetResult
  *       Get the next PGresult produced by a query.  Returns NULL if no
  *       query work remains or an error has occurred (e.g. out of
  *       memory).
+ *
+ *       In batch mode, once all the result of a query have been returned,
+ *       PQgetResult returns NULL to let the user know that the next batched
+ *       query is being processed.  At the end of the batch, returns a
+ *       end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1842,9 +1961,38 @@ PQgetResult(PGconn *conn)
        switch (conn->asyncStatus)
        {
                case PGASYNC_IDLE:
+               case PGASYNC_QUEUED:
                        res = NULL;                     /* query is complete */
+                       if (conn->batch_status != PQBATCH_MODE_OFF)
+                       {
+                               /*
+                                * In batch mode, we prepare the processing of 
the results of
+                                * the next query.
+                                */
+                               pqBatchProcessQueue(conn);
+                       }
                        break;
                case PGASYNC_READY:
+                       res = pqPrepareAsyncResult(conn);
+                       if (conn->batch_status != PQBATCH_MODE_OFF)
+                       {
+                               /*
+                                * In batch mode, query execution state cannot 
be IDLE as
+                                * there can be other queries or results 
waiting in the queue
+                                *
+                                * The connection isn't idle since we can't 
submit new
+                                * nonbatched commands. It isn't also busy 
since the current
+                                * command is done and we need to process a new 
one.
+                                */
+                               conn->asyncStatus = PGASYNC_QUEUED;
+                       }
+                       else
+                       {
+                               /* Set the state back to BUSY, allowing parsing 
to proceed. */
+                               conn->asyncStatus = PGASYNC_BUSY;
+                       }
+                       break;
+               case PGASYNC_READY_MORE:
                        res = pqPrepareAsyncResult(conn);
                        /* Set the state back to BUSY, allowing parsing to 
proceed. */
                        conn->asyncStatus = PGASYNC_BUSY;
@@ -2025,6 +2173,13 @@ PQexecStart(PGconn *conn)
        if (!conn)
                return false;
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("Synchronous 
command execution functions are not allowed in batch mode\n"));
+               return false;
+       }
+
        /*
         * Silently discard any prior query result that application didn't eat.
         * This is probably poor design, but it's here for backward 
compatibility.
@@ -2219,6 +2374,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+       PGcommandQueueEntry *pipeCmd = NULL;
+       PGQueryClass *queryclass;
+
        /* Treat null desc_target as empty string */
        if (!desc_target)
                desc_target = "";
@@ -2234,6 +2392,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char 
*desc_target)
                return 0;
        }
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               pipeCmd = pqMakePipelineCmd(conn);
+
+               if (pipeCmd == NULL)
+                       return 0;                       /* error msg already 
set */
+
+               queryclass = &pipeCmd->queryclass;
+       }
+       else
+               queryclass = &conn->queryclass;
+
        /* construct the Describe message */
        if (pqPutMsgStart('D', false, conn) < 0 ||
                pqPutc(desc_type, conn) < 0 ||
@@ -2242,32 +2412,40 @@ PQsendDescribe(PGconn *conn, char desc_type, const char 
*desc_target)
                goto sendFailed;
 
        /* construct the Sync message */
-       if (pqPutMsgStart('S', false, conn) < 0 ||
-               pqPutMsgEnd(conn) < 0)
-               goto sendFailed;
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               if (pqPutMsgStart('S', false, conn) < 0 ||
+                       pqPutMsgEnd(conn) < 0)
+                       goto sendFailed;
+       }
 
        /* remember we are doing a Describe */
-       conn->queryclass = PGQUERY_DESCRIBE;
+       *queryclass = PGQUERY_DESCRIBE;
 
-       /* reset last_query string (not relevant now) */
-       if (conn->last_query)
+       /* reset last-query string (not relevant now) */
+       if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
        {
                free(conn->last_query);
                conn->last_query = NULL;
        }
 
        /*
-        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
-        * to send it all; PQgetResult() will do any additional flushing needed.
+        * Give the data a push (in batch mode, only if we're past the size
+        * threshold).  In nonblock mode, don't complain if we're unable to send
+        * it all; PQgetResult() will do any additional flushing needed.
         */
-       if (pqFlush(conn) < 0)
+       if (pqBatchFlush(conn) < 0)
                goto sendFailed;
 
        /* OK, it's launched! */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               pqAppendPipelineCmd(conn, pipeCmd);
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        return 1;
 
 sendFailed:
+       pqRecyclePipelineCmd(conn, pipeCmd);
        /* error message should be set up already */
        return 0;
 }
@@ -2665,6 +2843,13 @@ PQfn(PGconn *conn,
        /* clear the error string */
        resetPQExpBuffer(&conn->errorMessage);
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("PQfn not 
allowed in batch mode\n"));
+               return NULL;
+       }
+
        if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE 
||
                conn->result != NULL)
        {
@@ -2685,6 +2870,377 @@ PQfn(PGconn *conn,
                                                           args, nargs);
 }
 
+/* ====== Batch mode support ======== */
+
+/*
+ * PQenterBatchMode
+ *             Put an idle connection in batch mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQbatchQueueSync. Multiple sets of
+ * batched commands may be sent while in batch mode. Batch mode can be exited
+ * by calling PQexitBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+       if (!conn)
+               return 0;
+
+       /* succeed with no action if already in batch mode */
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+               return 1;
+
+       if (conn->asyncStatus != PGASYNC_IDLE)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("cannot enter 
batch mode, connection not idle\n"));
+               return 0;
+       }
+
+       conn->batch_status = PQBATCH_MODE_ON;
+       conn->asyncStatus = PGASYNC_QUEUED;
+
+       return 1;
+}
+
+/*
+ * PQexitBatchMode
+ *             End batch mode and return to normal command mode.
+ *
+ * Returns 1 in success (batch mode was ended, or not in batch mode).
+ *
+ * Returns 0 if in batch mode and cannot be ended yet.
+ * Error message will be set.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+       if (!conn)
+               return 0;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+               return 1;
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+                       /* there are some uncollected results */
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("cannot 
exit batch mode with uncollected results\n"));
+                       return 0;
+
+               case PGASYNC_BUSY:
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("cannot 
exit batch mode while busy\n"));
+                       return 0;
+
+               default:
+                       /* OK */
+                       break;
+       }
+
+       /* still work to process */
+       if (conn->cmd_queue_head != NULL)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("command queue 
not clean"));
+               return 0;
+       }
+
+       conn->batch_status = PQBATCH_MODE_OFF;
+       conn->asyncStatus = PGASYNC_IDLE;
+
+       /* Flush any pending data in out buffer */
+       if (pqFlush(conn) < 0)
+               return 0;       /* error message is setup already */
+       return 1;
+}
+
+/*
+ * internal function pqBatchProcessQueue
+ *
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+static int
+pqBatchProcessQueue(PGconn *conn)
+{
+       PGcommandQueueEntry *next_query;
+
+       if (!conn)
+               return 0;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+               return 0;
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_COPY_IN:
+               case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_BOTH:
+                       /* should be unreachable */
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         "internal error, COPY 
in batch mode");
+                       break;
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+               case PGASYNC_BUSY:
+                       /* client still has to process current query or results 
*/
+                       return 0;
+                       break;
+               case PGASYNC_IDLE:
+                       /* should be unreachable */
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         "internal error, IDLE 
in batch mode");
+                       break;
+               case PGASYNC_QUEUED:
+                       /* next query please */
+                       break;
+       }
+
+       if (conn->cmd_queue_head == NULL)
+       {
+               /*
+                * In batch mode but nothing left on the queue; caller can 
submit more
+                * work or PQexitBatchMode() now.
+                */
+               return 0;
+       }
+
+       /*
+        * Pop the next query from the queue and set up the connection state as 
if
+        * it'd just been dispatched from a non-batched call
+        */
+       next_query = conn->cmd_queue_head;
+       conn->cmd_queue_head = next_query->next;
+       next_query->next = NULL;
+
+       /* Initialize async result-accumulation state */
+       pqClearAsyncResult(conn);
+
+       /* reset single-row processing mode XXX why?? */
+       conn->singleRowMode = false;
+
+
+       conn->last_query = next_query->query;
+       next_query->query = NULL;
+       conn->queryclass = next_query->queryclass;
+
+       pqRecyclePipelineCmd(conn, next_query);
+
+       if (conn->batch_status == PQBATCH_MODE_ABORTED &&
+               conn->queryclass != PGQUERY_SYNC)
+       {
+               /*
+                * In an aborted batch we don't get anything from the server 
for each
+                * result; we're just discarding input until we get to the next 
sync
+                * from the server. The client needs to know its queries got 
aborted
+                * so we create a fake PGresult to return immediately from
+                * PQgetResult.
+                */
+               conn->result =
+                       PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED);
+               if (!conn->result)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("out of 
memory"));
+                       pqSaveErrorResult(conn);
+                       return 0;
+               }
+               conn->asyncStatus = PGASYNC_READY;
+       }
+       else
+       {
+               /* allow parsing to continue */
+               conn->asyncStatus = PGASYNC_BUSY;
+       }
+
+       return 1;
+}
+
+/*
+ * PQbatchSendQueue
+ *             End a batch submission.
+ *
+ * It's legal to start submitting another batch immediately, without
+ * waiting for the results of the current batch. There's no need to end batch
+ * mode and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and
+ * including the PQbatchQueueSync command result gets set to 
PGRES_BATCH_ABORTED
+ * state. If the whole batch is processed without error, a PGresult with
+ * PGRES_BATCH_END is produced.
+ *
+ * Queries can already have been sent before PQbatchSendQueue is called, but
+ * PQbatchSendQueue need to be called before retrieving command results.
+ *
+ * The connection will remain in batch mode and unavailable for new synchronous
+ * command execution functions until all results from the batch are processed
+ * by the client.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+       PGcommandQueueEntry *entry;
+
+       if (!conn)
+               return 0;
+
+       if (conn->batch_status == PQBATCH_MODE_OFF)
+       {
+               printfPQExpBuffer(&conn->errorMessage,
+                                                 libpq_gettext("cannot send 
batch when not in batch mode\n"));
+               return 0;
+       }
+
+       switch (conn->asyncStatus)
+       {
+               case PGASYNC_IDLE:
+                       /* should be unreachable */
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         "internal error: 
cannot send batch when idle\n");
+                       return 0;
+                       break;
+               case PGASYNC_COPY_IN:
+               case PGASYNC_COPY_OUT:
+               case PGASYNC_COPY_BOTH:
+                       /* should be unreachable */
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         "internal error: 
cannot send batch while in COPY\n");
+                       return 0;
+                       break;
+               case PGASYNC_READY:
+               case PGASYNC_READY_MORE:
+               case PGASYNC_BUSY:
+               case PGASYNC_QUEUED:
+                       /* can send sync to end this batch of cmds */
+                       break;
+       }
+
+       entry = pqMakePipelineCmd(conn);
+       if (entry == NULL)
+               return 0;                               /* error msg already 
set */
+
+       entry->queryclass = PGQUERY_SYNC;
+       entry->query = NULL;
+
+       /* construct the Sync message */
+       if (pqPutMsgStart('S', false, conn) < 0 ||
+               pqPutMsgEnd(conn) < 0)
+               goto sendFailed;
+
+       pqAppendPipelineCmd(conn, entry);
+
+       /*
+        * Give the data a push.  In nonblock mode, don't complain if we're 
unable
+        * to send it all; PQgetResult() will do any additional flushing needed.
+        */
+       if (PQflush(conn) < 0)
+               goto sendFailed;
+
+       /*
+        * Call pqBatchProcessQueue so the user can call start calling 
getResult.
+        */
+       pqBatchProcessQueue(conn);
+
+       return 1;
+
+sendFailed:
+       pqRecyclePipelineCmd(conn, entry);
+       /* error message should be set up already */
+       return 0;
+}
+
+/*
+ * pqMakePipelineCmd
+ *             Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated.  Caller is responsible for adding it to the
+ * command queue (pqAppendPipelineCmd) once the struct is filled in, or
+ * releasing the memory (pqRecyclePipelineCmd) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcommandQueueEntry *
+pqMakePipelineCmd(PGconn *conn)
+{
+       PGcommandQueueEntry *entry;
+
+       if (conn->cmd_queue_recycle == NULL)
+       {
+               entry = (PGcommandQueueEntry *) 
malloc(sizeof(PGcommandQueueEntry));
+               if (entry == NULL)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         libpq_gettext("out of 
memory\n"));
+                       return NULL;
+               }
+       }
+       else
+       {
+               entry = conn->cmd_queue_recycle;
+               conn->cmd_queue_recycle = entry->next;
+       }
+       entry->next = NULL;
+       entry->query = NULL;
+
+       return entry;
+}
+
+/*
+ * pqAppendPipelineCmd
+ *             Append a precreated command queue entry to the queue after it's 
been
+ *             sent successfully.
+ */
+static void
+pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+       if (conn->cmd_queue_head == NULL)
+               conn->cmd_queue_head = entry;
+       else
+               conn->cmd_queue_tail->next = entry;
+       conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecyclePipelineCmd
+ *             Push a command queue entry onto the freelist. It must be an 
entry
+ *             with null next pointer and not referenced by any other entry's 
next
+ *             pointer.
+ */
+static void
+pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+       if (entry == NULL)
+               return;
+
+       Assert(entry->next == NULL);
+
+       if (entry->query)
+               free(entry->query);
+
+       entry->next = conn->cmd_queue_recycle;
+       conn->cmd_queue_recycle = entry;
+}
+
 
 /* ====== accessor funcs for PGresult ======== */
 
@@ -3285,6 +3841,23 @@ PQflush(PGconn *conn)
        return pqFlush(conn);
 }
 
+/*
+ * pqBatchFlush
+ *
+ * In batch mode, data will be flushed only when the out buffer reaches the
+ * threshold value.  In non-batch mode, data will be flushed all the time.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+       if ((conn->batch_status == PQBATCH_MODE_OFF) ||
+               (conn->outCount >= OUTBUFFER_THRESHOLD))
+               return (pqFlush(conn));
+       return 0;
+}
+
 
 /*
  *             PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol2.c 
b/src/interfaces/libpq/fe-protocol2.c
index 9360c541be..2ff3fa4883 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn)
 {
        char            id;
 
+       if (conn->batch_status != PQBATCH_MODE_OFF)
+       {
+               fprintf(stderr, "internal error, attempt to read v2 protocol in 
batch mode");
+               abort();
+       }
+
        /*
         * Loop to parse successive complete messages available in the buffer.
         */
diff --git a/src/interfaces/libpq/fe-protocol3.c 
b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..da38e6aed1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn)
                                                return;
                                        conn->asyncStatus = PGASYNC_READY;
                                        break;
-                               case 'Z':               /* backend is ready for 
new query */
+                               case 'Z':               /* sync response, 
backend is ready for new
+                                                                * query */
                                        if (getReadyForQuery(conn))
                                                return;
-                                       conn->asyncStatus = PGASYNC_IDLE;
+                                       if (conn->batch_status != 
PQBATCH_MODE_OFF)
+                                       {
+                                               conn->batch_status = 
PQBATCH_MODE_ON;
+                                               conn->result = 
PQmakeEmptyPGresult(conn,
+                                                                               
                                   PGRES_BATCH_END);
+                                               conn->asyncStatus = 
PGASYNC_READY;
+                                       }
+                                       else
+                                               conn->asyncStatus = 
PGASYNC_IDLE;
                                        break;
                                case 'I':               /* empty query */
                                        if (conn->result == NULL)
@@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
        PQExpBufferData workBuf;
        char            id;
 
+       if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+               conn->batch_status = PQBATCH_MODE_ABORTED;
+
        /*
         * If this is an error message, pre-emptively clear any incomplete query
         * result we may have.  We'd just throw it away below anyway, and
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fbce3..fcdd887958 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -97,7 +97,10 @@ typedef enum
        PGRES_NONFATAL_ERROR,           /* notice or warning message */
        PGRES_FATAL_ERROR,                      /* query failed */
        PGRES_COPY_BOTH,                        /* Copy In/Out data transfer in 
progress */
-       PGRES_SINGLE_TUPLE                      /* single tuple from larger 
resultset */
+       PGRES_SINGLE_TUPLE,                     /* single tuple from larger 
resultset */
+       PGRES_BATCH_END,                        /* end of a batch of commands */
+       PGRES_BATCH_ABORTED,            /* Command didn't run because of an 
abort
+                                                                * earlier in a 
batch */
 } ExecStatusType;
 
 typedef enum
@@ -137,6 +140,17 @@ typedef enum
        PQPING_NO_ATTEMPT                       /* connection not attempted 
(bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+       PQBATCH_MODE_OFF,
+       PQBATCH_MODE_ON,
+       PQBATCH_MODE_ABORTED
+} PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -328,6 +342,7 @@ extern int  PQserverVersion(const PGconn *conn);
 extern char *PQerrorMessage(const PGconn *conn);
 extern int     PQsocket(const PGconn *conn);
 extern int     PQbackendPID(const PGconn *conn);
+extern int     PQbatchStatus(const PGconn *conn);
 extern int     PQconnectionNeedsPassword(const PGconn *conn);
 extern int     PQconnectionUsedPassword(const PGconn *conn);
 extern int     PQclientEncoding(const PGconn *conn);
@@ -435,6 +450,11 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int     PQisBusy(PGconn *conn);
 extern int     PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int     PQenterBatchMode(PGconn *conn);
+extern int     PQexitBatchMode(PGconn *conn);
+extern int     PQbatchSendQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..d2b26f2299 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,15 @@ typedef enum
 {
        PGASYNC_IDLE,                           /* nothing's happening, dude */
        PGASYNC_BUSY,                           /* query in progress */
-       PGASYNC_READY,                          /* result ready for PQgetResult 
*/
+       PGASYNC_READY,                          /* query done, waiting for 
client to fetch
+                                                                * result */
+       PGASYNC_READY_MORE,                     /* query done, waiting for 
client to fetch
+                                                                * result, More 
results expected from this
+                                                                * query */
        PGASYNC_COPY_IN,                        /* Copy In data transfer in 
progress */
        PGASYNC_COPY_OUT,                       /* Copy Out data transfer in 
progress */
-       PGASYNC_COPY_BOTH                       /* Copy In/Out data transfer in 
progress */
+       PGASYNC_COPY_BOTH,                      /* Copy In/Out data transfer in 
progress */
+       PGASYNC_QUEUED                          /* Current query done, more in 
queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +234,8 @@ typedef enum
        PGQUERY_SIMPLE,                         /* simple Query protocol 
(PQexec) */
        PGQUERY_EXTENDED,                       /* full Extended protocol 
(PQexecParams) */
        PGQUERY_PREPARE,                        /* Parse only (PQprepare) */
-       PGQUERY_DESCRIBE                        /* Describe Statement or Portal 
*/
+       PGQUERY_DESCRIBE,                       /* Describe Statement or Portal 
*/
+       PGQUERY_SYNC                            /* A protocol sync to end a 
batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the pqSetenv state machine */
@@ -301,6 +307,22 @@ typedef enum pg_conn_host_type
        CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+       PGQueryClass queryclass;        /* Query type; PGQUERY_SYNC for sync 
msg */
+       char       *query;                      /* SQL command, or NULL if 
unknown */
+       struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -394,6 +416,7 @@ struct pg_conn
        bool            options_valid;  /* true if OK to attempt connection */
        bool            nonblocking;    /* whether this connection is using 
nonblock
                                                                 * sending 
semantics */
+       PQBatchStatus batch_status; /* Batch(pipelining) mode status of 
connection */
        bool            singleRowMode;  /* return current query result 
row-by-row? */
        char            copy_is_binary; /* 1 = copy binary, 0 = copy text */
        int                     copy_already_done;      /* # bytes already 
returned in COPY OUT */
@@ -406,6 +429,16 @@ struct pg_conn
        pg_conn_host *connhost;         /* details about each named host */
        char       *connip;                     /* IP address for current 
network connection */
 
+       /*
+        * The command queue
+        *
+        * head is the next pending cmd, tail is where we append new commands.
+        * Freed entries for recycling go on the recycle linked list.
+        */
+       PGcommandQueueEntry *cmd_queue_head;
+       PGcommandQueueEntry *cmd_queue_tail;
+       PGcommandQueueEntry *cmd_queue_recycle;
+
        /* Connection data */
        pgsocket        sock;                   /* FD for socket, 
PGINVALID_SOCKET if
                                                                 * unconnected 
*/
@@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t 
len);
  */
 #define pqIsnonblocking(conn)  ((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold.
+ */
+#define OUTBUFFER_THRESHOLD    65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, 
unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index a6d2ffbf9e..287214c544 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
                  test_extensions \
                  test_ginpostinglist \
                  test_integerset \
+                 test_libpq \
                  test_misc \
                  test_parser \
                  test_pg_dump \
diff --git a/src/test/modules/test_libpq/.gitignore 
b/src/test/modules/test_libpq/.gitignore
new file mode 100644
index 0000000000..11e8463984
--- /dev/null
+++ b/src/test/modules/test_libpq/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/testlibpqbatch
diff --git a/src/test/modules/test_libpq/Makefile 
b/src/test/modules/test_libpq/Makefile
new file mode 100644
index 0000000000..6d3a0ea4d5
--- /dev/null
+++ b/src/test/modules/test_libpq/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/test_libpq/Makefile
+
+PROGRAM = testlibpqbatch
+OBJS = testlibpqbatch.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS += $(libpq)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_libpq
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_libpq/README 
b/src/test/modules/test_libpq/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/test_libpq/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl 
b/src/test/modules/test_libpq/t/001_libpq_async.pl
new file mode 100644
index 0000000000..0b27896a2a
--- /dev/null
+++ b/src/test/modules/test_libpq/t/001_libpq_async.pl
@@ -0,0 +1,26 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
+my @tests =
+  qw(disallowed_in_batch simple_batch multi_batch batch_abort
+  batch_insert singlerow);
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+for my $testname (@tests)
+{
+       $node->command_ok(
+               [ 'testlibpqbatch', $testname, $node->connstr('postgres'), 
$numrows ],
+               "testlibpqbatch $testname");
+}
+
+$node->stop('fast');
diff --git a/src/test/modules/test_libpq/testlibpqbatch.c 
b/src/test/modules/test_libpq/testlibpqbatch.c
new file mode 100644
index 0000000000..7157e6cb90
--- /dev/null
+++ b/src/test/modules/test_libpq/testlibpqbatch.c
@@ -0,0 +1,963 @@
+/*
+ * src/test/modules/test_libpq/testlibpqbatch.c
+ *             Verify libpq batch execution functionality
+ */
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "testlibpqbatch";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define        pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+       PQfinish(conn);
+       exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+       va_list         args;
+
+       fprintf(stderr, "\n");          /* XXX hack */
+       fprintf(stderr, "%s:%d: ", progname, line);
+
+       va_start(args, fmt);
+       vfprintf(stderr, fmt, args);
+       va_end(args);
+       printf("Failure, exiting\n");
+       exit(1);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+       PGresult   *res = NULL;
+
+       fprintf(stderr, "test error cases... ");
+
+       if (PQisnonblocking(conn))
+               pg_fatal("Expected blocking connection mode\n");
+
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("Unable to enter batch mode\n");
+
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Batch mode not activated properly\n");
+
+       /* PQexec should fail in batch mode */
+       res = PQexec(conn, "SELECT 1");
+       if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+               pg_fatal("PQexec should fail in batch mode but succeeded\n");
+
+       /* So should PQsendQuery */
+       if (PQsendQuery(conn, "SELECT 1") != 0)
+               pg_fatal("PQsendQuery should fail in batch mode but 
succeeded\n");
+
+       /* Entering batch mode when already in batch mode is OK */
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("re-entering batch mode should be a no-op but 
failed\n");
+
+       if (PQisBusy(conn) != 0)
+               pg_fatal("PQisBusy should return 0 when idle in batch, returned 
1\n");
+
+       /* ok, back to normal command mode */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("couldn't exit idle empty batch mode\n");
+
+       if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+               pg_fatal("Batch mode not terminated properly\n");
+
+       /* exiting batch mode when not in batch mode should be a no-op */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("batch mode exit when not in batch mode should succeed 
but failed\n");
+
+       /* can now PQexec again */
+       res = PQexec(conn, "SELECT 1");
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("PQexec should succeed after exiting batch mode but 
failed with: %s\n",
+                                PQerrorMessage(conn));
+
+       fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_batch(PGconn *conn)
+{
+       PGresult   *res = NULL;
+       const char *dummy_params[1] = {"1"};
+       Oid                     dummy_param_oids[1] = {INT4OID};
+
+       fprintf(stderr, "simple batch... ");
+
+       /*
+        * Enter batch mode and dispatch a set of operations, which we'll then
+        * process the results of as they come in.
+        *
+        * For a simple case we should be able to do this without interim
+        * processing of results since our out buffer will give us enough slush 
to
+        * work with and we won't block on sending. So blocking mode is fine.
+        */
+       if (PQisnonblocking(conn))
+               pg_fatal("Expected blocking connection mode\n");
+
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("failed to enter batch mode: %s\n", 
PQerrorMessage(conn));
+
+       if (PQsendQueryParams(conn, "SELECT $1",
+                                                 1, dummy_param_oids, 
dummy_params,
+                                                 NULL, NULL, 0) != 1)
+               pg_fatal("dispatching SELECT failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQexitBatchMode(conn) != 0)
+               pg_fatal("exiting batch mode with work in progress should fail, 
but succeeded\n");
+
+       if (PQbatchSendQueue(conn) != 1)
+               pg_fatal("Ending a batch failed: %s\n", PQerrorMessage(conn));
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when there's a batch item: 
%s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("Unexpected result code %s from first batch item\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       PQclear(res);
+       res = NULL;
+
+       if (PQgetResult(conn) != NULL)
+               pg_fatal("PQgetResult returned something extra after first 
query result.\n");
+
+       /*
+        * Even though we've processed the result there's still a sync to come 
and
+        * we can't exit batch mode yet
+        */
+       if (PQexitBatchMode(conn) != 0)
+               pg_fatal("exiting batch mode after query but before sync 
succeeded incorrectly\n");
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when sync result 
PGRES_BATCH_END expected: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_BATCH_END)
+               pg_fatal("Unexpected result code %s instead of sync result, 
error: %s\n",
+                                PQresStatus(PQresultStatus(res)), 
PQerrorMessage(conn));
+
+       PQclear(res);
+       res = NULL;
+
+       if (PQgetResult(conn) != NULL)
+               pg_fatal("PQgetResult returned something extra after end batch 
call\n");
+
+       /* We're still in a batch... */
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Fell out of batch mode somehow\n");
+
+       /* ... until we end it, which we can safely do now */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("attempt to exit batch mode failed when it should've 
succeeded: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+               pg_fatal("Exiting batch mode didn't seem to work\n");
+
+       fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_batch(PGconn *conn)
+{
+       PGresult   *res = NULL;
+       const char *dummy_params[1] = {"1"};
+       Oid                     dummy_param_oids[1] = {INT4OID};
+
+       fprintf(stderr, "multi batch... ");
+
+       /*
+        * Queue up a couple of small batches and process each without returning
+        * to command mode first.
+        */
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("failed to enter batch mode: %s\n", 
PQerrorMessage(conn));
+
+       if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+                                                 dummy_params, NULL, NULL, 0) 
!= 1)
+               pg_fatal("dispatching first SELECT failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQbatchSendQueue(conn) != 1)
+               pg_fatal("Ending first batch failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+                                                 dummy_params, NULL, NULL, 0) 
!= 1)
+               pg_fatal("dispatching second SELECT failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQbatchSendQueue(conn) != 1)
+               pg_fatal("Ending second batch failed: %s\n", 
PQerrorMessage(conn));
+
+       /* OK, start processing the batch results */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when there's a batch item: 
%s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("Unexpected result code %s from first batch item\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+       res = NULL;
+
+       if (PQgetResult(conn) != NULL)
+               pg_fatal("PQgetResult returned something extra after first 
result\n");
+
+       if (PQexitBatchMode(conn) != 0)
+               pg_fatal("exiting batch mode after query but before sync 
succeeded incorrectly\n");
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when sync result expected: 
%s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_BATCH_END)
+               pg_fatal("Unexpected result code %s instead of sync result, 
error: %s\n",
+                                PQresStatus(PQresultStatus(res)), 
PQerrorMessage(conn));
+
+       PQclear(res);
+
+       res = PQgetResult(conn);
+       if (res != NULL)
+               pg_fatal("Expected null result, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       /* second batch */
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when there's a batch item: 
%s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("Unexpected result code %s from second batch item\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       res = PQgetResult(conn);
+       if (res != NULL)
+               pg_fatal("Expected null result, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("PQgetResult returned null when there's a batch item: 
%s\n",
+                                PQerrorMessage(conn));
+
+       if (PQresultStatus(res) != PGRES_BATCH_END)
+               pg_fatal("Unexpected result code %s from second end batch\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       /* We're still in a batch... */
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Fell out of batch mode somehow\n");
+
+       /* until we end it, which we can safely do now */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("attempt to exit batch mode failed when it should've 
succeeded: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+               pg_fatal("exiting batch mode didn't seem to work\n");
+
+       fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a batch fails the rest of the batch is flushed. We
+ * still have to get results for each batch item, but the item will just be
+ * a PGRES_BATCH_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the batch. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+       PGresult   *res = NULL;
+       const char *dummy_params[1] = {"1"};
+       Oid                     dummy_param_oids[1] = {INT4OID};
+       int                     i;
+
+       fprintf(stderr, "aborted batch... ");
+
+       res = PQexec(conn, drop_table_sql);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("dispatching DROP TABLE failed: %s\n", 
PQerrorMessage(conn));
+
+       res = PQexec(conn, create_table_sql);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("dispatching CREATE TABLE failed: %s\n", 
PQerrorMessage(conn));
+
+       /*
+        * Queue up a couple of small batches and process each without returning
+        * to command mode first. Make sure the second operation in the first
+        * batch ERRORs.
+        */
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("failed to enter batch mode: %s\n", 
PQerrorMessage(conn));
+
+       dummy_params[0] = "1";
+       if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+                                                 dummy_params, NULL, NULL, 0) 
!= 1)
+               pg_fatal("dispatching first insert failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+                                                 1, dummy_param_oids, 
dummy_params,
+                                                 NULL, NULL, 0) != 1)
+               pg_fatal("dispatching error select failed: %s\n", 
PQerrorMessage(conn));
+
+       dummy_params[0] = "2";
+       if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+                                                 dummy_params, NULL, NULL, 0) 
!= 1)
+               pg_fatal("dispatching second insert failed: %s\n", 
PQerrorMessage(conn));
+
+       if (PQbatchSendQueue(conn) != 1)
+               pg_fatal("Sending first batch failed: %s\n", 
PQerrorMessage(conn));
+
+       dummy_params[0] = "3";
+       if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+                                                 dummy_params, NULL, NULL, 0) 
!= 1)
+               pg_fatal("dispatching second-batch insert failed: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQbatchSendQueue(conn) != 1)
+               pg_fatal("Ending second batch failed: %s\n", 
PQerrorMessage(conn));
+
+       /*
+        * OK, start processing the batch results.
+        *
+        * We should get a command-ok for the first query, then a fatal error 
and
+        * a batch aborted message for the second insert, a batch-end, then a
+        * command-ok and a batch-ok for the second batch operation.
+        */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("Unexpected result status %s: %s\n",
+                                PQresStatus(PQresultStatus(res)),
+                                PQresultErrorMessage(res));
+       PQclear(res);
+
+       /* NULL result to signal end-of-results for this command */
+       if ((res = PQgetResult(conn)) != NULL)
+               pg_fatal("Expected null result, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       /* Second query caused error, so we expect an error next */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+               pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, 
got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+
+       /* NULL result to signal end-of-results for this command */
+       if ((res = PQgetResult(conn)) != NULL)
+               pg_fatal("Expected null result, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+
+       /*
+        * batch should now be aborted.
+        *
+        * Note that we could still queue more queries at this point if we 
wanted;
+        * they'd get added to a new third batch since we've already sent a
+        * second. The aborted flag relates only to the batch being received.
+        */
+       if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+               pg_fatal("batch should be flagged as aborted but isn't\n");
+
+       /* third query in batch, the second insert */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_BATCH_ABORTED)
+               pg_fatal("Unexpected result code -- expected 
PGRES_BATCH_ABORTED, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+
+       /* NULL result to signal end-of-results for this command */
+       if ((res = PQgetResult(conn)) != NULL)
+               pg_fatal("Expected null result, got %s\n", 
PQresStatus(PQresultStatus(res)));
+
+       if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+               pg_fatal("batch should be flagged as aborted but isn't\n");
+
+       /* Ensure we're still in batch */
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Fell out of batch mode somehow\n");
+
+       /*
+        * The end of a failed batch is a PGRES_BATCH_END.
+        *
+        * (This is so clients know to start processing results normally again 
and
+        * can tell the difference between skipped commands and the sync.)
+        */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_BATCH_END)
+               pg_fatal("Unexpected result code from first batch sync\n"
+                                "Expected PGRES_BATCH_END, got %s\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+
+       /* XXX why do we have a NULL result after PGRES_BATCH_END? */
+       res = PQgetResult(conn);
+       if (res != NULL)
+               pg_fatal("Expected null result, got %s\n", 
PQresStatus(PQresultStatus(res)));
+
+       if (PQbatchStatus(conn) == PQBATCH_MODE_ABORTED)
+               pg_fatal("sync should've cleared the aborted flag but 
didn't\n");
+
+       /* We're still in a batch... */
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Fell out of batch mode somehow\n");
+
+       /* the insert from the second batch */
+       res = PQgetResult(conn);
+       if (res == NULL)
+               pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("Unexpected result code %s from first item in second 
batch\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+
+       /* Read the NULL result at the end of the command */
+       if ((res = PQgetResult(conn)) != NULL)
+               pg_fatal("Expected null result, got %s\n", 
PQresStatus(PQresultStatus(res)));
+
+       /* the second batch sync */
+       if ((res = PQgetResult(conn)) == NULL)
+               pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+       if (PQresultStatus(res) != PGRES_BATCH_END)
+               pg_fatal("Unexpected result code %s from second batch sync\n",
+                                PQresStatus(PQresultStatus(res)));
+       PQclear(res);
+
+       if ((res = PQgetResult(conn)) != NULL)
+               pg_fatal("Expected null result, got %s: %s\n",
+                                PQresStatus(PQresultStatus(res)),
+                                PQerrorMessage(conn));
+
+       /* We're still in a batch... */
+       if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+               pg_fatal("Fell out of batch mode somehow\n");
+
+       /* until we end it, which we can safely do now */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("attempt to exit batch mode failed when it should've 
succeeded: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+               pg_fatal("exiting batch mode didn't seem to work\n");
+
+       fprintf(stderr, "ok\n");
+
+       /*-
+        * Since we fired the batches off without a surrounding xact, the 
results
+        * should be:
+        *
+        * - Implicit xact started by server around 1st batch
+        * - First insert applied
+        * - Second statement aborted xact
+        * - Third insert skipped
+        * - Sync rolled back first implicit xact
+        * - Implicit xact created by server around 2nd batch
+        * - insert applied from 2nd batch
+        * - Sync commits 2nd xact
+        *
+        * So we should only have the value 3 that we inserted.
+        */
+       res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pg_fatal("Expected tuples, got %s: %s\n",
+                                PQresStatus(PQresultStatus(res)), 
PQerrorMessage(conn));
+       if (PQntuples(res) != 1)
+               pg_fatal("expected 1 result, got %d\n", PQntuples(res));
+       for (i = 0; i < PQntuples(res); i++)
+       {
+               const char *val = PQgetvalue(res, i, 0);
+
+               if (strcmp(val, "3") != 0)
+                       pg_fatal("expected only insert with value 3, got %s", 
val);
+       }
+
+       PQclear(res);
+}
+
+/* State machine enum for test_batch_insert */
+typedef enum BatchInsertStep
+{
+       BI_BEGIN_TX,
+       BI_DROP_TABLE,
+       BI_CREATE_TABLE,
+       BI_PREPARE,
+       BI_INSERT_ROWS,
+       BI_COMMIT_TX,
+       BI_SYNC,
+       BI_DONE
+} BatchInsertStep;
+
+static void
+test_batch_insert(PGconn *conn, int n_rows)
+{
+       const char *insert_params[1];
+       Oid                     insert_param_oids[1] = {INT4OID};
+       char            insert_param_0[MAXINTLEN];
+       BatchInsertStep send_step = BI_BEGIN_TX,
+                               recv_step = BI_BEGIN_TX;
+       int                     rows_to_send,
+                               rows_to_receive;
+
+       insert_params[0] = &insert_param_0[0];
+
+       rows_to_send = rows_to_receive = n_rows;
+
+       /*
+        * Do a batched insert into a table created at the start of the batch
+        */
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("failed to enter batch mode: %s\n", 
PQerrorMessage(conn));
+
+       while (send_step != BI_PREPARE)
+       {
+               const char *sql;
+
+               switch (send_step)
+               {
+                       case BI_BEGIN_TX:
+                               sql = "BEGIN TRANSACTION";
+                               send_step = BI_DROP_TABLE;
+                               break;
+
+                       case BI_DROP_TABLE:
+                               sql = drop_table_sql;
+                               send_step = BI_CREATE_TABLE;
+                               break;
+
+                       case BI_CREATE_TABLE:
+                               sql = create_table_sql;
+                               send_step = BI_PREPARE;
+                               break;
+
+                       default:
+                               pg_fatal("invalid state");
+               }
+
+               pg_debug("sending: %s\n", sql);
+               if (PQsendQueryParams(conn, sql,
+                                                         0, NULL, NULL, NULL, 
NULL, 0) != 1)
+                       pg_fatal("dispatching %s failed: %s\n", sql, 
PQerrorMessage(conn));
+       }
+
+       Assert(send_step == BI_PREPARE);
+       pg_debug("sending: %s\n", insert_sql);
+       if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) 
!= 1)
+               pg_fatal("dispatching PREPARE failed: %s\n", 
PQerrorMessage(conn));
+       send_step = BI_INSERT_ROWS;
+
+       /*
+        * Now we start inserting. We'll be sending enough data that we could 
fill
+        * our out buffer, so to avoid deadlocking we need to enter nonblocking
+        * mode and consume input while we send more output. As results of each
+        * query are processed we should pop them to allow processing of the 
next
+        * query. There's no need to finish the batch before processing results.
+        */
+       if (PQsetnonblocking(conn, 1) != 0)
+               pg_fatal("failed to set nonblocking mode: %s\n", 
PQerrorMessage(conn));
+
+       while (recv_step != BI_DONE)
+       {
+               int                     sock;
+               fd_set          input_mask;
+               fd_set          output_mask;
+
+               sock = PQsocket(conn);
+
+               if (sock < 0)
+                       break;                          /* shouldn't happen */
+
+               FD_ZERO(&input_mask);
+               FD_SET(sock, &input_mask);
+               FD_ZERO(&output_mask);
+               FD_SET(sock, &output_mask);
+
+               if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+               {
+                       fprintf(stderr, "select() failed: %s\n", 
strerror(errno));
+                       exit_nicely(conn);
+               }
+
+               /*
+                * Process any results, so we keep the server's out buffer free
+                * flowing and it can continue to process input
+                */
+               if (FD_ISSET(sock, &input_mask))
+               {
+                       PQconsumeInput(conn);
+
+                       /* Read until we'd block if we tried to read */
+                       while (!PQisBusy(conn) && recv_step < BI_DONE)
+                       {
+                               PGresult   *res;
+                               const char *cmdtag;
+                               const char *description = "";
+                               int                     status;
+
+                               /*
+                                * Read next result.  If no more results from 
this query,
+                                * advance to the next query
+                                */
+                               res = PQgetResult(conn);
+                               if (res == NULL)
+                                       continue;
+
+                               status = PGRES_COMMAND_OK;
+                               switch (recv_step)
+                               {
+                                       case BI_BEGIN_TX:
+                                               cmdtag = "BEGIN";
+                                               recv_step++;
+                                               break;
+                                       case BI_DROP_TABLE:
+                                               cmdtag = "DROP TABLE";
+                                               recv_step++;
+                                               break;
+                                       case BI_CREATE_TABLE:
+                                               cmdtag = "CREATE TABLE";
+                                               recv_step++;
+                                               break;
+                                       case BI_PREPARE:
+                                               cmdtag = "";
+                                               description = "PREPARE";
+                                               recv_step++;
+                                               break;
+                                       case BI_INSERT_ROWS:
+                                               cmdtag = "INSERT";
+                                               rows_to_receive--;
+                                               if (rows_to_receive == 0)
+                                                       recv_step++;
+                                               break;
+                                       case BI_COMMIT_TX:
+                                               cmdtag = "COMMIT";
+                                               recv_step++;
+                                               break;
+                                       case BI_SYNC:
+                                               cmdtag = "";
+                                               description = "SYNC";
+                                               status = PGRES_BATCH_END;
+                                               recv_step++;
+                                               break;
+                                       case BI_DONE:
+                                               /* unreachable */
+                                               description = "";
+                                               abort();
+                               }
+
+                               if (PQresultStatus(res) != status)
+                                       pg_fatal("%s reported status %s, 
expected %s\n"
+                                                        "Error message: 
\"%s\"\n",
+                                                        description, 
PQresStatus(PQresultStatus(res)),
+                                                        PQresStatus(status), 
PQerrorMessage(conn));
+
+                               if (strncmp(PQcmdStatus(res), cmdtag, 
strlen(cmdtag)) != 0)
+                                       pg_fatal("%s expected command tag '%s', 
got '%s'\n",
+                                                        description, cmdtag, 
PQcmdStatus(res));
+
+                               pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? 
cmdtag : description);
+
+                               PQclear(res);
+                       }
+               }
+
+               /* Write more rows and/or the end batch message, if needed */
+               if (FD_ISSET(sock, &output_mask))
+               {
+                       PQflush(conn);
+
+                       if (send_step == BI_INSERT_ROWS)
+                       {
+                               snprintf(&insert_param_0[0], MAXINTLEN, "%d", 
rows_to_send);
+
+                               if (PQsendQueryPrepared(conn, "my_insert",
+                                                                               
1, insert_params, NULL, NULL, 0) == 1)
+                               {
+                                       pg_debug("sent row %d\n", rows_to_send);
+
+                                       rows_to_send--;
+                                       if (rows_to_send == 0)
+                                               send_step = BI_COMMIT_TX;
+                               }
+                               else
+                               {
+                                       /*
+                                        * in nonblocking mode, so it's OK for 
an insert to fail
+                                        * to send
+                                        */
+                                       fprintf(stderr, "WARNING: failed to 
send insert #%d: %s\n",
+                                                       rows_to_send, 
PQerrorMessage(conn));
+                               }
+                       }
+                       else if (send_step == BI_COMMIT_TX)
+                       {
+                               if (PQsendQueryParams(conn, "COMMIT",
+                                                                         0, 
NULL, NULL, NULL, NULL, 0) == 1)
+                               {
+                                       pg_debug("sent COMMIT\n");
+                                       send_step = BI_SYNC;
+                               }
+                               else
+                               {
+                                       fprintf(stderr, "WARNING: failed to 
send commit: %s\n",
+                                                       PQerrorMessage(conn));
+                               }
+                       }
+                       else if (send_step == BI_SYNC)
+                       {
+                               if (PQbatchSendQueue(conn) == 1)
+                               {
+                                       fprintf(stdout, "Dispatched end batch 
message\n");
+                                       send_step = BI_DONE;
+                               }
+                               else
+                               {
+                                       fprintf(stderr, "WARNING: Ending a 
batch failed: %s\n",
+                                                       PQerrorMessage(conn));
+                               }
+                       }
+               }
+       }
+
+       /* We've got the sync message and the batch should be done */
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("attempt to exit batch mode failed when it should've 
succeeded: %s\n",
+                                PQerrorMessage(conn));
+
+       if (PQsetnonblocking(conn, 0) != 0)
+               pg_fatal("failed to clear nonblocking mode: %s\n", 
PQerrorMessage(conn));
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+       PGresult   *res;
+       int                     i;
+       bool            batch_ended = false;
+
+       /* 1 batch, 3 queries in it */
+       if (PQenterBatchMode(conn) != 1)
+               pg_fatal("failed to enter batch mode: %s\n",
+                                PQerrorMessage(conn));
+
+       for (i = 0; i < 3; i++)
+       {
+               char   *param[1];
+
+               param[0] = psprintf("%d", 44 + i);
+
+               if (PQsendQueryParams(conn,
+                                                         "SELECT 
generate_series(42, $1)",
+                                                         1,
+                                                         NULL,
+                                                         (const char **) param,
+                                                         NULL,
+                                                         NULL,
+                                                         0) != 1)
+                       pg_fatal("failed to send query: %s\n",
+                                        PQerrorMessage(conn));
+               pfree(param[0]);
+       }
+       PQbatchSendQueue(conn);
+
+       for (i = 0; !batch_ended; i++)
+       {
+               bool            first = true;
+               bool            saw_ending_tuplesok;
+               bool            isSingleTuple = false;
+
+               /* Set single row mode for only first 2 SELECT queries */
+               if (i < 2)
+               {
+                       if (PQsetSingleRowMode(conn) != 1)
+                               pg_fatal("PQsetSingleRowMode() failed for 
i=%d\n", i);
+               }
+
+               /* Consume rows for this query */
+               saw_ending_tuplesok = false;
+               while ((res = PQgetResult(conn)) != NULL)
+               {
+                       ExecStatusType est = PQresultStatus(res);
+
+                       if (est == PGRES_BATCH_END)
+                       {
+                               fprintf(stderr, "end of batch reached\n");
+                               batch_ended = true;
+                               PQclear(res);
+                               if (i != 3)
+                                       pg_fatal("Expected three results, got 
%d\n", i);
+                               break;
+                       }
+
+                       /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK 
for 2 */
+                       if (first)
+                       {
+                               if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+                                       pg_fatal("Expected PGRES_SINGLE_TUPLE 
for query %d, got %s\n",
+                                                        i, PQresStatus(est));
+                               if (i >= 2 && est != PGRES_TUPLES_OK)
+                                       pg_fatal("Expected PGRES_TUPLES_OK for 
query %d, got %s\n",
+                                                        i, PQresStatus(est));
+                               first = false;
+                       }
+
+                       fprintf(stderr, "Result status %s for query %d", 
PQresStatus(est), i);
+                       switch (est)
+                       {
+                               case PGRES_TUPLES_OK:
+                                       fprintf(stderr, ", tuples: %d\n", 
PQntuples(res));
+                                       saw_ending_tuplesok = true;
+                                       if (isSingleTuple)
+                                       {
+                                               if (PQntuples(res) == 0)
+                                                       fprintf(stderr, "all 
tuples received in query %d\n", i);
+                                               else
+                                                       pg_fatal("Expected to 
follow PGRES_SINGLE_TUPLE, "
+                                                                        "but 
received PGRES_TUPLES_OK directly instead\n");
+                                       }
+                                       break;
+
+                               case PGRES_SINGLE_TUPLE:
+                                       fprintf(stderr, ", %d tuple: %s\n", 
PQntuples(res), PQgetvalue(res, 0, 0));
+                                       break;
+
+                               default:
+                                       pg_fatal("unexpected\n");
+                       }
+                       PQclear(res);
+               }
+               if (!batch_ended && !saw_ending_tuplesok)
+                       pg_fatal("didn't get expected terminating TUPLES_OK\n");
+       }
+
+       if (PQexitBatchMode(conn) != 1)
+               pg_fatal("failed to end batch mode: %s\n", 
PQerrorMessage(conn));
+}
+
+static void
+usage(const char *progname)
+{
+       fprintf(stderr, "%s tests libpq's batch-mode.\n\n", progname);
+       fprintf(stderr, "Usage:\n");
+       fprintf(stderr, "  %s testname [conninfo [number_of_rows]]\n", 
progname);
+       fprintf(stderr, "Tests:\n");
+       fprintf(stderr, "  
disallowed_in_batch|simple_batch|multi_batch|batch_abort|\n");
+       fprintf(stderr, "  singlerow|batch_insert\n");
+}
+
+int
+main(int argc, char **argv)
+{
+       const char *conninfo = "";
+       PGconn     *conn;
+       int                     numrows = 10000;
+
+       /*
+        * The testname parameter is mandatory; it can be followed by a conninfo
+        * string and number of rows.
+        */
+       if (argc < 2 || argc > 4)
+       {
+               usage(argv[0]);
+               exit(1);
+       }
+
+       if (argc >= 3)
+               conninfo = pg_strdup(argv[2]);
+
+       if (argc >= 4)
+       {
+               errno = 0;
+               numrows = strtol(argv[3], NULL, 10);
+               if (errno != 0 || numrows <= 0)
+               {
+                       fprintf(stderr, "couldn't parse \"%s\" as a positive 
integer\n", argv[3]);
+                       exit(1);
+               }
+       }
+
+       /* Make a connection to the database */
+       conn = PQconnectdb(conninfo);
+       if (PQstatus(conn) != CONNECTION_OK)
+       {
+               fprintf(stderr, "Connection to database failed: %s\n",
+                               PQerrorMessage(conn));
+               exit_nicely(conn);
+       }
+
+       if (strcmp(argv[1], "disallowed_in_batch") == 0)
+               test_disallowed_in_batch(conn);
+       else if (strcmp(argv[1], "simple_batch") == 0)
+               test_simple_batch(conn);
+       else if (strcmp(argv[1], "multi_batch") == 0)
+               test_multi_batch(conn);
+       else if (strcmp(argv[1], "batch_abort") == 0)
+               test_batch_abort(conn);
+       else if (strcmp(argv[1], "batch_insert") == 0)
+               test_batch_insert(conn, numrows);
+       else if (strcmp(argv[1], "singlerow") == 0)
+               test_singlerowmode(conn);
+       else
+       {
+               fprintf(stderr, "\"%s\" is not a recognized test name\n", 
argv[1]);
+               usage(argv[0]);
+               exit(1);
+       }
+
+       /* close the connection to the database and cleanup */
+       PQfinish(conn);
+       return 0;
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd41b..634e48ec85 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -50,7 +50,8 @@ my @contrib_excludes = (
        'pgcrypto',         'sepgsql',
        'brin',             'test_extensions',
        'test_misc',        'test_pg_dump',
-       'snapshot_too_old', 'unsafe_tests');
+       'snapshot_too_old', 'unsafe_tests',
+       'test_libpq');
 
 # Set of variables for frontend modules
 my $frontend_defines = { 'initdb' => 'FRONTEND' };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 03c4e0fe5b..9b75db962b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -215,6 +215,7 @@ BackgroundWorkerHandle
 BackgroundWorkerSlot
 Barrier
 BaseBackupCmd
+BatchInsertStep
 BeginDirectModify_function
 BeginForeignInsert_function
 BeginForeignModify_function
@@ -1544,6 +1545,7 @@ PG_Locale_Strategy
 PG_Lock_Status
 PG_init_t
 PGcancel
+PGcommandQueueEntry
 PGconn
 PGdataValue
 PGlobjfuncs
@@ -1656,6 +1658,7 @@ PMSignalReason
 PMState
 POLYGON
 PQArgBlock
+PQBatchStatus
 PQEnvironmentOption
 PQExpBuffer
 PQExpBufferData

Reply via email to