Hi,

The recent discussion about pipelining in the jodbc driver prompted me to look 
at what it would take for libpq.

I have a proof of concept patch working.  The results are even more promising 
than I expected.

While it's true that many applications and frameworks won't easily benefit, it 
amazes me that this hasn't been explored before.  

I developed a simple test application that creates a table with a single auto 
increment primary key column, then runs a 4 simple queries x times each:

        "INSERT INTO test() VALUES ()"
        "SELECT * FROM test LIMIT 1"
        "SELECT * FROM test"
        "DELETE FROM test"

The parameters to testPipelinedSeries are (number of times to execute each 
query, maximum number of queued queries).

Results against local server:

testPipelinedSeries(10,1) took 0.020884
testPipelinedSeries(10,3) took 0.020630, speedup 1.01
testPipelinedSeries(10,10) took 0.006265, speedup 3.33
testPipelinedSeries(100,1) took 0.042731
testPipelinedSeries(100,3) took 0.043035, speedup 0.99
testPipelinedSeries(100,10) took 0.037222, speedup 1.15
testPipelinedSeries(100,25) took 0.031223, speedup 1.37
testPipelinedSeries(100,50) took 0.032482, speedup 1.32
testPipelinedSeries(100,100) took 0.031356, speedup 1.36

Results against remote server through ssh tunnel(30-40ms rtt):

testPipelinedSeries(10,1) took 3.2461736
testPipelinedSeries(10,3) took 1.1008443, speedup 2.44
testPipelinedSeries(10,10) took 0.342399, speedup 7.19
testPipelinedSeries(100,1) took 26.25882588
testPipelinedSeries(100,3) took 8.8509234, speedup 3.04
testPipelinedSeries(100,10) took 3.2866285, speedup 9.03
testPipelinedSeries(100,25) took 2.1472847, speedup 17.57
testPipelinedSeries(100,50) took 1.957510, speedup 27.03
testPipelinedSeries(100,100) took 0.690682, speedup 37.47

I plan to write documentation, add regression testing, and do general cleanup 
before asking for feedback on the patch itself.  Any suggestions about 
performance testing or api design would be nice.  I haven't played with 
changing the sync logic yet, but I'm guessing that an api to allow manual sync 
instead of a sync per PQsendQuery will be needed.  That could make things 
tricky though with multi-statement queries, because currently the only way to 
detect when results change from one query  to the next are a ReadyForQuery 
message.

Matt Newell

/*
 * src/test/examples/testlibpqpipeline.c
 *
 *
 * testlibpqpipeline.c
 *		this test program test query pipelining and it's performance impact
 *
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include "libpq-fe.h"

// If defined we won't issue more sql commands if the socket's
// write buffer is full
//#define MIN_LOCAL_Q

//#define PRINT_QUERY_PROGRESS

static int testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql );
static int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs );


int
testPipelined( PGconn * conn, int totalQueries, int totalQueued, const char * sql )
{
	int nQueriesQueued;
	int nQueriesTotal;
	PGresult * result;
	PGquery * firstQuery;
	PGquery * curQuery;
	
	nQueriesQueued = nQueriesTotal = 0;
	result = NULL;
	firstQuery = curQuery = NULL;
	
	while( nQueriesQueued > 0 || nQueriesTotal < totalQueries ) {
		
		if( PQconsumeInput(conn) == 0 ) {
			printf( "PQconsumeInput ERROR: %s\n", PQerrorMessage(conn) );
			return 1;
		}
		
		do {
			curQuery = PQgetFirstQuery(conn);
			
			/* firstQuery is finished */
			if( firstQuery != curQuery )
			{
				//printf( "%p done, curQuery=%p\n", firstQuery, curQuery );
#ifdef PRINT_QUERY_PROGRESS
				printf("-");
#endif
				firstQuery = curQuery;
				nQueriesQueued--;
			}
			
			/* Break if no queries are ready */
			if( !firstQuery || PQisBusy(conn) )
				break;
			
			if( (result = PQgetResult(conn)) != 0 )
				PQclear(result);
		}
		while(1);
		
		if( nQueriesTotal < totalQueries && nQueriesQueued < totalQueued ) {
#ifdef MIN_LOCAL_Q
			int flushResult = PQflush(conn);
			 if( flushResult == -1 ) {
				printf( "PQflush ERROR: %s\n", PQerrorMessage(conn) );
				return 1;
			} else if ( flushResult == 1 )
				continue;
#endif
			PQsendQuery(conn,sql);
			if( firstQuery == NULL )
				firstQuery = PQgetFirstQuery(conn);
			nQueriesTotal++;
			nQueriesQueued++;
#ifdef PRINT_QUERY_PROGRESS
			printf( "+" );
#endif
		}
	}
