Hi Dave,
I merged PQbatchProcessQueue into PQgetResult.
One first init call to PQbatchProcessQueue was also required in
PQsendQueue to have
PQgetResult ready to read the first batch query.

Tests and documentation are updated accordingly.

Matthieu Garrigues

On Mon, Sep 21, 2020 at 3:39 PM Dave Cramer <davecramer@postgres.rocks> wrote:
>
>
>
> On Mon, 21 Sep 2020 at 09:21, Matthieu Garrigues 
> <matthieu.garrig...@gmail.com> wrote:
>>
>> Matthieu Garrigues
>>
>> On Mon, Sep 21, 2020 at 3:09 PM Dave Cramer <davecramer@postgres.rocks> 
>> wrote:
>> >>
>> > There was a comment upthread a while back that people should look at the 
>> > comments made in 
>> > https://www.postgresql.org/message-id/20180322.211148.187821341.horiguchi.kyotaro%40lab.ntt.co.jp
>> >  by Horiguchi-San.
>> >
>> > From what I can tell this has not been addressed. The one big thing is the 
>> > use of PQbatchProcessQueue vs just putting it in PQgetResult.
>> >
>> > The argument is that adding PQbatchProcessQueue is unnecessary and just 
>> > adds another step. Looking at this, it seems like putting this inside 
>> > PQgetResult would get my vote as it leaves the interface unchanged.
>> >
>>
>> Ok. I'll merge PQbatchProcessQueue into PQgetResult. But just one
>> thing: I'll keep PQgetResult returning null between the result of two
>> batched query so the user
>> can know which result comes from which query.
>
>
> Fair enough.
>
> There may be other things in his comments that need to be addressed. That was 
> the big one that stuck out for me.
>
> Thanks for working on this!
>
>
> Dave Cramer
> www.postgres.rocks
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 92556c7ce0..15d0c03c89 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4829,6 +4829,465 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch mode and query pipelining</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>pipelining</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> supports queueing up queries into
+   a pipeline to be executed as a batch on the server. Batching queries allows
+   applications to avoid a client/server round-trip after each query to get
+   the results before issuing the next query.
+  </para>
+
+  <sect2>
+   <title>When to use batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It increases client application complexity
+    and extra caution is required to prevent client/server deadlocks but
+    can sometimes offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e. network latency
+    (<quote>ping time</quote>) is high, and when many small operations are being performed in
+    rapid sequence. There is usually less benefit in using batches when each
+    query takes many multiples of the client/server round-trip time to execute.
+    A 100-statement operation run on a server 300ms round-trip-time away would take
+    30 seconds in network latency alone without batching; with batching it may spend
+    as little as 0.3s waiting for results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed into
+    operations on sets or into a
+    <link linkend="libpq-copy"><literal>COPY</literal></link> operation.
+   </para>
+
+   <para>
+    Batching is not useful when information from one operation is required by the
+    client before it knows enough to send the next operation. The client must
+    introduce a synchronisation point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+     BEGIN;
+     SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+     -- result: x=2
+     -- client adds 1 to x:
+     UPDATE mytable SET x = 3 WHERE id = 42;
+     COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+     UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 10.0, but clients using PostgresSQL 10.0 version of libpq can
+     use batches on server versions 7.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using batch mode</title>
+
+   <para>
+    To issue batches the application must switch
+    a connection into batch mode. Enter batch mode with <link
+    linkend="libpq-PQenterBatchMode"><function>PQenterBatchMode(conn)</function></link> or test
+    whether batch mode is active with <link
+    linkend="libpq-PQbatchStatus"><function>PQbatchStatus(conn)</function></link>. In batch mode only <link
+    linkend="libpq-async">asynchronous operations</link> are permitted, and
+    <literal>COPY</literal> is not recommended as it most likely will trigger failure in batch processing. 
+    Using any synchronous command execution functions such as <function>PQfn</function>,
+    <function>PQexec</function> or one of its sibling functions are error conditions.
+    Functions allowed in batch mode are described in <xref linkend="libpq-batch-sending"/>. 
+   </para>
+
+   <para>
+    The client uses libpq's asynchronous query functions to dispatch work,
+    marking the end of each batch with <function>PQbatchSendQueue</function>.
+    And to get results, it uses <function>PQgetResult</function>. It may eventually exit
+    batch mode with <function>PQexitBatchMode</function> once all results are
+    processed.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-pqsetnonblocking">non-blocking mode</link>. If used in
+     blocking mode it is possible for a client/server deadlock to occur. The
+     client will block trying to send queries to the server, but the server will
+     block trying to send results from queries it has already processed to the
+     client. This only occurs when the client sends enough queries to fill its
+     output buffer and the server's receive buffer before switching to
+     processing input from the server, but it's hard to predict exactly when
+     that'll happen so it's best to always use non-blocking mode.
+     Batch mode consumes more memory when send/recv is not done as required even in non-blocking mode.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests
+     using normal asynchronous <application>libpq</application> functions such as 
+     <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>,
+     <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a <link
+     linkend="libpq-PQbatchSendQueue"><function>PQbatchSendQueue(conn)</function></link> call to mark
+     the end of the batch. The client <emphasis>does not</emphasis> need to call
+     <function>PQgetResult</function> immediately after dispatching each
+     operation. <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+    
+    <para>
+     Batched operations will be executed by the server in the order the client
+     sends them. The server will send the results in the order the statements
+     executed. The server may begin executing the batch before all commands
+     in the batch are queued and the end of batch command is sent. If any
+     statement encounters an error the server aborts the current transaction and
+     skips processing the rest of the batch. Query processing resumes after the
+     end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared statement
+     then execute it with later statements in the same batch.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing results</title>
+
+    <para>
+     The client <link linkend="libpq-batch-interleave">interleaves result
+     processing</link> with sending batch queries, or for small batches may
+     process all results after sending the whole batch.
+    </para>
+
+    <para>
+     To get the result of the first batch entry the user must call
+     <function>PQgetResult</function> and handle the results until
+     <function>PQgetResult</function> returns null. The result from the next batch entry 
+     may then be retrieved using <function>PQgetResult</function> again and the cycle repeated. The
+     application handles individual statement results as normal.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function> before retrieving
+     results with <function>PQgetResult</function>. This mode selection is effective 
+     only for the query currently being processed. For more information on the use of <function>PQsetSingleRowMode
+     </function>, refer to <xref linkend="libpq-single-row-mode"/>.
+     
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal asynchronous
+     processing except that it may contain the new <type>PGresult</type> types
+     <literal>PGRES_BATCH_END</literal> and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSendQueue</function> call at the corresponding point in
+     the result stream and at no other time. <literal>PGRES_BATCH_ABORTED</literal>
+     is emitted during error handling; see <link linkend="libpq-batch-errors">
+     error handling</link>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed (exept that PQgetResult 
+     return null to indicate that we start returning the results of next query). The application
+     must keep track of the order in which it sent queries and the expected
+     results. Applications will typically use a state machine or a FIFO queue
+     for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSendQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving result processing and query dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured around
+     a nonblocking I/O loop using a function like <function>select</function>,
+     <function>poll</function>, <function>epoll</function>,
+     <function>WaitForMultipleObjectEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work still to
+     be dispatched and a queue of work that has been dispatched but not yet had
+     its results processed. When the socket is writable it should dispatch more
+     work. When the socket is readable it should read results and process them,
+     matching them up to the next entry in its expected results queue. 
+     Based on available memory, results from socket should be read frequently and 
+     there's no need to wait till the batch end to read the results.  Batches
+     should be scoped to logical units of work, usually (but not always) one
+     transaction per batch. There's no need to exit batch mode and re-enter it
+     between batches or to wait for one batch to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state machine
+     to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename> in the PostgreSQL
+     source distribution.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending batch mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and the end batch
+     result has been consumed the application may return to non-batched mode with
+     <link linkend="libpq-PQexitBatchMode"><function>PQexitBatchMode(conn)</function></link>.
+    </para>
+   </sect3>
+
+  </sect2>
+
+  <sect2 id="libpq-funcs-batch">
+   <title>Functions associated with batch mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns current batch mode status of the <application>libpq</application> connection.
+<synopsis>
+int PQbatchStatus(PGconn *conn);
+</synopsis>
+      </para>			
+      <variablelist>
+         <varlistentry id="libpq-PQbatchStatus-1">
+           <term>
+             <literal>PQBATCH_MODE_ON</literal>
+           </term>
+ 
+          <listitem>
+           <para>
+             Returns <literal>PQBATCH_MODE_ON</literal> if <application>libpq</application> connection is in <link
+             linkend="libpq-batch-mode">batch mode</link>.
+           </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-2">
+          <term>
+            <literal>PQBATCH_MODE_OFF</literal>
+          </term>
+  
+          <listitem>
+          <para>
+            Returns <literal>PQBATCH_MODE_OFF</literal> if <application>libpq</application> connection is not in <link
+            linkend="libpq-batch-mode">batch mode</link>.
+          </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-3">
+          <term>
+            <literal>PQBATCH_MODE_ABORTED</literal>
+          </term>
+          <listitem>
+            <para>
+                Returns <literal>PQBATCH_MODE_ABORTED</literal> if <application>libpq</application> connection is in 
+                aborted status. The aborted flag is cleared as soon as the result of the 
+                <function>PQbatchSendQueue</function> at the end of the aborted batch is 
+                processed. Clients don't usually need this function to verify aborted status 
+                as they can tell that the batch is aborted from <literal>PGRES_BATCH_ABORTED</literal> 
+                result codes.
+            </para>
+          </listitem>
+        </varlistentry>
+  
+       </variablelist>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+        </para>
+        <para>
+          Returns 1 for success. Returns 0 and has no 
+          effect if the connection is not currently idle, i.e. it has a result 
+          ready, is waiting for more input from the server, etc. This function 
+          does not actually send anything to the server, it just changes the 
+          <application>libpq</application> connection state.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to exit batch mode if it is currently in batch mode
+      with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success.
+      Returns 1 and takes no action if not in batch mode. If the current statement isn't finished
+      processing or there are results pending for collection with
+      <function>PQgetResult</function>, returns 0 and does nothing.
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSendQueue">
+     <term>
+      <function>PQbatchSendQueue</function>
+      <indexterm>
+       <primary>PQbatchSendQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Delimits the end of a set of a batched commands by sending a <link
+      linkend="protocol-flow-ext-query">sync message</link> and flushing
+      the send buffer. The end of a batch serves as 
+      the delimiter of an implicit transaction and
+      an error recovery point; see <link linkend="libpq-batch-errors">
+      error handling</link>.
+
+<synopsis>
+int PQbatchSendQueue(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success. Returns 0 if the connection is not in batch mode
+              or sending a <link linkend="protocol-flow-ext-query">sync message</link> is failed.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+
+  </sect2>
+
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-by-Row</title>
 
@@ -4869,6 +5328,15 @@ int PQflush(PGconn *conn);
    Each object should be freed with <xref linkend="libpq-PQclear"/> as usual.
   </para>
 
+  <note>
+    <para>
+     When using batch mode, activate the single row mode on the current batch query by
+     calling <function>PQsetSingleRowMode</function>
+     before retrieving results with <function>PQgetResult</function>.
+     See <xref linkend="libpq-batch-mode"/> for more information.
+    </para>
+   </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-PQsetSingleRowMode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index cf4653fe0f..a1b46c83ed 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while libpq connection is in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 8afa5a29b4..69bf757e85 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1022,6 +1022,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->status = WALRCV_ERROR;
 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
 			break;
+		default:
+		/* This is just to keep compiler quiet */
+			break;
 	}
 
 	PQclear(pgres);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..ffb18fcf47 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,8 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQenterBatchMode	      180
