On 2020-Nov-02, Alvaro Herrera wrote: > In v23 I've gone over docs; discovered that PQgetResults docs were > missing the new values. Added those. No significant other changes yet.
>From 3f305d05000981445fe4dbbb96c2e88ace89f439 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Mon, 26 Oct 2020 13:24:24 -0300 Subject: [PATCH v23] libpq batch --- doc/src/sgml/libpq.sgml | 505 ++++++++ doc/src/sgml/lobj.sgml | 4 + .../libpqwalreceiver/libpqwalreceiver.c | 3 + src/interfaces/libpq/exports.txt | 4 + src/interfaces/libpq/fe-connect.c | 27 + src/interfaces/libpq/fe-exec.c | 630 +++++++++- src/interfaces/libpq/fe-protocol2.c | 6 + src/interfaces/libpq/fe-protocol3.c | 16 +- src/interfaces/libpq/libpq-fe.h | 22 +- src/interfaces/libpq/libpq-int.h | 46 +- src/test/modules/Makefile | 1 + src/test/modules/test_libpq/.gitignore | 5 + src/test/modules/test_libpq/Makefile | 25 + src/test/modules/test_libpq/README | 1 + .../modules/test_libpq/t/001_libpq_async.pl | 27 + src/test/modules/test_libpq/testlibpqbatch.c | 1013 +++++++++++++++++ src/tools/msvc/Mkvcbuild.pm | 3 +- src/tools/pgindent/typedefs.list | 3 + 18 files changed, 2295 insertions(+), 46 deletions(-) create mode 100644 src/test/modules/test_libpq/.gitignore create mode 100644 src/test/modules/test_libpq/Makefile create mode 100644 src/test/modules/test_libpq/README create mode 100644 src/test/modules/test_libpq/t/001_libpq_async.pl create mode 100644 src/test/modules/test_libpq/testlibpqbatch.c diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 9ce32fb39b..839cadfaf0 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res); </para> </listitem> </varlistentry> + + <varlistentry id="libpq-pgres-batch-end"> + <term><literal>PGRES_BATCH_END</literal></term> + <listitem> + <para> + The <structname>PGresult</structname> represents the end of a batch. + This status occurs only when batch mode has been selected. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-pgres-batch-aborted"> + <term><literal>PGRES_BATCH_ABORTED</literal></term> + <listitem> + <para> + The <structname>PGresult</structname> represents a batch that's + received an error from the server. <function>PQgetResult</function> + must be called repeatedly, and it will return this status code, + until the end of the current batch, at which point it will return + <literal>PGRES_BATCH_END</literal> and normal processing can resume. + </para> + </listitem> + </varlistentry> + </variablelist> If the result status is <literal>PGRES_TUPLES_OK</literal> or @@ -4819,6 +4843,478 @@ int PQflush(PGconn *conn); </sect1> + <sect1 id="libpq-batch-mode"> + <title>Batch Mode and Query Pipelining</title> + + <indexterm zone="libpq-batch-mode"> + <primary>libpq</primary> + <secondary>batch mode</secondary> + </indexterm> + + <indexterm zone="libpq-batch-mode"> + <primary>libpq</primary> + <secondary>pipelining</secondary> + </indexterm> + + <para> + <application>libpq</application> supports queueing up queries into + a pipeline to be executed as a batch on the server. Batching queries + allows applications to avoid a client/server round-trip after each + query to get the results before issuing the next query. + </para> + + <sect2> + <title>When to Use Batching</title> + + <para> + Much like asynchronous query mode, there is no performance disadvantage to + using batching and pipelining. It increases client application complexity + and extra caution is required to prevent client/server deadlocks, but + pipelining can sometimes offer considerable performance improvements. + </para> + + <para> + Batching is most useful when the server is distant, i.e., network latency + (<quote>ping time</quote>) is high, and also when many small operations + are being performed in rapid sequence. There is usually less benefit + in using batches when each query takes many multiples of the client/server + round-trip time to execute. A 100-statement operation run on a server + 300ms round-trip-time away would take 30 seconds in network latency alone + without batching; with batching it may spend as little as 0.3s waiting for + results from the server. + </para> + + <para> + Use batches when your application does lots of small + <literal>INSERT</literal>, <literal>UPDATE</literal> and + <literal>DELETE</literal> operations that can't easily be transformed + into operations on sets or into a <literal>COPY</literal> operation. + </para> + + <para> + Batching is not useful when information from one operation is required by + the client to produce the next operation. In such cases, the client + must introduce a synchronization point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + <programlisting> +BEGIN; +SELECT x FROM mytable WHERE id = 42 FOR UPDATE; +-- result: x=2 +-- client adds 1 to x: +UPDATE mytable SET x = 3 WHERE id = 42; +COMMIT; + </programlisting> + could be much more efficiently done with: + <programlisting> +UPDATE mytable SET x = x + 1 WHERE id = 42; + </programlisting> + </para> + + <note> + <para> + The batch API was introduced in PostgreSQL 14.0, but clients using + the PostgreSQL 14 version of <application>libpq</application> can use + batches on server versions 7.4 and newer. Batching works on any server + that supports the v3 extended query protocol. + </para> + </note> + + </sect2> + + <sect2 id="libpq-batch-using"> + <title>Using Batch Mode</title> + + <para> + To issue batches the application must switch a connection into batch mode. + Enter batch mode with <xref linkend="libpq-PQenterBatchMode"/> + or test whether batch mode is active with + <xref linkend="libpq-PQbatchStatus"/>. + In batch mode only <link linkend="libpq-async">asynchronous operations</link> + are permitted, and <literal>COPY</literal> is not recommended as it + may trigger failure in batch processing. Using any synchronous + command execution functions such as <function>PQfn</function>, + <function>PQexec</function> or one of its sibling functions are error + conditions. Functions allowed in batch mode are described in + <xref linkend="libpq-batch-sending"/>. + </para> + + <para> + The client uses libpq's asynchronous query functions to dispatch work, + marking the end of each batch with <function>PQbatchSendQueue</function>. + And to get results, it uses <function>PQgetResult</function>. It may + eventually exit batch mode with <function>PQexitBatchMode</function> + once all results are processed. + </para> + + <note> + <para> + It is best to use batch mode with <application>libpq</application> in + <link linkend="libpq-PQsetnonblocking">non-blocking mode</link>. If used + in blocking mode it is possible for a client/server deadlock to occur. + The client will block trying to send queries to the server, but the server + will block trying to send results from queries it has already processed to + the client. This only occurs when the client sends enough queries to fill + its output buffer and the server's receive buffer before switching to + processing input from the server, but it's hard to predict exactly when + that'll happen so it's best to always use non-blocking mode. + Batch mode consumes more memory when send/recv is not done as required, + even in non-blocking mode. + </para> + </note> + + <sect3 id="libpq-batch-sending"> + <title>Issuing Queries</title> + + <para> + After entering batch mode the application dispatches requests using + normal asynchronous <application>libpq</application> functions such as + <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>, + <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>, + <function>PQsendDescribePrepared</function>. + The asynchronous requests are followed by a + <xref linkend="libpq-PQbatchSendQueue"/> + call to mark the end of the batch. The client needs not + call <function>PQgetResult</function> immediately after + dispatching each operation. + <link linkend="libpq-batch-results">Result processing</link> + is handled separately. + </para> + + <para> + Batched operations will be executed by the server in the order the client + sends them. The server will send the results in the order the statements + executed. The server may begin executing the batch before all commands + in the batch are queued and the end of batch command is sent. If any + statement encounters an error the server aborts the current transaction and + skips processing the rest of the batch. Query processing resumes after the + end of the failed batch. + </para> + + <para> + It's fine for one operation to depend on the results of a + prior one. One query may define a table that the next query in the same + batch uses; similarly, an application may create a named prepared statement + then execute it with later statements in the same batch. + </para> + + </sect3> + + <sect3 id="libpq-batch-results"> + <title>Processing Results</title> + + <para> + The client <link linkend="libpq-batch-interleave">interleaves result + processing</link> with sending batch queries, or for small batches may + process all results after sending the whole batch. + </para> + + <para> + To process the result of the first batch entry the application calls + <function>PQgetResult</function> and handles each result until + <function>PQgetResult</function> returns null. + The result from the next batch entry may then be retrieved using + <function>PQgetResult</function> again and the cycle repeated. + The application handles individual statement results as normal. + </para> + + <para> + To enter single-row mode, call <function>PQsetSingleRowMode</function> + before retrieving results with <function>PQgetResult</function>. + This mode selection is effective only for the query currently + being processed. For more information on the use of + <function>PQsetSingleRowMode</function>, + refer to <xref linkend="libpq-single-row-mode"/>. + </para> + + <para> + <function>PQgetResult</function> behaves the same as for normal + asynchronous processing except that it may contain the new + <type>PGresult</type> types <literal>PGRES_BATCH_END</literal> + and <literal>PGRES_BATCH_ABORTED</literal>. + <literal>PGRES_BATCH_END</literal> is reported exactly once for each + <function>PQbatchSendQueue</function> call at the corresponding point in + the result stream and at no other time. + <literal>PGRES_BATCH_ABORTED</literal> is emitted during error handling; + see <link linkend="libpq-batch-errors">error handling</link>. + </para> + + <para> + <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc + operate as normal when processing batch results. + </para> + + <para> + <application>libpq</application> does not provide any information to the + application about the query currently being processed (except that + <function>PQgetResult</function> returns null to indicate that we start + returning the results of next query). The application must keep track + of the order in which it sent queries and the expected results. + Applications will typically use a state machine or a FIFO queue for this. + </para> + + </sect3> + + <sect3 id="libpq-batch-errors"> + <title>Error Handling</title> + + <para> + When a query in a batch causes an <literal>ERROR</literal> the server + skips processing all subsequent messages until the end-of-batch message. + The open transaction is aborted. + </para> + + <para> + From the client perspective, after the client gets a + <literal>PGRES_FATAL_ERROR</literal> return from + <function>PQresultStatus</function> the batch is flagged as aborted. + <application>libpq</application> will report + <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued + operation in an aborted batch. The result for + <function>PQbatchSendQueue</function> is reported as + <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch + and resumption of normal result processing. + </para> + + <para> + The client <emphasis>must</emphasis> process results with + <function>PQgetResult</function> during error recovery. + </para> + + <para> + If the batch used an implicit transaction then operations that have + already executed are rolled back and operations that were queued for after + the failed operation are skipped entirely. The same behaviour holds if the + batch starts and commits a single explicit transaction (i.e. the first + statement is <literal>BEGIN</literal> and the last is + <literal>COMMIT</literal>) except that the session remains in an aborted + transaction state at the end of the batch. If a batch contains <emphasis> + multiple explicit transactions</emphasis>, all transactions that committed + prior to the error remain committed, the currently in-progress transaction + is aborted and all subsequent operations in the current and all later + transactions in the same batch are skipped completely. + </para> + + <note> + <para> + The client must not assume that work is committed when it + <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously the application needs to be able to + restart from the last <emphasis>received</emphasis> committed change and + resend work done after that point if something goes wrong. + </para> + </note> + + </sect3> + + <sect3 id="libpq-batch-interleave"> + <title>Interleaving Result Processing and Query Dispatch</title> + + <para> + To avoid deadlocks on large batches the client should be structured + around a non-blocking event loop using operating system facilities + such as <function>select</function>, <function>poll</function>, + <function>WaitForMultipleObjectEx</function>, etc. + </para> + + <para> + The client application should generally maintain a queue of work + still to be dispatched and a queue of work that has been dispatched + but not yet had its results processed. When the socket is writable + it should dispatch more work. When the socket is readable it should + read results and process them, matching them up to the next entry in + its expected results queue. Based on available memory, results from + socket should be read frequently and there's no need to wait till the + batch end to read the results. Batches should be scoped to logical + units of work, usually (but not necessarily) one transaction per batch. + There's no need to exit batch mode and re-enter it between batches + or to wait for one batch to finish before sending the next. + </para> + + <para> + An example using <function>select()</function> and a simple state + machine to track sent and received work is in + <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename> + in the PostgreSQL source distribution. + </para> + + </sect3> + + <sect3 id="libpq-batch-end"> + <title>Ending Batch Mode</title> + + <para> + Once all dispatched commands have had their results processed and + the end batch result has been consumed the application may return + to non-batched mode with <xref linkend="libpq-PQexitBatchMode"/>. + </para> + </sect3> + + </sect2> + + <sect2 id="libpq-funcs-batch"> + <title>Functions Associated with Batch Mode</title> + + <variablelist> + + <varlistentry id="libpq-PQbatchStatus"> + <term> + <function>PQbatchStatus</function> + <indexterm> + <primary>PQbatchStatus</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns current batch mode status of the <application>libpq</application> + connection. +<synopsis> +int PQbatchStatus(PGconn *conn); +</synopsis> + </para> + + <para> + <function>PQbatchStatus</function> can return one of the following values: + + <variablelist> + <varlistentry> + <term> + <literal>PQBATCH_MODE_ON</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is in + batch mode. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <literal>PQBATCH_MODE_OFF</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is + <emphasis>not</emphasis> in batch mode. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <literal>PQBATCH_MODE_ABORTED</literal> + </term> + <listitem> + <para> + The <application>libpq</application> connection is in aborted + batch status. The aborted flag is cleared as soon as the result + of the <function>PQbatchSendQueue</function> at the end of the aborted + batch is processed. Clients don't usually need this function to + verify aborted status, as they can tell that the batch is aborted + from the <literal>PGRES_BATCH_ABORTED</literal> result code. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQenterBatchMode"> + <term> + <function>PQenterBatchMode</function> + <indexterm> + <primary>PQenterBatchMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to enter batch mode if it is currently idle or + already in batch mode. + +<synopsis> +int PQenterBatchMode(PGconn *conn); +</synopsis> + + </para> + <para> + Returns 1 for success. + Returns 0 and has no effect if the connection is not currently + idle, i.e., it has a result ready, or it is waiting for more + input from the server, etc. + This function does not actually send anything to the server, + it just changes the <application>libpq</application> connection + state. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQexitBatchMode"> + <term> + <function>PQexitBatchMode</function> + <indexterm> + <primary>PQexitBatchMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to exit batch mode if it is currently in batch mode + with an empty queue and no pending results. +<synopsis> +int PQexitBatchMode(PGconn *conn); +</synopsis> + </para> + <para> + Returns 1 for success. Returns 1 and takes no action if not in + batch mode. If the current statement isn't finished processing + or there are results pending for collection with + <function>PQgetResult</function>, returns 0 and does nothing. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQbatchSendQueue"> + <term> + <function>PQbatchSendQueue</function> + <indexterm> + <primary>PQbatchSendQueue</primary> + </indexterm> + </term> + + <listitem> + <para> + Delimits the end of a set of a batched commands by sending a + <link linkend="protocol-flow-ext-query">sync message</link> + and flushing the send buffer. The end of a batch serves as + the delimiter of an implicit transaction and an error recovery + point; see <link linkend="libpq-batch-errors"> error handling</link>. + +<synopsis> +int PQbatchSendQueue(PGconn *conn); +</synopsis> + </para> + <para> + Returns 1 for success. Returns 0 if the connection is not in + batch mode or sending a + <link linkend="protocol-flow-ext-query">sync message</link> + is failed. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </sect2> + + </sect1> + <sect1 id="libpq-single-row-mode"> <title>Retrieving Query Results Row-by-Row</title> @@ -4859,6 +5355,15 @@ int PQflush(PGconn *conn); Each object should be freed with <xref linkend="libpq-PQclear"/> as usual. </para> + <note> + <para> + When using batch mode, activate the single row mode on the current batch query by + calling <function>PQsetSingleRowMode</function> + before retrieving results with <function>PQgetResult</function>. + See <xref linkend="libpq-batch-mode"/> for more information. + </para> + </note> + <para> <variablelist> <varlistentry id="libpq-PQsetSingleRowMode"> diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 6329cf0796..49be8b1dbe 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ <application>libpq</application> library. </para> + <para> + Client applications cannot use these functions while libpq connection is in batch mode. + </para> + <sect2 id="lo-create"> <title>Creating a Large Object</title> diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 24f8b3e42e..9ae67387a5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->status = WALRCV_ERROR; walres->err = pchomp(PQerrorMessage(conn->streamConn)); break; + default: + /* This is just to keep compiler quiet */ + break; } PQclear(pgres); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index bbc1f90481..ca86f55652 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -179,3 +179,7 @@ PQgetgssctx 176 PQsetSSLKeyPassHook_OpenSSL 177 PQgetSSLKeyPassHook_OpenSSL 178 PQdefaultSSLKeyPassHook_OpenSSL 179 +PQenterBatchMode 180 +PQexitBatchMode 181 +PQbatchSendQueue 182 +PQbatchStatus 183 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index e7781d010f..91f6b205e4 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -536,6 +536,24 @@ pqDropConnection(PGconn *conn, bool flushInput) } } +/* + * PQfreeCommandQueue + * Free all the entries of PGcommandQueueEntry queue passed. + */ +static void +PQfreeCommandQueue(PGcommandQueueEntry *queue) +{ + + while (queue != NULL) + { + PGcommandQueueEntry *prev = queue; + + queue = queue->next; + if (prev->query) + free(prev->query); + free(prev); + } +} /* * pqDropServerData @@ -555,6 +573,7 @@ pqDropServerData(PGconn *conn) { PGnotify *notify; pgParameterStatus *pstatus; + PGcommandQueueEntry *queue; /* Forget pending notifies */ notify = conn->notifyHead; @@ -567,6 +586,14 @@ pqDropServerData(PGconn *conn) } conn->notifyHead = conn->notifyTail = NULL; + queue = conn->cmd_queue_head; + PQfreeCommandQueue(queue); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + queue = conn->cmd_queue_recycle; + PQfreeCommandQueue(queue); + conn->cmd_queue_recycle = NULL; + /* Reset ParameterStatus data, as well as variables deduced from it */ pstatus = conn->pstatus; while (pstatus != NULL) diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index eea0237c3a..cf5d639c2f 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -39,7 +39,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_BATCH_END", + "PGRES_BATCH_ABORTED" }; /* @@ -70,7 +72,10 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); - +static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn); +static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry); +static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry); +static int pqBatchFlush(PGconn *conn); /* ---------------- * Space management for PGresult. @@ -1210,7 +1215,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1233,6 +1238,13 @@ fail: int PQsendQuery(PGconn *conn, const char *query) { + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n")); + return false; + } + if (!PQsendQueryStart(conn)) return 0; @@ -1331,6 +1343,10 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + if (!PQsendQueryStart(conn)) return 0; @@ -1389,31 +1405,51 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + else + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + *queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + if (*last_query) + free(*last_query); + *last_query = strdup(query); /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1461,7 +1497,81 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQmakePipelinedCommand + * Get a new command queue entry, allocating it if required. Doesn't add it to + * the tail of the queue yet, use PQappendPipelinedCommand once the command has + * been written for that. If a command fails once it's called this, it should + * use PQrecyclePipelinedCommand to put it on the freelist or release it. + * + * If allocation fails sets the error message and returns null. + */ +static PGcommandQueueEntry * +PQmakePipelinedCommand(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry)); + if (entry == NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * PQappendPipelinedCommand + * Append a precreated command queue entry to the queue after it's been + * sent successfully. + */ +static void +PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry) +{ + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + conn->cmd_queue_tail = entry; +} + +/* + * PQrecyclePipelinedCommand + * Push a command queue entry onto the freelist. It must be an entry + * with null next pointer and not referenced by any other entry's next pointer. + */ +static void +PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry *entry) +{ + if (entry == NULL) + return; + if (entry->next != NULL) + { + fprintf(stderr, + libpq_gettext("tried to recycle non-dangling command queue entry")); + abort(); + } + if (entry->query) + free(entry->query); + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + +/* + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn) @@ -1479,20 +1589,61 @@ PQsendQueryStart(PGconn *conn) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * When enqueuing a message we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when PQbatchQueueProcess(...) + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state. + */ + switch (conn->asyncStatus) + { + case PGASYNC_QUEUED: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, idle state in batch mode")); + break; + } + } + else + { + /* + * This command's results will come in immediately. Initialize async + * result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1516,6 +1667,10 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + /* This isn't gonna work on a 2.0 server */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -1525,6 +1680,23 @@ PQsendQueryGuts(PGconn *conn, return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } + else + { + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + + /* * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, * using specified statement name and the unnamed portal. @@ -1637,35 +1809,42 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + *queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + if (*last_query) + free(*last_query); if (command) - conn->last_query = strdup(command); + *last_query = strdup(command); else - conn->last_query = NULL; + *last_query = NULL; /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -1766,14 +1945,307 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed; } +/* + * PQbatchStatus + * Returns current batch mode status + */ +int +PQbatchStatus(PGconn *conn) +{ + if (!conn) + return false; + + return conn->batch_status; +} + +/* + * PQenterBatchMode + * Put an idle connection in batch mode. Commands submitted after this + * can be pipelined on the connection, there's no requirement to wait for + * one to finish before the next is dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQbatchQueueSync. Multiple sets of batched + * commands may be sent while in batch mode. Batch mode can be exited by + * calling PQbatchEnd() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterBatchMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->batch_status != PQBATCH_MODE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + return 0; + + conn->batch_status = PQBATCH_MODE_ON; + conn->asyncStatus = PGASYNC_QUEUED; + + return 1; +} + +/* + * PQexitBatchMode + * End batch mode and return to normal command mode. + * + * Has no effect unless the client has processed all results + * from all outstanding batches and the connection is idle. + * + * Returns 1 if batch mode ended. + */ +int +PQexitBatchMode(PGconn *conn) +{ + if (!conn) + goto exitFailed; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* can't end batch while busy */ + goto exitFailed; + default: + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + goto exitFailed; + + conn->batch_status = PQBATCH_MODE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + goto sendFailed; + return 1; + +sendFailed: + /* error message should be set up already */ + +exitFailed: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, Failed to exit batch mode")); + return 0; +} + + +/* + * internal function pqBatchProcessQueue + * + * In batch mode, start processing the next query in the queue. + * + * Returns 1 if the next query was popped from the queue and can + * be processed by PQconsumeInput, PQgetResult, etc. + * + * Returns 0 if the current query isn't done yet, the connection + * is not in a batch, or there are no more queries to process. + */ +static int +pqBatchProcessQueue(PGconn *conn) +{ + PGcommandQueueEntry *next_query; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 0; + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return 0; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_QUEUED: + /* next query please */ + break; + } + + if (conn->cmd_queue_head == NULL) + { + /* + * In batch mode but nothing left on the queue; caller can submit more + * work or PQbatchEnd() now. + */ + return 0; + } + + /* + * Pop the next query from the queue and set up the connection state as if + * it'd just been dispatched from a non-batched call + */ + next_query = conn->cmd_queue_head; + conn->cmd_queue_head = next_query->next; + next_query->next = NULL; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* reset single-row processing mode */ + conn->singleRowMode = false; + + + conn->last_query = next_query->query; + next_query->query = NULL; + conn->queryclass = next_query->queryclass; + + PQrecyclePipelinedCommand(conn, next_query); + + if (conn->batch_status == PQBATCH_MODE_ABORTED && + conn->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted batch we don't get anything from the server for each + * result; we're just discarding input until we get to the next sync + * from the server. The client needs to know its queries got aborted + * so we create a fake PGresult to return immediately from + * PQgetResult. + */ + conn->result = + PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED); + if (!conn->result) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + return 0; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } + + return 1; +} + +/* + * PQbatchSendQueue + * + * End a batch submission. + * + * It's legal to start submitting another batch immediately, without + * waiting for the results of the current batch. There's no need to end batch + * mode and start it again. + * + * If a command in a batch fails, every subsequent command up to and + * including the PQbatchQueueSync command result gets set to PGRES_BATCH_ABORTED + * state. If the whole batch is processed without error, a PGresult with + * PGRES_BATCH_END is produced. + * + * Queries can already have been sent before PQbatchSendQueue is called, but + * PQbatchSendQueue need to be called before retrieving command results. + * + * The connection will remain in batch mode and unavailable for new synchronous + * command execution functions until all results from the batch are processed + * by the client. + */ +int +PQbatchSendQueue(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 0; + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + return 0; + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + return 0; + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_QUEUED: + /* can send sync to end this batch of cmds */ + break; + } + + entry = PQmakePipelinedCommand(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + PQappendPipelinedCommand(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + /* + * Call pqBatchProcessQueue so the user can call start calling getResult. + */ + pqBatchProcessQueue(conn); + + return 1; + +sendFailed: + PQrecyclePipelinedCommand(conn, entry); + /* error message should be set up already */ + return 0; +} /* * PQgetResult * Get the next PGresult produced by a query. Returns NULL if no * query work remains or an error has occurred (e.g. out of * memory). + * + * In batch mode, once all the result of a query have been returned, + * PQgetResult returns NULL to let the user know that the next batched + * query is being processed. At the end of the batch, returns a + * end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END. */ - PGresult * PQgetResult(PGconn *conn) { @@ -1842,9 +2314,38 @@ PQgetResult(PGconn *conn) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_QUEUED: res = NULL; /* query is complete */ + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * In batch mode, we prepare the processing of the results of + * the next query. + */ + pqBatchProcessQueue(conn); + } break; case PGASYNC_READY: + res = pqPrepareAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * In batch mode, query execution state cannot be IDLE as + * there can be other queries or results waiting in the queue + * + * The connection isn't idle since we can't submit new + * nonbatched commands. It isn't also busy since the current + * command is done and we need to process a new one. + */ + conn->asyncStatus = PGASYNC_QUEUED; + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; @@ -2025,6 +2526,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return false; + } + /* * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. @@ -2219,6 +2727,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcommandQueueEntry *pipeCmd = NULL; + PGQueryClass *queryclass; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2234,6 +2745,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + queryclass = &pipeCmd->queryclass; + } + else + { + queryclass = &conn->queryclass; + } + /* construct the Describe message */ if (pqPutMsgStart('D', false, conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2242,15 +2767,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; + *queryclass = PGQUERY_DESCRIBE; - /* reset last_query string (not relevant now) */ - if (conn->last_query) + /* reset last-query string (not relevant now) */ + if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF) { free(conn->last_query); conn->last_query = NULL; @@ -2260,14 +2788,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); /* error message should be set up already */ return 0; } @@ -2665,6 +3197,13 @@ PQfn(PGconn *conn, /* clear the error string */ resetPQExpBuffer(&conn->errorMessage); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -3858,3 +4397,16 @@ PQunescapeBytea(const unsigned char *strtext, size_t *retbuflen) *retbuflen = buflen; return tmpbuf; } + +/* pqBatchFlush + * In batch mode, data will be flushed only when the out buffer reaches the threshold value. + * In non-batch mode, data will be flushed all the time. + */ +static int +pqBatchFlush(PGconn *conn) +{ + if ((conn->batch_status == PQBATCH_MODE_OFF) || + (conn->outCount >= OUTBUFFER_THRESHOLD)) + return (pqFlush(conn)); + return 0; /* Just to keep compiler quiet */ +} diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 9360c541be..2ff3fa4883 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn) { char id; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode"); + abort(); + } + /* * Loop to parse successive complete messages available in the buffer. */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 1696525475..da38e6aed1 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new + * query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + conn->batch_status = PQBATCH_MODE_ON; + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_END); + conn->asyncStatus = PGASYNC_READY; + } + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + if (isError && conn->batch_status != PQBATCH_MODE_OFF) + conn->batch_status = PQBATCH_MODE_ABORTED; + /* * If this is an error message, pre-emptively clear any incomplete query * result we may have. We'd just throw it away below anyway, and diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 3b6a9fbce3..efe3437bd4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -97,7 +97,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_BATCH_END, /* end of a batch of commands */ + PGRES_BATCH_ABORTED, /* Command didn't run because of an abort + * earlier in a batch */ } ExecStatusType; typedef enum @@ -137,6 +140,17 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PQBatchStatus - Current status of batch mode + */ + +typedef enum +{ + PQBATCH_MODE_OFF, + PQBATCH_MODE_ON, + PQBATCH_MODE_ABORTED +} PQBatchStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. */ @@ -435,6 +449,12 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for batch mode management */ +extern int PQbatchStatus(PGconn *conn); +extern int PQenterBatchMode(PGconn *conn); +extern int PQexitBatchMode(PGconn *conn); +extern int PQbatchSendQueue(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 1de91ae295..d607390118 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -217,10 +217,15 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, More results expected from this + * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_QUEUED /* Current query done, more in queue */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ @@ -229,7 +234,8 @@ typedef enum PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* A protocol sync to end a batch */ } PGQueryClass; /* PGSetenvStatusType defines the state of the pqSetenv state machine */ @@ -301,6 +307,22 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } pg_conn_host_type; +/* An entry in the pending command queue. Used by batch mode to keep track + * of the expected results of future commands we've dispatched. + * + * Note that entries in this list are reused by being zeroed and appended to + * the tail when popped off the head. The entry with null next pointer is not + * the end of the list of expected commands, that's the tail pointer in + * pg_conn. + */ +typedef struct pgCommandQueueEntry +{ + PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */ + char *query; /* SQL command, or NULL if unknown */ + struct pgCommandQueueEntry *next; +} PGcommandQueueEntry; + + /* * pg_conn_host stores all information about each of possibly several hosts * mentioned in the connection string. Most fields are derived by splitting @@ -394,6 +416,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ @@ -406,6 +429,16 @@ struct pg_conn pg_conn_host *connhost; /* details about each named host */ char *connip; /* IP address for current network connection */ + /* + * The command queue + * + * head is the next pending cmd, tail is where we append new commands. + * Freed entries for recycling go on the recycle linked list. + */ + PGcommandQueueEntry *cmd_queue_head; + PGcommandQueueEntry *cmd_queue_tail; + PGcommandQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len); */ #define pqIsnonblocking(conn) ((conn)->nonblocking) +/* + * Connection's outbuffer threshold. + */ +#define OUTBUFFER_THRESHOLD 65536 + #ifdef ENABLE_NLS extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1); extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2); @@ -806,6 +844,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p)) #endif +#define libpq_gettext_noop(x) (x) + /* * These macros are needed to let error-handling code be portable between * Unix and Windows. (ugh) diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index a6d2ffbf9e..287214c544 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -17,6 +17,7 @@ SUBDIRS = \ test_extensions \ test_ginpostinglist \ test_integerset \ + test_libpq \ test_misc \ test_parser \ test_pg_dump \ diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore new file mode 100644 index 0000000000..11e8463984 --- /dev/null +++ b/src/test/modules/test_libpq/.gitignore @@ -0,0 +1,5 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ +/testlibpqbatch diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile new file mode 100644 index 0000000000..d907063f65 --- /dev/null +++ b/src/test/modules/test_libpq/Makefile @@ -0,0 +1,25 @@ +# src/test/modules/test_libpq/Makefile + +OBJS = testlibpqbatch.o +PROGRAM = testlibpqbatch + +PG_CPPFLAGS = -I$(libpq_srcdir) +PG_LIBS += $(libpq) + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_libpq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +testlibpqbatch.o: testlibpqbatch.c +testlibpqbatch: testlibpqbatch.o +check: testlibpqbatch prove-check + +prove-check: + $(prove_check) diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README new file mode 100644 index 0000000000..d8174dd579 --- /dev/null +++ b/src/test/modules/test_libpq/README @@ -0,0 +1 @@ +Test programs and libraries for libpq diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl new file mode 100644 index 0000000000..2c361880fa --- /dev/null +++ b/src/test/modules/test_libpq/t/001_libpq_async.pl @@ -0,0 +1,27 @@ +use strict; +use warnings; + +use Config; +use PostgresNode; +use TestLib; +use Test::More tests => 6; +use Cwd; + +my $node = get_new_node('main'); +$node->init; +$node->start; + +my $port = $node->port; + +my $numrows = 10000; +my @tests = + qw(disallowed_in_batch simple_batch multi_batch batch_abort timings singlerowmode); +$ENV{PATH} = "$ENV{PATH}:" . getcwd(); +for my $testname (@tests) +{ + $node->command_ok( + [ 'testlibpqbatch', 'dbname=postgres', "$numrows", "$testname" ], + "testlibpqbatch $testname"); +} + +$node->stop('fast'); diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c new file mode 100644 index 0000000000..3fff765ada --- /dev/null +++ b/src/test/modules/test_libpq/testlibpqbatch.c @@ -0,0 +1,1013 @@ +/* + * src/test/modules/test_libpq/testlibpqbatch.c + * + * + * testlibpqbatch.c + * Test of batch execution functionality + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> +#include <sys/time.h> +#include <sys/types.h> + +#include "libpq-fe.h" + +#define EXPECT(condition, ...) \ + if (0 == (condition)) \ + { \ + fprintf(stderr, __VA_ARGS__); \ + goto fail; \ + } + + +static void exit_nicely(PGconn *conn); +static void simple_batch(PGconn *conn); +static void test_disallowed_in_batch(PGconn *conn); +static void batch_insert_pipelined(PGconn *conn, int n_rows); +static void batch_insert_sequential(PGconn *conn, int n_rows); +static void batch_insert_copy(PGconn *conn, int n_rows); +static void test_batch_abort(PGconn *conn); +static void test_singlerowmode(PGconn *conn); +static const Oid INT4OID = 23; + +static const char *const drop_table_sql += "DROP TABLE IF EXISTS batch_demo"; +static const char *const create_table_sql += "CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);"; +static const char *const insert_sql += "INSERT INTO batch_demo(itemno) VALUES ($1);"; + +/* max char length of an int32, plus sign and null terminator */ +#define MAXINTLEN 12 + +static void +exit_nicely(PGconn *conn) +{ + PQfinish(conn); + exit(1); +} + +static void +simple_batch(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "simple batch... "); + + /* + * Enter batch mode and dispatch a set of operations, which we'll then + * process the results of as they come in. + * + * For a simple case we should be able to do this without interim + * processing of results since our out buffer will give us enough slush to + * work with and we won't block on sending. So blocking mode is fine. + */ + EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode\n"); + + EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), + "dispatching SELECT failed: %s\n", PQerrorMessage(conn)); + + EXPECT(!PQexitBatchMode(conn), "exiting batch mode with work in progress should fail, but succeeded\n"); + + EXPECT(PQbatchSendQueue(conn), "Ending a batch failed: %s\n", PQerrorMessage(conn)); + + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res))); + + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first query result.\n"); + + /* + * Even though we've processed the result there's still a sync to come and + * we can't exit batch mode yet + */ + EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)) + + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after end batch call\n"); + + /* We're still in a batch... */ + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n"); + + /* until we end it, which we can safely do now */ + EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +static void +test_disallowed_in_batch(PGconn *conn) +{ + PGresult *res = NULL; + + fprintf(stderr, "test error cases... "); + + EXPECT(!PQisnonblocking(conn), "Expected blocking connection mode: %u\n", __LINE__); + + EXPECT(PQenterBatchMode(conn), "Unable to enter batch mode\n"); + + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Batch mode not activated properly\n"); + + /* PQexec should fail in batch mode */ + res = PQexec(conn, "SELECT 1"); + EXPECT(PQresultStatus(res) == PGRES_FATAL_ERROR, "PQexec should fail in batch mode but succeeded\n"); + + /* So should PQsendQuery */ + EXPECT(PQsendQuery(conn, "SELECT 1") == 0, "PQsendQuery should fail in batch mode but succeeded\n"); + + /* Entering batch mode when already in batch mode is OK */ + EXPECT(PQenterBatchMode(conn), "re-entering batch mode should be a no-op but failed\n"); + + EXPECT(!PQisBusy(conn), "PQisBusy should return false when idle in batch, returned true\n"); + + /* ok, back to normal command mode */ + EXPECT(PQexitBatchMode(conn), "couldn't exit idle empty batch mode\n"); + + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "Batch mode not terminated properly\n"); + + /* exiting batch mode when not in batch mode should be a no-op */ + EXPECT(PQexitBatchMode(conn), "batch mode exit when not in batch mode should succeed but failed\n"); + + /* can now PQexec again */ + res = PQexec(conn, "SELECT 1"); + EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "PQexec should succeed after exiting batch mode but failed with: %s\n", PQerrorMessage(conn)); + + fprintf(stderr, "ok\n"); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +static void +multi_batch(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + + fprintf(stderr, "multi batch... "); + + /* + * Queue up a couple of small batches and process each without returning + * to command mode first. + */ + EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching first SELECT failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching second SELECT failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn)); + + /* OK, start processing the batch results */ + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from first batch item\n", PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "PQgetResult returned something extra after first result\n"); + + EXPECT(!PQexitBatchMode(conn), "exiting batch mode after query but before sync succeeded incorrectly\n"); + + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when sync result expected: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s instead of sync result, error: %s (line %u)\n", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn), __LINE__); + + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* second batch */ + + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Unexpected result code %s from second batch item\n", PQresStatus(PQresultStatus(res))); + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + res = PQgetResult(conn); + EXPECT(res != NULL, "PQgetResult returned null when there's a batch item: %s\n", PQerrorMessage(conn)); + + EXPECT(PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second end batch\n", PQresStatus(PQresultStatus(res))); + + /* We're still in a batch... */ + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n"); + + /* until we end it, which we can safely do now */ + EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +/* + * When an operation in a batch fails the rest of the batch is flushed. We + * still have to get results for each batch item, but the item will just be + * a PGRES_BATCH_ABORTED code. + * + * This intentionally doesn't use a transaction to wrap the batch. You should + * usually use an xact, but in this case we want to observe the effects of each + * statement. + */ +static void +test_batch_abort(PGconn *conn) +{ + PGresult *res = NULL; + const char *dummy_params[1] = {"1"}; + Oid dummy_param_oids[1] = {INT4OID}; + int i; + + fprintf(stderr, "aborted batch... "); + + res = PQexec(conn, drop_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn)); + + res = PQexec(conn, create_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + + + /* + * Queue up a couple of small batches and process each without returning + * to command mode first. Make sure the second operation in the first + * batch ERRORs. + */ + EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "1"; + EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching first INSERT failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsendQueryParams(conn, "SELECT no_such_function($1)", 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching error select failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "2"; + EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching second insert failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchSendQueue(conn), "Ending first batch failed: %s\n", PQerrorMessage(conn)); + + dummy_params[0] = "3"; + EXPECT(PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids, + dummy_params, NULL, NULL, 0), "dispatching second-batch insert failed: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchSendQueue(conn), "Ending second batch failed: %s\n", PQerrorMessage(conn)); + + /* + * OK, start processing the batch results. + * + * We should get a tuples-ok for the first query, a fatal error, a batch + * aborted message for the second insert, a batch-end, then a command-ok + * and a batch-ok for the second batch operation. + */ + + EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_COMMAND_OK, + "Unexpected result code %s from first batch item, error='%s'\n", + res == NULL ? "NULL" : PQresStatus(PQresultStatus(res)), + res == NULL ? PQerrorMessage(conn) : PQresultErrorMessage(res)); + + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* second query, caused error */ + + EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_FATAL_ERROR, + "Unexpected result code from second batch item. Wanted PGRES_FATAL_ERROR, got %s\n", + res == NULL ? "NULL" : PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* + * batch should now be aborted. + * + * Note that we could still queue more queries at this point if we wanted; + * they'd get added to a new third batch since we've already sent a + * second. The aborted flag relates only to the batch being received. + */ + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n"); + + /* third query in batch, the second insert */ + + EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_ABORTED, + "Unexpected result code from third batch item. Wanted PGRES_BATCH_ABORTED, got %s\n", + res == NULL ? "NULL" : PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_ABORTED, "batch should be flagged as aborted but isn't\n"); + + /* We're still in a batch... */ + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n"); + + /* + * The end of a failed batch is still a PGRES_BATCH_END so clients know to + * start processing results normally again and can tell the difference + * between skipped commands and the sync. + */ + EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END, + "Unexpected result code from first batch sync. Wanted PGRES_BATCH_END, got %s\n", + res == NULL ? "NULL" : PQresStatus(PQresultStatus(res))); + + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_ABORTED, "sync should've cleared the aborted flag but didn't\n"); + + /* We're still in a batch... */ + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n"); + + /* the insert from the second batch */ + EXPECT(((res = PQgetResult(conn)) != NULL) && + PQresultStatus(res) == PGRES_COMMAND_OK, + "Unexpected result code %s from first item in second batch\n", + res ? PQresStatus(PQresultStatus(res)) : "NULL"); + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* the second batch sync */ + EXPECT(((res = PQgetResult(conn)) != NULL) && PQresultStatus(res) == PGRES_BATCH_END, "Unexpected result code %s from second batch sync\n", res == NULL ? "NULL" : PQresStatus(PQresultStatus(res))); + PQclear(res); + res = NULL; + + EXPECT(PQgetResult(conn) == NULL, "Expected null result, got %s\n", PQresStatus(PQresultStatus(res))); + + /* We're still in a batch... */ + EXPECT(PQbatchStatus(conn) != PQBATCH_MODE_OFF, "Fell out of batch mode somehow\n"); + + /* until we end it, which we can safely do now */ + EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn)); + + EXPECT(PQbatchStatus(conn) == PQBATCH_MODE_OFF, "exiting batch mode didn't seem to work\n"); + + fprintf(stderr, "ok\n"); + + /* + * Since we fired the batches off without a surrounding xact, the results + * should be: + * + * - Implicit xact started by server around 1st batch - First insert + * applied - Second statement aborted xact - Third insert skipped - Sync + * rolled back first implicit xact - Implicit xact created by server + * around 2nd batch - insert applied from 2nd batch - Sync commits 2nd + * xact + * + * So we should only have the value 3 that we inserted. + */ + res = PQexec(conn, "SELECT itemno FROM batch_demo"); + + EXPECT(PQresultStatus(res) == PGRES_TUPLES_OK, "Expected tuples, got %s: %s", PQresStatus(PQresultStatus(res)), PQerrorMessage(conn)); + + for (i = 0; i < PQntuples(res); i++) + { + const char *val = PQgetvalue(res, i, 0); + + EXPECT(strcmp(val, "3") == 0, "expected only insert with value 3, got %s", val); + } + + EXPECT(PQntuples(res) == 1, "expected 1 result, got %d", PQntuples(res)); + PQclear(res); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + + +/* State machine enums for batch insert */ +typedef enum BatchInsertStep +{ + BI_BEGIN_TX, + BI_DROP_TABLE, + BI_CREATE_TABLE, + BI_PREPARE, + BI_INSERT_ROWS, + BI_COMMIT_TX, + BI_SYNC, + BI_DONE +} BatchInsertStep; + +static void +batch_insert_pipelined(PGconn *conn, int n_rows) +{ + PGresult *res = NULL; + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + BatchInsertStep send_step = BI_BEGIN_TX, + recv_step = BI_BEGIN_TX; + int rows_to_send, + rows_to_receive; + + insert_params[0] = &insert_param_0[0]; + + rows_to_send = rows_to_receive = n_rows; + + /* + * Do a batched insert into a table created at the start of the batch + */ + EXPECT(PQenterBatchMode(conn), "failed to enter batch mode: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsendQueryParams(conn, "BEGIN", + 0, NULL, NULL, NULL, NULL, 0), "xact start failed: %s\n", PQerrorMessage(conn)); + + fprintf(stdout, "sent BEGIN\n"); + + send_step = BI_DROP_TABLE; + + EXPECT(PQsendQueryParams(conn, drop_table_sql, + 0, NULL, NULL, NULL, NULL, 0), "dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn)); + + fprintf(stdout, "sent DROP\n"); + + send_step = BI_CREATE_TABLE; + + EXPECT(PQsendQueryParams(conn, create_table_sql, + 0, NULL, NULL, NULL, NULL, 0), "dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + + fprintf(stdout, "sent CREATE\n"); + + send_step = BI_PREPARE; + + EXPECT(PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids), "dispatching PREPARE failed: %s\n", PQerrorMessage(conn)); + + fprintf(stdout, "sent PREPARE\n"); + + send_step = BI_INSERT_ROWS; + + /* + * Now we start inserting. We'll be sending enough data that we could fill + * our out buffer, so to avoid deadlocking we need to enter nonblocking + * mode and consume input while we send more output. As results of each + * query are processed we should pop them to allow processing of the next + * query. There's no need to finish the batch before processing results. + */ + EXPECT(PQsetnonblocking(conn, 1) == 0, "failed to set nonblocking mode: %s\n", PQerrorMessage(conn)); + + while (recv_step != BI_DONE) + { + int sock; + fd_set input_mask; + fd_set output_mask; + + sock = PQsocket(conn); + + if (sock < 0) + break; /* shouldn't happen */ + + FD_ZERO(&input_mask); + FD_SET(sock, &input_mask); + FD_ZERO(&output_mask); + FD_SET(sock, &output_mask); + + if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0) + { + fprintf(stderr, "select() failed: %s\n", strerror(errno)); + exit_nicely(conn); + } + + /* + * Process any results, so we keep the server's out buffer free + * flowing and it can continue to process input + */ + if (FD_ISSET(sock, &input_mask)) + { + PQconsumeInput(conn); + + /* Read until we'd block if we tried to read */ + while (!PQisBusy(conn) && recv_step < BI_DONE) + { + const char *cmdtag; + const char *description = NULL; + int status; + BatchInsertStep next_step; + + + res = PQgetResult(conn); + + if (res == NULL) + { + /* + * No more results from this query, advance to the next + * result + */ + + fprintf(stdout, "next query!\n"); + continue; + } + + status = PGRES_COMMAND_OK; + next_step = recv_step + 1; + switch (recv_step) + { + case BI_BEGIN_TX: + cmdtag = "BEGIN"; + break; + case BI_DROP_TABLE: + cmdtag = "DROP TABLE"; + break; + case BI_CREATE_TABLE: + cmdtag = "CREATE TABLE"; + break; + case BI_PREPARE: + cmdtag = ""; + description = "PREPARE"; + break; + case BI_INSERT_ROWS: + cmdtag = "INSERT"; + rows_to_receive--; + if (rows_to_receive > 0) + next_step = BI_INSERT_ROWS; + break; + case BI_COMMIT_TX: + cmdtag = "COMMIT"; + break; + case BI_SYNC: + cmdtag = ""; + description = "SYNC"; + status = PGRES_BATCH_END; + break; + case BI_DONE: + /* unreachable */ + abort(); + } + if (description == NULL) + description = cmdtag; + + fprintf(stderr, "At state %d (%s) expect tag '%s', result code %s, expect %d more rows, transition to %d\n", + recv_step, description, cmdtag, PQresStatus(status), rows_to_receive, next_step); + + EXPECT(PQresultStatus(res) == status, "%s reported status %s, expected %s. Error msg is [%s]\n", + description, PQresStatus(PQresultStatus(res)), PQresStatus(status), PQerrorMessage(conn)); + + EXPECT(strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) == 0, "%s expected command tag '%s', got '%s'\n", + description, cmdtag, PQcmdStatus(res)); + + fprintf(stdout, "Got %s OK\n", cmdtag); + + recv_step = next_step; + + PQclear(res); + res = NULL; + } + } + + /* Write more rows and/or the end batch message, if needed */ + if (FD_ISSET(sock, &output_mask)) + { + PQflush(conn); + + if (send_step == BI_INSERT_ROWS) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send); + insert_param_0[MAXINTLEN - 1] = '\0'; + + if (PQsendQueryPrepared(conn, "my_insert", + 1, insert_params, NULL, NULL, 0)) + { + fprintf(stdout, "sent row %d\n", rows_to_send); + + rows_to_send--; + if (rows_to_send == 0) + send_step = BI_COMMIT_TX; + } + else + { + /* + * in nonblocking mode, so it's OK for an insert to fail + * to send + */ + fprintf(stderr, "WARNING: failed to send insert #%d: %s\n", + rows_to_send, PQerrorMessage(conn)); + } + } + else if (send_step == BI_COMMIT_TX) + { + if (PQsendQueryParams(conn, "COMMIT", + 0, NULL, NULL, NULL, NULL, 0)) + { + fprintf(stdout, "sent COMMIT\n"); + send_step = BI_SYNC; + } + else + { + fprintf(stderr, "WARNING: failed to send commit: %s\n", + PQerrorMessage(conn)); + } + } + else if (send_step == BI_SYNC) + { + if (PQbatchSendQueue(conn)) + { + fprintf(stdout, "Dispatched end batch message\n"); + send_step = BI_DONE; + } + else + { + fprintf(stderr, "WARNING: Ending a batch failed: %s\n", + PQerrorMessage(conn)); + } + } + } + + } + + /* We've got the sync message and the batch should be done */ + EXPECT(PQexitBatchMode(conn), "attempt to exit batch mode failed when it should've succeeded: %s\n", PQerrorMessage(conn)); + + EXPECT(PQsetnonblocking(conn, 0) == 0, "failed to clear nonblocking mode: %s\n", PQerrorMessage(conn)); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + + +static void +batch_insert_sequential(PGconn *conn, int nrows) +{ + PGresult *res = NULL; + const char *insert_params[1]; + Oid insert_param_oids[1] = {INT4OID}; + char insert_param_0[MAXINTLEN]; + + insert_params[0] = &insert_param_0[0]; + + res = PQexec(conn, "BEGIN"); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "BEGIN failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + res = PQexec(conn, drop_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + res = PQexec(conn, create_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + res = PQprepare(conn, "my_insert2", insert_sql, 1, insert_param_oids); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "prepare failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + while (nrows > 0) + { + snprintf(&insert_param_0[0], MAXINTLEN, "%d", nrows); + insert_param_0[MAXINTLEN - 1] = '\0'; + + res = PQexecPrepared(conn, "my_insert2", + 1, insert_params, NULL, NULL, 0); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "INSERT failed: %s\n", PQerrorMessage(conn)); + + PQclear(res); + nrows--; + } + + res = PQexec(conn, "COMMIT"); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COMMIT failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +static void +batch_insert_copy(PGconn *conn, int nrows) +{ + PGresult *res = NULL; + + res = PQexec(conn, drop_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "DROP TABLE failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + + res = PQexec(conn, create_table_sql); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "CREATE TABLE failed: %s\n", PQerrorMessage(conn)); + PQclear(res); + res = NULL; + + res = PQexec(conn, "COPY batch_demo(itemno) FROM stdin"); + EXPECT(PQresultStatus(res) == PGRES_COPY_IN, "COPY: %s\n", PQerrorMessage(conn)); + PQclear(res); + res = NULL; + + while (nrows > 0) + { + char buf[12 + 2]; + int formatted = snprintf(&buf[0], 12 + 1, "%d\n", nrows); + + EXPECT(formatted < 12 + 1, "Buffer write truncated somehow\n"); + + EXPECT(PQputCopyData(conn, buf, formatted) == 1, "Write of COPY data failed: %s\n", + PQerrorMessage(conn)); + + nrows--; + } + + EXPECT(PQputCopyEnd(conn, NULL) == 1, "Finishing COPY failed: %s", + PQerrorMessage(conn)); + + res = PQgetResult(conn); + EXPECT(PQresultStatus(res) == PGRES_COMMAND_OK, "COPY finished with %s: %s\n", + PQresStatus(PQresultStatus(res)), + PQresultErrorMessage(res)); + PQclear(res); + res = NULL; + + return; + +fail: + PQclear(res); + exit_nicely(conn); +} + +static void +test_timings(PGconn *conn, int number_of_rows) +{ + instr_time start_time, + end_time; + + fprintf(stderr, "inserting %d rows batched then unbatched\n", number_of_rows); + + INSTR_TIME_SET_CURRENT(start_time); + batch_insert_pipelined(conn, number_of_rows); + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + + printf("batch insert elapsed: %.8f ms\n", + INSTR_TIME_GET_MILLISEC(end_time)); + + INSTR_TIME_SET_CURRENT(start_time); + batch_insert_sequential(conn, number_of_rows); + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + printf("sequential insert elapsed: %.8f ms\n", + INSTR_TIME_GET_MILLISEC(end_time)); + + INSTR_TIME_SET_CURRENT(start_time); + batch_insert_copy(conn, number_of_rows); + INSTR_TIME_SET_CURRENT(end_time); + INSTR_TIME_SUBTRACT(end_time, start_time); + printf("COPY elapsed: %.8f ms\n", + INSTR_TIME_GET_MILLISEC(end_time)); + + fprintf(stderr, "Done.\n"); +} + +static void +usage_exit(const char *progname) +{ + fprintf(stderr, "Usage: %s ['connstring' [number_of_rows [test_to_run]]]\n", progname); + fprintf(stderr, " tests: all|disallowed_in_batch|simple_batch|multi_batch|batch_abort|timings|singlerowmode\n"); + exit(1); +} + +static void +test_singlerowmode(PGconn *conn) +{ + PGresult *res; + int i, + r; + + /* XXX this one is broken */ + return; + + /* 1 batch, 3 queries in it */ + r = PQenterBatchMode(conn); + + for (i = 0; i < 3; i++) + { + r = PQsendQueryParams(conn, + "SELECT 1", + 0, + NULL, + NULL, + NULL, + NULL, + 0); + } + PQbatchSendQueue(conn); + + i = 0; + for (;;) + { + int isSingleTuple = 0; + ExecStatusType last_est = 0; + + /* Set single row mode for only first 3 SELECT queries */ + if (i < 3) + { + r = PQsetSingleRowMode(conn); + if (r != 1) + { + fprintf(stderr, "PQsetSingleRowMode() failed for i=%d\n", i); + } + } + while ((res = PQgetResult(conn)) != NULL) + { + ExecStatusType est = PQresultStatus(res); + + fprintf(stderr, "Result status: %d (%s) for i=%d", est, PQresStatus(est), i); + if (est == PGRES_TUPLES_OK) + { + fprintf(stderr, ", tuples: %d\n", PQntuples(res)); + EXPECT(isSingleTuple, " Expected to follow PGREG_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead\n"); + isSingleTuple = 0; + } + else if (est == PGRES_SINGLE_TUPLE) + { + isSingleTuple = 1; + fprintf(stderr, ", single tuple: %d\n", PQntuples(res)); + } + else if (est == PGRES_BATCH_END) + { + fprintf(stderr, ", end of batch reached\n"); + } + else if (est != PGRES_COMMAND_OK) + { + fprintf(stderr, ", error: %s\n", PQresultErrorMessage(res)); + } + PQclear(res); + last_est = est; + } + i++; + if (last_est == PGRES_BATCH_END) + break; + } + PQexitBatchMode(conn); + PQclear(res); + res = NULL; + return; +fail: + PQclear(res); + exit_nicely(conn); +} + +int +main(int argc, char **argv) +{ + const char *conninfo; + PGconn *conn; + int number_of_rows = 10000; + + int run_disallowed_in_batch = 1, + run_simple_batch = 1, + run_multi_batch = 1, + run_batch_abort = 1, + run_timings = 1, + run_singlerowmode = 1; + + /* + * If the user supplies a parameter on the command line, use it as the + * conninfo string; otherwise default to setting dbname=postgres and using + * environment variables or defaults for all other connection parameters. + */ + if (argc > 4) + { + usage_exit(argv[0]); + } + if (argc > 3) + { + if (strcmp(argv[3], "all") != 0) + { + run_disallowed_in_batch = 0; + run_simple_batch = 0; + run_multi_batch = 0; + run_batch_abort = 0; + run_timings = 0; + run_singlerowmode = 0; + if (strcmp(argv[3], "disallowed_in_batch") == 0) + run_disallowed_in_batch = 1; + else if (strcmp(argv[3], "simple_batch") == 0) + run_simple_batch = 1; + else if (strcmp(argv[3], "multi_batch") == 0) + run_multi_batch = 1; + else if (strcmp(argv[3], "batch_abort") == 0) + run_batch_abort = 1; + else if (strcmp(argv[3], "timings") == 0) + run_timings = 1; + else if (strcmp(argv[3], "singlerowmode") == 0) + run_singlerowmode = 1; + else + { + fprintf(stderr, "%s is not a recognized test name\n", argv[3]); + usage_exit(argv[0]); + } + } + } + if (argc > 2) + { + errno = 0; + number_of_rows = strtol(argv[2], NULL, 10); + if (errno) + { + fprintf(stderr, "couldn't parse '%s' as an integer or zero rows supplied: %s", argv[2], strerror(errno)); + usage_exit(argv[0]); + } + if (number_of_rows <= 0) + { + fprintf(stderr, "number_of_rows must be positive"); + usage_exit(argv[0]); + } + } + if (argc > 1) + { + conninfo = argv[1]; + } + else + { + conninfo = "dbname = postgres"; + } + + /* Make a connection to the database */ + conn = PQconnectdb(conninfo); + + /* Check to see that the backend connection was successfully made */ + if (PQstatus(conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection to database failed: %s\n", + PQerrorMessage(conn)); + exit_nicely(conn); + } + + if (run_disallowed_in_batch) + test_disallowed_in_batch(conn); + + if (run_simple_batch) + simple_batch(conn); + + if (run_multi_batch) + multi_batch(conn); + + if (run_batch_abort) + test_batch_abort(conn); + + if (run_timings) + test_timings(conn, number_of_rows); + + if (run_singlerowmode) + test_singlerowmode(conn); + /* close the connection to the database and cleanup */ + PQfinish(conn); + + return 0; +} diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm index 90594bd41b..634e48ec85 100644 --- a/src/tools/msvc/Mkvcbuild.pm +++ b/src/tools/msvc/Mkvcbuild.pm @@ -50,7 +50,8 @@ my @contrib_excludes = ( 'pgcrypto', 'sepgsql', 'brin', 'test_extensions', 'test_misc', 'test_pg_dump', - 'snapshot_too_old', 'unsafe_tests'); + 'snapshot_too_old', 'unsafe_tests', + 'test_libpq'); # Set of variables for frontend modules my $frontend_defines = { 'initdb' => 'FRONTEND' }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 03c4e0fe5b..9b75db962b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -215,6 +215,7 @@ BackgroundWorkerHandle BackgroundWorkerSlot Barrier BaseBackupCmd +BatchInsertStep BeginDirectModify_function BeginForeignInsert_function BeginForeignModify_function @@ -1544,6 +1545,7 @@ PG_Locale_Strategy PG_Lock_Status PG_init_t PGcancel +PGcommandQueueEntry PGconn PGdataValue PGlobjfuncs @@ -1656,6 +1658,7 @@ PMSignalReason PMState POLYGON PQArgBlock +PQBatchStatus PQEnvironmentOption PQExpBuffer PQExpBufferData -- 2.20.1