#ifdef PRINT_QUERY_PROGRESS
	printf( "\n" );
#endif
	return 0;
}

int testPipelinedSeries( PGconn * conn, int totalQueries, int totalQueued, int baseline_usecs )
{
	int result;
	struct timeval tv1, tv2;
	int secs, usecs;
	
	gettimeofday(&tv1,NULL);
#define TEST_P(q) \
	if( (result = testPipelined(conn,totalQueries,totalQueued,q)) != 0 ) \
		return result;
	TEST_P("INSERT INTO test() VALUES ()");
	TEST_P("SELECT * FROM test LIMIT 1");
	TEST_P("SELECT * FROM test");
	TEST_P("DELETE FROM test");
	gettimeofday(&tv2,NULL);
	secs = tv2.tv_sec - tv1.tv_sec;
	usecs = secs * 1000000 + tv2.tv_usec - tv1.tv_usec;
	printf("testPipelinedSeries(%i,%i) took %i.%06i",totalQueries,totalQueued,secs,usecs);
	if (baseline_usecs == 0)
		printf("\n");
	else
		printf(", speedup %.2f\n", (double)baseline_usecs / usecs );
	return usecs;
}

int
main(int argc, char **argv)
{
	PGconn * conn;
	int baseline;
	
	conn = NULL;
	
	/* make a connection to the database */
	conn = PQsetdb(NULL, NULL, NULL, NULL, NULL);

	/* check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "Connection to database failed: %s",
				PQerrorMessage(conn));
		exit(1);
	}

	PQsetnonblocking(conn,1);
	
	PQexec(conn,"CREATE TABLE test ( id PRIMARY KEY AUTOINCREMENT )");

	baseline = testPipelinedSeries(conn,10,1,0);
	testPipelinedSeries(conn,10,3,baseline);
	testPipelinedSeries(conn,10,10,baseline);
	
	baseline = testPipelinedSeries(conn,100,1,0);
	testPipelinedSeries(conn,100,3,baseline);
	testPipelinedSeries(conn,100,10,baseline);
	testPipelinedSeries(conn,100,25,baseline);
	testPipelinedSeries(conn,100,50,baseline);
	testPipelinedSeries(conn,100,100,baseline);
	
	PQexec(conn,"DROP TABLE test");
	
	return 0;
}
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 93da50d..6bbc6b4 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -165,3 +165,6 @@ lo_lseek64                162
 lo_tell64                 163
 lo_truncate64             164
 PQconninfo                165
+PQgetFirstQuery           166
+PQgetLastQuery            167
+PQgetNextQuery            168
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 3af222b..31fa437 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2893,8 +2893,6 @@ freePGconn(PGconn *conn)
 		free(conn->gsslib);
 #endif
 	/* Note that conn->Pfdebug is not ours to close or free */
-	if (conn->last_query)
-		free(conn->last_query);
 	if (conn->inBuffer)
 		free(conn->inBuffer);
 	if (conn->outBuffer)
@@ -2956,6 +2954,29 @@ closePGconn(PGconn *conn)
 										 * absent */
 	conn->asyncStatus = PGASYNC_IDLE;
 	pqClearAsyncResult(conn);	/* deallocate result */
+	
+	/*
+	 * Link active queries into the free list so we can free them
+	 */
+	if (conn->queryTail)
+	{
+		conn->queryTail->next = conn->queryFree;
+		conn->queryFree = conn->queryHead;
+	}
+	conn->queryHead = conn->queryTail = NULL;
+	
+	/*
+	 * Free all query objects
+	 */
+	while (conn->queryFree)
+	{
+		PGquery * prev = conn->queryFree;
+		conn->queryFree = prev->next;
+		if (prev->querycmd)
+			free(prev->querycmd);
+		free(prev);
+	}
+
 	resetPQExpBuffer(&conn->errorMessage);
 	pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist);
 	conn->addrlist = NULL;