+PQexitBatchMode           181
+PQbatchSendQueue	      182
+PQbatchProcessQueue	      183
+PQbatchStatus		      184
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 7bee9dd201..25c79cb7b4 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -535,6 +535,24 @@ pqDropConnection(PGconn *conn, bool flushInput)
 	}
 }
 
+/*
+ * PQfreeCommandQueue
+ * Free all the entries of PGcommandQueueEntry queue passed.
+ */
+static void
+PQfreeCommandQueue(PGcommandQueueEntry *queue)
+{
+
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+
+		queue = queue->next;
+		if (prev->query)
+			free(prev->query);
+		free(prev);
+	}
+}
 
 /*
  *		pqDropServerData
@@ -554,6 +572,7 @@ pqDropServerData(PGconn *conn)
 {
 	PGnotify   *notify;
 	pgParameterStatus *pstatus;
+	PGcommandQueueEntry *queue;
 
 	/* Forget pending notifies */
 	notify = conn->notifyHead;
@@ -566,6 +585,14 @@ pqDropServerData(PGconn *conn)
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
 
+	queue = conn->cmd_queue_head;
+	PQfreeCommandQueue(queue);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	queue = conn->cmd_queue_recycle;
+	PQfreeCommandQueue(queue);
+	conn->cmd_queue_recycle = NULL;
+
 	/* Reset ParameterStatus data, as well as variables deduced from it */
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..ff9a6ca253 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_END",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -70,7 +72,10 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int	PQsendDescribe(PGconn *conn, char desc_type,
 						   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
-
+static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry);
+static int pqBatchFlush(PGconn *conn);
 
 /* ----------------
  * Space management for PGresult.
@@ -1210,7 +1215,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1233,6 +1238,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return false;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1331,6 +1343,10 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1389,31 +1405,51 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+	else
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;                       /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1461,7 +1497,80 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQmakePipelinedCommand
+ *	Get a new command queue entry, allocating it if required. Doesn't add it to
+ *	the tail of the queue yet, use PQappendPipelinedCommand once the command has
+ *	been written for that. If a command fails once it's called this, it should
+ *	use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry *
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * PQappendPipelinedCommand
+ *	Append a precreated command queue entry to the queue after it's been
+ *	sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * PQrecyclePipelinedCommand
+ *	Push a command queue entry onto the freelist. It must be an entry
+ *	with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr, libpq_gettext("tried to recycle non-dangling command queue entry"));
+		abort();
+	}
+	if (entry->query)
+		free(entry->query);
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/*
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1479,20 +1588,60 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		/*
+		 * When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQbatchQueueProcess(...)
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+				break;
+			case PGASYNC_IDLE:
+				printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext_noop("internal error, idle state in batch mode"));
+				break;
+		}
+	}
+	else
+	{
+		/* This command's results will come in immediately.
+		 * Initialize async result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1516,6 +1665,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1525,6 +1678,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1637,35 +1807,42 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -1766,14 +1943,308 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
 }
 
+/*
+ * PQbatchStatus
+ * 	Returns current batch mode status
+ */
+int
+PQbatchStatus(PGconn *conn)
+{
+	if (!conn)
+		return false;
+
+	return conn->batch_status;
+}
+
+/*
+ * PQenterBatchMode
+ * 	Put an idle connection in batch mode. Commands submitted after this
+ * 	can be pipelined on the connection, there's no requirement to wait for
+ * 	one to finish before the next is dispatched.
+ *
+ * 	Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * 	A set of commands is terminated by a PQbatchQueueSync. Multiple sets of batched
+ * 	commands may be sent while in batch mode. Batch mode can be exited by
+ * 	calling PQbatchEnd() once all results are processed.
+ *
+ * 	This doesn't actually send anything on the wire, it just puts libpq
+ * 	into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return 0;
+
+	conn->batch_status = PQBATCH_MODE_ON;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return 1;
+}
+
+/*
+ * PQexitBatchMode
+ * 	End batch mode and return to normal command mode.
+ *
+ * 	Has no effect unless the client has processed all results
+ * 	from all outstanding batches and the connection is idle.
+ *
+ * 	Returns 1 if batch mode ended.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+	if (!conn)
+		goto exitFailed;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			goto exitFailed;
+		default:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		goto exitFailed;
+
+	conn->batch_status = PQBATCH_MODE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		goto sendFailed;
+	return 1;
+
+sendFailed:
+	/* error message should be set up already */
+
+exitFailed:
+	printfPQExpBuffer(&conn->errorMessage,
+							libpq_gettext_noop("internal error, Failed to exit batch mode"));
+	return 0;
+}
+
+
+/*
+ * internal function pqBatchProcessQueue
+ * 
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+static int
+pqBatchProcessQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, COPY in batch mode"));
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return 0;
+			break;
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, IDLE in batch mode"));
+			break;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/*
+		 * In batch mode but nothing left on the queue; caller can submit more
+		 * work or PQbatchEnd() now.
+		 */
+		return 0;
+	}
+
+	/*
+	 * Pop the next query from the queue and set up the connection state as if
+	 * it'd just been dispatched from a non-batched call
+	 */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+
+
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	if (conn->batch_status == PQBATCH_MODE_ABORTED && conn->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted batch we don't get anything from the server for each
+		 * result; we're just discarding input until we get to the next sync
+		 * from the server. The client needs to know its queries got aborted
+		 * so we create a fake PGresult to return immediately from
+		 * PQgetResult.
+		 */
+		conn->result = PQmakeEmptyPGresult(conn,
+										   PGRES_BATCH_ABORTED);
+		if (!conn->result)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory"));
+			pqSaveErrorResult(conn);
+			return 0;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+
+	return 1;
+}
+
+/*
+ * PQbatchSendQueue
+ *
+ * End a batch submission.
+ *
+ * It's legal to start submitting another batch immediately, without
+ * waiting for the results of the current batch. There's no need to end batch
+ * mode and start it again.
+ *
+ * If a command in a batch fails, every subsequent command up to and
+ * including the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED
+ * state. If the whole batch is processed without error, a PGresult with
+ * PGRES_BATCH_END is produced.
+ *
+ * Queries can already have been sent before PQbatchSendQueue is called, but
+ * PQbatchSendQueue need to be called before retrieving command results.
+ *
+ * The connection will remain in batch mode and unavailable for new synchronous
+ * command execution functions until all results from the batch are processed
+ * by the client.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, IDLE in batch mode"));
+			return 0;
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, COPY in batch mode"));
+			return 0;
+			break;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	PQappendPipelinedCommand(conn, entry);
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Call pqBatchProcessQueue so the user can call start calling getResult.
+	 */  
+	pqBatchProcessQueue(conn);
+
+	return 1;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
 
 /*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
  *	  memory).
+ *
+ * 		In batch mode, once all the result of a query have been
+ * 		returned, PQgetResult returns NULL to let the user know that
+ * 		the next batched query is being processed.
+ * 
+ * 		At the end of the batch, returns a end-of-batch result with
+ * 		PQresultStatus(result) == PGRES_BATCH_END.
  */