@@ -3135,7 +3156,7 @@ PQresetPoll(PGconn *conn)
 }
 
 /*
- * PQcancelGet: get a PGcancel structure corresponding to a connection.
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
  * A copy is needed to be able to cancel a running query from a different
  * thread. If the same structure is used all structure members would have
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4075e51..48fc278 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1020,7 +1020,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * row; the original conn->result is left unchanged so that it can be used
 	 * again as the template for future rows.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Copy everything that should be in the result at this point */
 		res = PQcopyResult(res,
@@ -1080,7 +1080,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 	 * Success.  In single-row mode, make the result available to the client
 	 * immediately.
 	 */
-	if (conn->singleRowMode)
+	if (conn->queryHead && conn->queryHead->singleRowMode)
 	{
 		/* Change result status to special single-row value */
 		res->resultStatus = PGRES_SINGLE_TUPLE;
@@ -1088,6 +1088,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
+		/* TODO: Still correct ? */
 		conn->asyncStatus = PGASYNC_READY;
 	}
 
@@ -1132,14 +1133,12 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* remember we are using simple query protocol */
-	conn->queryclass = PGQUERY_SIMPLE;
-
+	conn->queryTail->queryclass = PGQUERY_SIMPLE;
+	
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
-
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
+	
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
@@ -1151,7 +1150,9 @@ PQsendQuery(PGconn *conn, const char *query)
 	}
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if( conn->asyncStatus == PGASYNC_IDLE )
+		conn->asyncStatus = PGASYNC_BUSY;
+
 	return 1;
 }
 
@@ -1272,13 +1273,11 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	conn->queryTail->queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	/* if insufficient memory, querycmd just winds up NULL */
+	conn->queryTail->querycmd = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1288,6 +1287,7 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1344,6 +1344,8 @@ PQsendQueryPrepared(PGconn *conn,
 static bool
 PQsendQueryStart(PGconn *conn)
 {
+	PGquery * query;
+	
 	if (!conn)
 		return false;
 
@@ -1357,21 +1359,46 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while in copy mode, either. */
+	switch (conn->asyncStatus)
 	{
-		printfPQExpBuffer(&conn->errorMessage,
+		case PGASYNC_IDLE:
+		case PGASYNC_BUSY:
+		case PGASYNC_READY:
+			break;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
 				  libpq_gettext("another command is already in progress\n"));
-		return false;
+			return false;
 	}
 
-	/* initialize async result-accumulation state */
-	conn->result = NULL;
-	conn->next_result = NULL;
-
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
-
+	if( !conn->queryFree )
+	{
+		query = (PGquery*) malloc(sizeof(PGquery));
+		query->querycmd = 0;
+		query->singleRowMode = false;
+		query->next = 0;
+	}
+	else
+	{
+		query = conn->queryFree;
+		conn->queryFree = query->next;
+		if (query->querycmd)
+			free(query->querycmd);
+		query->querycmd = NULL;
+		query->next = NULL;
+	}
+	
+	if( conn->queryTail )
+		conn->queryTail->next = query;
+	else
+		conn->queryHead = query;
+	
+	conn->queryTail = query;
+	
 	/* ready to send command message */
 	return true;
 }
@@ -1522,16 +1549,12 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	conn->queryTail->queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
-	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	/* if insufficient memory, querycmd just winds up NULL */
 	if (command)
-		conn->last_query = strdup(command);
-	else
-		conn->last_query = NULL;
+		conn->queryTail->querycmd = strdup(command);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -1541,6 +1564,7 @@ PQsendQueryGuts(PGconn *conn,
 		goto sendFailed;
 
 	/* OK, it's launched! */
+	/* TODO: Check status first! */
 	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
@@ -1576,7 +1600,7 @@ pqHandleSendFailure(PGconn *conn)
 }
 
 /*
- * Select row-by-row processing mode
+ * Select row-by-row processing mode for the last launched query
  */
 int
 PQsetSingleRowMode(PGconn *conn)
@@ -1585,18 +1609,16 @@ PQsetSingleRowMode(PGconn *conn)
 	 * Only allow setting the flag when we have launched a query and not yet
 	 * received any results.
 	 */
-	if (!conn)
-		return 0;
-	if (conn->asyncStatus != PGASYNC_BUSY)
+	if (!conn || !conn->queryTail)
 		return 0;
-	if (conn->queryclass != PGQUERY_SIMPLE &&
-		conn->queryclass != PGQUERY_EXTENDED)
+	if (conn->asyncStatus != PGASYNC_BUSY && conn->queryTail == conn->queryHead)
 		return 0;
-	if (conn->result)
+	if (conn->queryTail->queryclass != PGQUERY_SIMPLE &&
+		conn->queryTail->queryclass != PGQUERY_EXTENDED)
 		return 0;
 
 	/* OK, set flag */
-	conn->singleRowMode = true;
+	conn->queryTail->singleRowMode = true;
 	return 1;
 }
 