-
 PGresult *
 PQgetResult(PGconn *conn)
 {
@@ -1842,9 +2313,38 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/* 
+				 * In batch mode, we prepare the processing of the results 
+				 * of the next query.
+				*/
+				pqBatchProcessQueue(conn);
+			}
 			break;
 		case PGASYNC_READY:
+			res = pqPrepareAsyncResult(conn);
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, query execution state cannot be IDLE as there
+				 * can be other queries or results waiting in the queue
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It isn't also busy since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
 			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
@@ -2025,6 +2525,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2219,6 +2726,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2234,6 +2744,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2242,15 +2766,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
-	/* reset last_query string (not relevant now) */
-	if (conn->last_query)
+	/* reset last-query string (not relevant now) */
+	if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2260,14 +2787,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	/* error message should be set up already */
 	return 0;
 }
@@ -2665,6 +3196,13 @@ PQfn(PGconn *conn,
 	/* clear the error string */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -3858,3 +4396,15 @@ PQunescapeBytea(const unsigned char *strtext, size_t *retbuflen)
 	*retbuflen = buflen;
 	return tmpbuf;
 }
+/* pqBatchFlush
+ * In batch mode, data will be flushed only when the out buffer reaches the threshold value.
+ * In non-batch mode, data will be flushed all the time.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+	if ((conn->batch_status == PQBATCH_MODE_OFF) ||
+	    (conn->outCount >= OUTBUFFER_THRESHOLD))
+		return(pqFlush(conn));
+	return 0; /* Just to keep compiler quiet */
+}
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 9360c541be..2ff3fa4883 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..9c37a9c4ff 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -217,10 +217,18 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->batch_status != PQBATCH_MODE_OFF)
+					{
+						conn->batch_status = PQBATCH_MODE_ON;
+						conn->result = PQmakeEmptyPGresult(conn,
+								PGRES_BATCH_END);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -875,6 +883,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+		conn->batch_status = PQBATCH_MODE_ABORTED;
+
 	/*
 	 * If this is an error message, pre-emptively clear any incomplete query
 	 * result we may have.  We'd just throw it away below anyway, and
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fbce3..b6e3cde539 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -15,6 +15,10 @@
 #ifndef LIBPQ_FE_H
 #define LIBPQ_FE_H
 
+#ifdef WIN32
+#include <windows.h>
+#endif
+
 #ifdef __cplusplus
 extern "C"
 {
@@ -97,7 +101,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_END,			/* end of a batch of commands */
+	PGRES_BATCH_ABORTED,		/* Command didn't run because of an abort
+								 * earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -137,6 +144,17 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+	PQBATCH_MODE_OFF,
+	PQBATCH_MODE_ON,
+	PQBATCH_MODE_ABORTED
+}	PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -435,6 +453,12 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int	PQbatchStatus(PGconn *conn);
+extern int	PQenterBatchMode(PGconn *conn);
+extern int	PQexitBatchMode(PGconn *conn);
+extern int	PQbatchSendQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..dee4c85382 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,15 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result, More results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +234,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the pqSetenv state machine */
@@ -301,6 +307,22 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+	char	   *query;			/* SQL command, or NULL if unknown */
+	struct pgCommandQueueEntry *next;
+}	PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about each of possibly several hosts
  * mentioned in the connection string.  Most fields are derived by splitting
@@ -394,6 +416,7 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -406,6 +429,16 @@ struct pg_conn
 	pg_conn_host *connhost;		/* details about each named host */
 	char	   *connip;			/* IP address for current network connection */
 
+	/*
+	 * The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
@@ -806,6 +844,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne
 #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p))
 #endif
 
+#define libpq_gettext_noop(x) (x)
+
 /*
  * These macros are needed to let error-handling code be portable between
  * Unix and Windows.  (ugh)
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index a6d2ffbf9e..287214c544 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
 		  test_extensions \
 		  test_ginpostinglist \
 		  test_integerset \
+		  test_libpq \
 		  test_misc \
 		  test_parser \
 		  test_pg_dump \
diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore
new file mode 100644
index 0000000000..11e8463984
--- /dev/null
+++ b/src/test/modules/test_libpq/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/testlibpqbatch
diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile
new file mode 100644
index 0000000000..d907063f65
--- /dev/null
+++ b/src/test/modules/test_libpq/Makefile
@@ -0,0 +1,25 @@
+# src/test/modules/test_libpq/Makefile
+
+OBJS = testlibpqbatch.o
+PROGRAM = testlibpqbatch
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS += $(libpq)
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_libpq
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+testlibpqbatch.o: testlibpqbatch.c
+testlibpqbatch: testlibpqbatch.o
+check: testlibpqbatch prove-check
+
+prove-check:
+	$(prove_check)
diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/test_libpq/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl
new file mode 100644
index 0000000000..2c361880fa
--- /dev/null
+++ b/src/test/modules/test_libpq/t/001_libpq_async.pl
@@ -0,0 +1,27 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $port = $node->port;
+
+my $numrows = 10000;
+my @tests =
+  qw(disallowed_in_batch simple_batch multi_batch batch_abort timings singlerowmode);
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+for my $testname (@tests)
+{
+	$node->command_ok(
+		[ 'testlibpqbatch', 'dbname=postgres', "$numrows", "$testname" ],
+		"testlibpqbatch $testname");
+}
+
+$node->stop('fast');
diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c
new file mode 100644
index 0000000000..b82634171b
--- /dev/null
+++ b/src/test/modules/test_libpq/testlibpqbatch.c
@@ -0,0 +1,1003 @@
+/*
+ * src/test/modules/test_libpq/testlibpqbatch.c
+ *
+ *
+ * testlibpqbatch.c
+ *		Test of batch execution functionality
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include "c.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+#define EXPECT(condition, ...) \
+	if (0 == (condition)) \
+	{ \
+		fprintf(stderr, __VA_ARGS__); \
+		goto fail; \
+	}
+
+
+static void exit_nicely(PGconn *conn);
+static void simple_batch(PGconn *conn);
+static void test_disallowed_in_batch(PGconn *conn);
+static void batch_insert_pipelined(PGconn *conn, int n_rows);
+static void batch_insert_sequential(PGconn *conn, int n_rows);
+static void batch_insert_copy(PGconn *conn, int n_rows);
+static void test_batch_abort(PGconn *conn);
+static void test_singlerowmode(PGconn *conn);
+static const Oid INT4OID = 23;
+
+static const char *const drop_table_sql
+= "DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql
+= "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql
+= "INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+	PQfinish(conn);
+	exit(1);
+}
+
+static void
+simple_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "simple batch... ");
+
+	/*
+	 * Enter batch mode and dispatch a set of operations, which we'll then
+	 * process the results of as they come in.
+	 *
+	 * For a simple case we should be able to do this without interim
+	 * processing of results since our out buffer will give us enough slush to
+	 * work with and we won't block on sending. So blocking mode is fine.
+	 */
+	EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode\n");
+
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+				            		   dummy_params, NULL, NULL, 0),
+				"dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode with work in progress should fail, but succeeded\n");
+
+	EXPECT(PQbatchSendQueue(conn), "Ending a batch failed: %s\n", PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first query result.\n");
+
+	/*
+	 * Even though we've processed the result there's still a sync to come and
+	 * we can't exit batch mode yet
+	 */
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn))
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after end batch call\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	fprintf(stderr, "test error cases... ");
+
+	EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode: %u\n", __LINE__);
+
+	EXPECT(PQenterBatchMode(conn), "Unable to enter batch mode\n");
+
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Batch mode not activated properly\n");
+
+	/* PQexec should fail in batch mode */
+	res = PQexec(conn, "SELECT 1");
+	EXPECT(PQresultStatus(res) == PGRES_FATAL_ERROR, "PQexec should fail in batch mode but succeeded\n");
+
+	/* So should PQsendQuery */
+	EXPECT(PQsendQuery(conn, "SELECT 1") == 0, "PQsendQuery should fail in batch mode but succeeded\n");
+
+	/* Entering batch mode when already in batch mode is OK */
+	EXPECT(PQenterBatchMode(conn), "re-entering batch mode should be a no-op but failed\n");
+
+	EXPECT(!PQisBusy(conn), "PQisBusy should return false when idle in batch, returned true\n");
+
+	/* ok, back to normal command mode */
+	EXPECT(PQexitBatchMode(conn), "couldn't exit idle empty batch mode\n");
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "Batch mode not terminated properly\n");
+
+	/* exiting batch mode when not in batch mode should be a no-op */
+	EXPECT(PQexitBatchMode(conn), "batch mode exit when not in batch mode should succeed but failed\n");
+
+	/* can now PQexec again */
+	res = PQexec(conn, "SELECT 1");
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "PQexec should succeed after exiting batch mode but failed with: %s\n", PQerrorMessage(conn));
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+multi_batch(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+
+	fprintf(stderr, "multi batch... ");
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first.
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching first SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching second SELECT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/* OK, start processing the batch results */
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first result\n");
+
+	EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n");
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s (line %u)\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn), __LINE__);
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* second batch */
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from second batch item\n",				PQresStatus(PQresultStatus(res)));
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	res = PQgetResult(conn);
+	EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second end batch\n",				PQresStatus(PQresultStatus(res)));
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+/*
+ * When an operation in a batch fails the rest of the batch is flushed. We
+ * still have to get results for each batch item, but the item will just be
+ * a PGRES_BATCH_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the batch. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *dummy_params[1] = {"1"};
+	Oid			dummy_param_oids[1] = {INT4OID};
+	int			i;
+
+	fprintf(stderr, "aborted batch... ");
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+
+	/*
+	 * Queue up a couple of small batches and process each without returning
+	 * to command mode first. Make sure the second operation in the first
+	 * batch ERRORs.
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "1";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching first INSERT failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "SELECT no_such_function($1)", 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching error select failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "2";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching second insert failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+	dummy_params[0] = "3";
+	EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+						   dummy_params, NULL, NULL, 0), "dispatching second-batch insert failed: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+	/*
+	 * OK, start processing the batch results.
+	 *
+	 * We should get a tuples-ok for the first query, a fatal error, a batch
+	 * aborted message for the second insert, a batch-end, then a command-ok
+	 * and a batch-ok for the second batch operation.
+	 */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_COMMAND_OK,
+					"Unexpected result code %s from first batch item, error='%s'\n",
+					res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)),
+					res == NULL ? PQerrorMessage(conn) : PQresultErrorMessage(res));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* second query, caused error */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_FATAL_ERROR,
+					"Unexpected result code from second batch item. Wanted PGRES_FATAL_ERROR, got %s\n",
+					res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/*
+	 * batch should now be aborted.
+	 *
+	 * Note that we could still queue more queries at this point if we wanted;
+	 * they'd get added to a new third batch since we've already sent a
+	 * second. The aborted flag relates only to the batch being received.
+	 */
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n");
+
+	/* third query in batch, the second insert */
+
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_ABORTED, 
+					"Unexpected result code from third batch item. Wanted PGRES_BATCH_ABORTED, got %s\n", 
+					res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/*
+	 * The end of a failed batch is still a PGRES_BATCH_END so clients know to
+	 * start processing results normally again and can tell the difference
+	 * between skipped commands and the sync.
+	 */
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END,
+				 "Unexpected result code from first batch sync. Wanted PGRES_BATCH_END, got %s\n",
+				 res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_ABORTED, "sync should've cleared the aborted flag but didn't\n");
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* the insert from the second batch */
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_COMMAND_OK, "Unexpected result code %s from first item in second batch\n", res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* the second batch sync */
+	EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second batch sync\n", res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+	/* We're still in a batch... */
+	EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n");
+
+	/* until we end it, which we can safely do now */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n");
+
+	fprintf(stderr, "ok\n");
+
+	/*
+	 * Since we fired the batches off without a surrounding xact, the results
+	 * should be:
+	 *
+	 * - Implicit xact started by server around 1st batch - First insert
+	 * applied - Second statement aborted xact - Third insert skipped - Sync
+	 * rolled back first implicit xact - Implicit xact created by server
+	 * around 2nd batch - insert applied from 2nd batch - Sync commits 2nd
+	 * xact
+	 *
+	 * So we should only have the value 3 that we inserted.
+	 */
+	res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+	EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Expected tuples, got %s: %s", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		const char *val = PQgetvalue(res, i, 0);
+
+		EXPECT(strcmp(val, "3") == 0, "expected only insert with value 3, got %s", val);
+	}
+
+	EXPECT(PQntuples(res) == 1, "expected 1 result, got %d", PQntuples(res));
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
+/* State machine enums for batch insert */
+typedef enum BatchInsertStep
+{
+	BI_BEGIN_TX,
+	BI_DROP_TABLE,
+	BI_CREATE_TABLE,
+	BI_PREPARE,
+	BI_INSERT_ROWS,
+	BI_COMMIT_TX,
+	BI_SYNC,
+	BI_DONE
+}	BatchInsertStep;
+
+static void
+batch_insert_pipelined(PGconn *conn, int n_rows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+	BatchInsertStep send_step = BI_BEGIN_TX,
+				recv_step = BI_BEGIN_TX;
+	int			rows_to_send,
+				rows_to_receive;
+
+	insert_params[0] = &insert_param_0[0];
+
+	rows_to_send = rows_to_receive = n_rows;
+
+	/*
+	 * Do a batched insert into a table created at the start of the batch
+	 */
+	EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsendQueryParams(conn, "BEGIN",
+						   0, NULL, NULL, NULL, NULL, 0), "xact start failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent BEGIN\n");
+
+	send_step = BI_DROP_TABLE;
+
+	EXPECT(PQsendQueryParams(conn, drop_table_sql,
+						   0, NULL, NULL, NULL, NULL, 0), "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent DROP\n");
+
+	send_step = BI_CREATE_TABLE;
+
+	EXPECT(PQsendQueryParams(conn, create_table_sql,
+						   0, NULL, NULL, NULL, NULL, 0), "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent CREATE\n");
+
+	send_step = BI_PREPARE;
+
+	EXPECT(PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids), "dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+
+	fprintf(stdout, "sent PREPARE\n");
+
+	send_step = BI_INSERT_ROWS;
+
+	/*
+	 * Now we start inserting. We'll be sending enough data that we could fill
+	 * our out buffer, so to avoid deadlocking we need to enter nonblocking
+	 * mode and consume input while we send more output. As results of each
+	 * query are processed we should pop them to allow processing of the next
+	 * query. There's no need to finish the batch before processing results.
+	 */
+	EXPECT(PQsetnonblocking(conn, 1) == 0, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+
+	while (recv_step != BI_DONE)
+	{
+		int			sock;
+		fd_set		input_mask;
+		fd_set		output_mask;
+
+		sock = PQsocket(conn);
+
+		if (sock < 0)
+			break;				/* shouldn't happen */
+
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		FD_ZERO(&output_mask);
+		FD_SET(sock, &output_mask);
+
+		if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+
+		/*
+		 * Process any results, so we keep the server's out buffer free
+		 * flowing and it can continue to process input
+		 */
+		if (FD_ISSET(sock, &input_mask))
+		{
+			PQconsumeInput(conn);
+
+			/* Read until we'd block if we tried to read */
+			while (!PQisBusy(conn) && recv_step < BI_DONE)
+			{
+				const char *cmdtag;
+				const char *description = NULL;
+				int			status;
+				BatchInsertStep next_step;
+
+
+				res = PQgetResult(conn);
+
+				if (res == NULL)
+				{
+					/*
+					 * No more results from this query, advance to the next
+					 * result
+					 */
+
+					fprintf(stdout, "next query!\n");
+					continue;
+				}
+
+				status = PGRES_COMMAND_OK;
+				next_step = recv_step + 1;
+				switch (recv_step)
+				{
+					case BI_BEGIN_TX:
+						cmdtag = "BEGIN";
+						break;
+					case BI_DROP_TABLE:
+						cmdtag = "DROP TABLE";
+						break;
+					case BI_CREATE_TABLE:
+						cmdtag = "CREATE TABLE";
+						break;
+					case BI_PREPARE:
+						cmdtag = "";
+						description = "PREPARE";
+						break;
+					case BI_INSERT_ROWS:
+						cmdtag = "INSERT";
+						rows_to_receive--;
+						if (rows_to_receive > 0)
+							next_step = BI_INSERT_ROWS;
+						break;
+					case BI_COMMIT_TX:
+						cmdtag = "COMMIT";
+						break;
+					case BI_SYNC:
+						cmdtag = "";
+						description = "SYNC";
+						status = PGRES_BATCH_END;
+						break;
+					case BI_DONE:
+						/* unreachable */
+						abort();
+				}
+				if (description == NULL)
+					description = cmdtag;
+
+				fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n",
+						recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step);
+
+				EXPECT(PQresultStatus(res) == status, "%s reported status %s, expected %s. Error msg is [%s]\n",
+							description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn));
+
+				EXPECT(strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) == 0, "%s expected command tag '%s', got '%s'\n",
+							description, cmdtag, PQcmdStatus(res));
+
+				fprintf(stdout, "Got %s OK\n", cmdtag);
+
+				recv_step = next_step;
+
+				PQclear(res);
+				res = NULL;
+			}
+		}
+
+		/* Write more rows and/or the end batch message, if needed */
+		if (FD_ISSET(sock, &output_mask))
+		{
+			PQflush(conn);
+
+			if (send_step == BI_INSERT_ROWS)
+			{
+				snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+				insert_param_0[MAXINTLEN - 1] = '\0';
+
+				if (PQsendQueryPrepared(conn, "my_insert",
+										1, insert_params, NULL, NULL, 0))
+				{
+					fprintf(stdout, "sent row %d\n", rows_to_send);
+
+					rows_to_send--;
+					if (rows_to_send == 0)
+						send_step = BI_COMMIT_TX;
+				}
+				else
+				{
+					/*
+					 * in nonblocking mode, so it's OK for an insert to fail
+					 * to send
+					 */
+					fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+							rows_to_send, PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_COMMIT_TX)
+			{
+				if (PQsendQueryParams(conn, "COMMIT",
+									  0, NULL, NULL, NULL, NULL, 0))
+				{
+					fprintf(stdout, "sent COMMIT\n");
+					send_step = BI_SYNC;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: failed to send commit: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+			else if (send_step == BI_SYNC)
+			{
+				if (PQbatchSendQueue(conn))
+				{
+					fprintf(stdout, "Dispatched end batch message\n");
+					send_step = BI_DONE;
+				}
+				else
+				{
+					fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+							PQerrorMessage(conn));
+				}
+			}
+		}
+
+	}
+
+	/* We've got the sync message and the batch should be done */
+	EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn));
+
+	EXPECT(PQsetnonblocking(conn, 0) == 0, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+
+static void
+batch_insert_sequential(PGconn *conn, int nrows)
+{
+	PGresult   *res = NULL;
+	const char *insert_params[1];
+	Oid			insert_param_oids[1] = {INT4OID};
+	char		insert_param_0[MAXINTLEN];
+
+	insert_params[0] = &insert_param_0[0];
+
+	res = PQexec(conn, "BEGIN");
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "prepare failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	while (nrows > 0)
+	{
+		snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows);
+		insert_param_0[MAXINTLEN - 1] = '\0';
+
+		res = PQexecPrepared(conn, "my_insert2",
+							 1, insert_params, NULL, NULL, 0);
+		EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT failed: %s\n", PQerrorMessage(conn));
+
+		PQclear(res);
+		nrows--;
+	}
+
+	res = PQexec(conn, "COMMIT");
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+batch_insert_copy(PGconn *conn, int nrows)
+{
+	PGresult   *res = NULL;
+
+	res = PQexec(conn, drop_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, create_table_sql);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	res = PQexec(conn, "COPY batch_demo(itemno) FROM stdin");
+	EXPECT(PQresultStatus(res) == PGRES_COPY_IN, "COPY: %s\n", PQerrorMessage(conn));
+	PQclear(res);
+	res = NULL;
+
+	while (nrows > 0)
+	{
+		char		buf[12 + 2];
+		int			formatted = snprintf(&buf[0], 12 + 1, "%d\n", nrows);
+
+		EXPECT(formatted < 12 + 1, "Buffer write truncated somehow\n");
+
+		EXPECT(PQputCopyData(conn, buf, formatted) == 1, "Write of COPY data failed: %s\n",
+					PQerrorMessage(conn));
+
+		nrows--;
+	}
+
+	EXPECT(PQputCopyEnd(conn, NULL) == 1, "Finishing COPY failed: %s",
+				PQerrorMessage(conn));
+
+	res = PQgetResult(conn);
+	EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COPY finished with %s: %s\n",
+	    	PQresStatus(PQresultStatus(res)),
+				PQresultErrorMessage(res));
+	PQclear(res);
+	res = NULL;
+
+	return;
+
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+}
+
+static void
+test_timings(PGconn *conn, int number_of_rows)
+{
+	instr_time start_time, end_time;
+
+	fprintf(stderr, "inserting %d rows batched then unbatched\n", number_of_rows);
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_pipelined(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+
+	printf("batch insert elapsed:      %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_sequential(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+	printf("sequential insert elapsed: %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	INSTR_TIME_SET_CURRENT(start_time);
+	batch_insert_copy(conn, number_of_rows);
+	INSTR_TIME_SET_CURRENT(end_time);
+	INSTR_TIME_SUBTRACT(end_time, start_time);
+	printf("COPY elapsed:              %.8f ms\n",
+		   INSTR_TIME_GET_MILLISEC(end_time));
+
+	fprintf(stderr, "Done.\n");
+}
+
+static void
+usage_exit(const char *progname)
+{
+	fprintf(stderr, "Usage: %s ['connstring' [number_of_rows [test_to_run]]]\n", progname);
+	fprintf(stderr, "  tests: all|disallowed_in_batch|simple_batch|multi_batch|batch_abort|timings|singlerowmode\n");
+	exit(1);
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+	PGresult *res;
+	int i,r;
+
+	/* 1 batch, 3 queries in it */
+	r = PQenterBatchMode(conn);
+
+	for (i=0; i < 3; i++) {
+		r = PQsendQueryParams(conn,
+				"SELECT 1",
+				0,
+				NULL,
+				NULL,
+				NULL,
+				NULL,
+				0);
+	}
+	PQbatchSendQueue(conn);
+
+	i=0;
+	while (true)
+	{
+		int	isSingleTuple = 0;
+		ExecStatusType last_est = 0;
+
+		/* Set single row mode for only first 3 SELECT queries */
+		if(i < 3)
+		{
+			r = PQsetSingleRowMode(conn);
+			if (r!=1)
+			{
+				fprintf(stderr, "PQsetSingleRowMode() failed for i=%d\n", i);
+			}
+		}
+		while ((res = PQgetResult(conn)) != NULL)
+		{
+			ExecStatusType est = PQresultStatus(res);
+			fprintf(stderr, "Result status: %d (%s) for i=%d", est, PQresStatus(est), i);
+			if (est == PGRES_TUPLES_OK)
+			{
+				fprintf(stderr,  ", tuples: %d\n", PQntuples(res));
+				EXPECT(isSingleTuple, " Expected to follow PGREG_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead\n");
+				isSingleTuple=0;
+			}
+			else if (est == PGRES_SINGLE_TUPLE)
+			{
+				isSingleTuple = 1;
+				fprintf(stderr,  ", single tuple: %d\n", PQntuples(res));
+			}
+			else if (est == PGRES_BATCH_END)
+			{
+				fprintf(stderr,  ", end of batch reached\n");
+			}
+			else if (est != PGRES_COMMAND_OK)
+			{
+				fprintf(stderr,  ", error: %s\n", PQresultErrorMessage(res));
+			}
+			PQclear(res);
+			last_est = est;
+		}
+		i++;
+		if (last_est == PGRES_BATCH_END) break;
+	}
+	PQexitBatchMode(conn);
+	PQclear(res);
+	res = NULL;
+	return;
+fail:
+	PQclear(res);
+	exit_nicely(conn);
+
+}
+int
+main(int argc, char **argv)
+{
+	const char *conninfo;
+	PGconn	   *conn;
+	int			number_of_rows = 10000;
+
+	int			run_disallowed_in_batch = 1,
+				run_simple_batch = 1,
+				run_multi_batch = 1,
+				run_batch_abort = 1,
+				run_timings = 1,
+				run_singlerowmode = 1;
+
+	/*
+	 * If the user supplies a parameter on the command line, use it as the
+	 * conninfo string; otherwise default to setting dbname=postgres and using
+	 * environment variables or defaults for all other connection parameters.
+	 */
+	if (argc > 4)
+	{
+		usage_exit(argv[0]);
+	}
+	if (argc > 3)
+	{
+		if (strcmp(argv[3], "all") != 0)
+		{
+			run_disallowed_in_batch = 0;
+			run_simple_batch = 0;
+			run_multi_batch = 0;
+			run_batch_abort = 0;
+			run_timings = 0;
+			run_singlerowmode = 0;
+			if (strcmp(argv[3], "disallowed_in_batch") == 0)
+				run_disallowed_in_batch = 1;
+			else if (strcmp(argv[3], "simple_batch") == 0)
+				run_simple_batch = 1;
+			else if (strcmp(argv[3], "multi_batch") == 0)
+				run_multi_batch = 1;
+			else if (strcmp(argv[3], "batch_abort") == 0)
+				run_batch_abort = 1;
+			else if (strcmp(argv[3], "timings") == 0)
+				run_timings = 1;
+			else if (strcmp(argv[3], "singlerowmode") == 0)
+				run_singlerowmode = 1;
+			else
+			{
+				fprintf(stderr, "%s is not a recognized test name\n", argv[3]);
+				usage_exit(argv[0]);
+			}
+		}
+	}
+	if (argc > 2)
+	{
+		errno = 0;
+		number_of_rows = strtol(argv[2], NULL, 10);
+		if (errno)
+		{
+			fprintf(stderr, "couldn't parse '%s' as an integer or zero rows supplied: %s", argv[2], strerror(errno));
+			usage_exit(argv[0]);
+		}
+		if (number_of_rows <= 0)
+		{
+			fprintf(stderr, "number_of_rows must be positive");
+			usage_exit(argv[0]);
+		}
+	}
+	if (argc > 1)
+	{
+		conninfo = argv[1];
+	}
+	else
+	{
+		conninfo = "dbname = postgres";
+	}
+
+	/* Make a connection to the database */
+	conn = PQconnectdb(conninfo);
+
+	/* Check to see that the backend connection was successfully made */
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		fprintf(stderr, "Connection to database failed: %s\n",
+				PQerrorMessage(conn));
+		exit_nicely(conn);
+	}
+
+	if (run_disallowed_in_batch)
+		test_disallowed_in_batch(conn);
+
+	if (run_simple_batch)
+		simple_batch(conn);
+
+	if (run_multi_batch)
+		multi_batch(conn);
+
+	if (run_batch_abort)
+		test_batch_abort(conn);
+
+	if (run_timings)
+		test_timings(conn, number_of_rows);
+
+	if(run_singlerowmode)
+		test_singlerowmode(conn);
+	/* close the connection to the database and cleanup */
+	PQfinish(conn);
+
+	return 0;
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 20da7985c1..b131afaa1a 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -50,7 +50,8 @@ my @contrib_excludes = (
 	'pgcrypto',         'sepgsql',
 	'brin',             'test_extensions',
 	'test_misc',        'test_pg_dump',
-	'snapshot_too_old', 'unsafe_tests');
+	'snapshot_too_old', 'unsafe_tests',
+	'test_libpq');
 
 # Set of variables for frontend modules
 my $frontend_defines = { 'initdb' => 'FRONTEND' };

Reply via email to