@@ -1670,6 +1692,40 @@ PQisBusy(PGconn *conn)
 
 
 /*
+ * PQgetFirstQuery
+ */
+PGquery *
+PQgetFirstQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	
+	return conn->queryHead;
+}
+
+/*
+ * PQgetLastQuery
+ */
+PGquery *
+PQgetLastQuery(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+	return conn->queryTail;
+}
+
+/*
+ * PQgetNextQuery
+ */
+PGquery *
+PQgetNextQuery(PGquery *query)
+{
+	if (!query)
+		return 0;
+	return query->next;
+}
+
+/*
  * PQgetResult
  *	  Get the next PGresult produced by a query.  Returns NULL if no
  *	  query work remains or an error has occurred (e.g. out of
@@ -2132,14 +2188,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
-
-	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
-	{
-		free(conn->last_query);
-		conn->last_query = NULL;
-	}
+	conn->queryTail->queryclass = PGQUERY_DESCRIBE;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
@@ -2301,7 +2350,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index c514ca5..c839244 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -55,7 +55,26 @@ static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
 static int build_startup_packet(const PGconn *conn, char *packet,
 					 const PQEnvironmentOption *options);
+static void pqQueryAdvance(PGconn *conn);
 
+void
+pqQueryAdvance(PGconn *conn)
+{
+	PGquery * query;
+	
+	query = conn->queryHead;
+	if (query == NULL)
+		return;
+	
+	/* Advance queryHead */
+	conn->queryHead = query->next;
+	/* Push last query onto free stack */
+	query->next = conn->queryFree;
+	conn->queryFree = query;
+	
+	if (conn->queryHead == NULL)
+		conn->queryTail = NULL;
+}
 
 /*
  * parseInput: if appropriate, parse input data from backend
@@ -68,7 +87,7 @@ pqParseInput3(PGconn *conn)
 	char		id;
 	int			msgLength;
 	int			avail;
-
+	
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
@@ -218,7 +237,15 @@ pqParseInput3(PGconn *conn)
 				case 'Z':		/* backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+
+					pqQueryAdvance(conn);
+					/* initialize async result-accumulation state */
+					conn->result = NULL;
+					conn->next_result = NULL;
+					if( conn->queryHead != NULL )
+						conn->asyncStatus = PGASYNC_BUSY;
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -232,7 +259,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case '1':		/* Parse Complete */
 					/* If we're doing PQprepare, we're done; else ignore */
-					if (conn->queryclass == PGQUERY_PREPARE)
+					if (conn->queryHead->queryclass == PGQUERY_PREPARE)
 					{
 						if (conn->result == NULL)
 						{
@@ -266,7 +293,7 @@ pqParseInput3(PGconn *conn)
 					break;
 				case 'T':		/* Row Description */
 					if (conn->result == NULL ||
-						conn->queryclass == PGQUERY_DESCRIBE)
+						conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						/* First 'T' in a query sequence */
 						if (getRowDescriptions(conn, msgLength))
@@ -299,7 +326,7 @@ pqParseInput3(PGconn *conn)
 					 * instead of TUPLES_OK.  Otherwise we can just ignore
 					 * this message.
 					 */
-					if (conn->queryclass == PGQUERY_DESCRIBE)
+					if (conn->queryHead && conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 					{
 						if (conn->result == NULL)
 						{
@@ -422,6 +449,8 @@ pqParseInput3(PGconn *conn)
 static void
 handleSyncLoss(PGconn *conn, char id, int msgLength)
 {
+	PGquery * query;
+	
 	printfPQExpBuffer(&conn->errorMessage,
 					  libpq_gettext(
 	"lost synchronization with server: got message type \"%c\", length %d\n"),
@@ -430,6 +459,15 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
 	pqSaveErrorResult(conn);
 	conn->asyncStatus = PGASYNC_READY;	/* drop out of GetResult wait loop */
 
+	/* All queries are canceled, move them to the free list and free the query commands */
+	while ((query = conn->queryHead) != 0)
+	{
+		free(query->querycmd);
+		query->querycmd = 0;
+		conn->queryHead = query->next;
+		query->next = conn->queryFree;
+	}
+	
 	pqDropConnection(conn);
 	conn->status = CONNECTION_BAD;		/* No more connection to backend */
 }
@@ -455,7 +493,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * PGresult created by getParamDescriptions, and we should fill data into
 	 * that.  Otherwise, create a new, empty PGresult.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		if (conn->result)
 			result = conn->result;
@@ -562,7 +600,7 @@ getRowDescriptions(PGconn *conn, int msgLength)
 	 * If we're doing a Describe, we're done, and ready to pass the result
 	 * back to the client.
 	 */
-	if (conn->queryclass == PGQUERY_DESCRIBE)
+	if (conn->queryHead->queryclass == PGQUERY_DESCRIBE)
 	{
 		conn->asyncStatus = PGASYNC_READY;
 		return 0;
@@ -865,10 +903,10 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	val = PQresultErrorField(res, PG_DIAG_STATEMENT_POSITION);
 	if (val)
 	{
-		if (conn->verbosity != PQERRORS_TERSE && conn->last_query != NULL)
+		if (conn->verbosity != PQERRORS_TERSE && conn->queryHead->querycmd != NULL)
 		{
 			/* emit position as a syntax cursor display */
-			querytext = conn->last_query;
+			querytext = conn->queryHead->querycmd;
 			querypos = atoi(val);
 		}
 		else
@@ -1696,7 +1734,7 @@ pqEndcopy3(PGconn *conn)
 		 * If we sent the COPY command in extended-query mode, we must issue a
 		 * Sync as well.
 		 */
-		if (conn->queryclass != PGQUERY_SIMPLE)
+		if (conn->queryHead->queryclass != PGQUERY_SIMPLE)
 		{
 			if (pqPutMsgStart('S', false, conn) < 0 ||
 				pqPutMsgEnd(conn) < 0)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b81dc16..750a8b4 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -141,6 +141,13 @@ typedef struct pg_result PGresult;
  */
 typedef struct pg_cancel PGcancel;
 
+/* PGquery encapsulates the progress of a single query command issued
+ * to the async api functions
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_query PGquery;
+
+
 /* PGnotify represents the occurrence of a NOTIFY message.
  * Ideally this would be an opaque typedef, but it's so simple that it's
  * unlikely to change.
@@ -404,6 +411,10 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+extern PGquery *PQgetFirstQuery(PGconn *conn);
+extern PGquery *PQgetLastQuery(PGconn *conn);
+extern PGquery *PQgetNextQuery(PGquery *query);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4ef46ff..fb9bd61 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -291,6 +291,16 @@ typedef struct pgDataValue
 	const char *value;			/* data value, without zero-termination */
 } PGdataValue;
 
+typedef struct pg_query
+{
+	PGQueryClass queryclass;
+	char	   *querycmd;		/* last SQL command, or NULL if unknown */
+	bool		singleRowMode;	/* return query result row-by-row? */
+	struct pg_query * next;
+	void	   *userptr;        /* convenience for the user */
+} PGquery;
+
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -350,13 +360,19 @@ struct pg_conn
 	ConnStatusType status;
 	PGAsyncStatusType asyncStatus;
 	PGTransactionStatusType xactStatus; /* never changes to ACTIVE */
-	PGQueryClass queryclass;
-	char	   *last_query;		/* last SQL command, or NULL if unknown */
+	
+	/* queryHead and queryTail form a FIFO representing queries sent
+	 * to the backend.  queryHead is the first query sent, and is the
+	 * query we are receiving results from, or have received results from */
+	PGquery *queryHead;
+	PGquery *queryTail;
+	PGquery *queryFree; /* Reuse PGQuery allocations */
+	int nQueries;
+	
 	char		last_sqlstate[6];		/* last reported SQLSTATE */
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
-	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;		/* # bytes already returned in COPY
 										 * OUT */
diff --git a/src/test/examples/Makefile b/src/test/examples/Makefile
index aee5c04..5d3f317 100644
--- a/src/test/examples/Makefile
+++ b/src/test/examples/Makefile
@@ -14,7 +14,7 @@ override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 override LDLIBS := $(libpq_pgport) $(LDLIBS)
 
 
-PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64
+PROGS = testlibpq testlibpq2 testlibpq3 testlibpq4 testlo testlo64 testlibpqpipeline
 
 all: $(PROGS)
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to