On 2021-Jun-24, Boris Kolpackov wrote:

> Boris Kolpackov <bo...@codesynthesis.com> writes:
> 
> > What's strange here is that the first PQgetResult() call (marked with ???)
> > returns NULL instead of result for INSERT #1 as in the first call sequence.
> 
> I've hit another similar case except now an unexpected NULL result is
> returned in the middle of PGRES_PIPELINE_ABORTED result sequence. The
> call sequence is as follows:

I haven't been able to get this to break for me yet, and I probably
won't today.  In the meantime, here's patches for the first one.  The
test added by 0003 fails, and then 0004 fixes it.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
>From 40023992dd73edaf7947796e0e4e36065fcd908c Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 23 Jun 2021 12:40:15 -0400
Subject: [PATCH v2 1/4] Clarify that pipeline sync is mandatory

---
 doc/src/sgml/libpq.sgml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 641970f2a6..7f8a4db089 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5102,10 +5102,12 @@ int PQflush(PGconn *conn);
      The server executes statements, and returns results, in the order the
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
+     Do note that results are buffered on the server side; a synchronization
+     point, established with <function>PQpipelineSync</function>, is necessary
+     in order for all results to be flushed to the client.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
-     until the next synchronization point established by
-     <function>PQpipelineSync</function>;
+     until the next synchronization point;
      a <literal>PGRES_PIPELINE_ABORTED</literal> result is produced for
      each such command.
      (This remains true even if the commands in the pipeline would rollback
-- 
2.30.2

>From 071757645ee0f9f15f57e43447d7c234deb062c0 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 25 Jun 2021 16:02:00 -0400
Subject: [PATCH v2 2/4] Add PQrequestFlush()

---
 doc/src/sgml/libpq.sgml          | 17 +++++++++++++++
 src/interfaces/libpq/exports.txt |  1 +
 src/interfaces/libpq/fe-exec.c   | 37 ++++++++++++++++++++++++++++++++
 src/interfaces/libpq/libpq-fe.h  |  1 +
 4 files changed, 56 insertions(+)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 7f8a4db089..4d203f51e1 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -5401,6 +5401,23 @@ int PQpipelineSync(PGconn *conn);
       </para>
      </listitem>
     </varlistentry>
+
+    <varlistentry id="libpq-PQrequestFlush">
+     <term><function>PQrequestFlush</function><indexterm><primary>PQrequestFlush</primary></indexterm></term>
+
+      <listitem>
+       <para>
+        Requests the server to flush its output buffer.  The server does
+        that automatically upon <function>PQpipelineSync</function> or
+        any request when not in pipeline mode; this function is useful
+        if results are expected without establishing a synchronization
+        point.  Returns 1 for success and 0 on failure.
+<synopsis>
+int PQrequestFlush(PGconn *conn);
+</synopsis>
+       </para>
+      </listitem>
+     </varlistentry>
    </variablelist>
   </sect2>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 824a03ffbd..c616059f58 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -185,3 +185,4 @@ PQpipelineSync            182
 PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded              185
+PQrequestFlush            186
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 7bd5b3a7b9..00d744eaa7 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -3696,6 +3696,43 @@ PQflush(PGconn *conn)
 	return pqFlush(conn);
 }
 
+/*
+ * Send request for server to flush its buffer
+ */
+int
+PQrequestFlush(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	/* Don't try to send if we know there's no live connection. */
+	if (conn->status != CONNECTION_OK)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("no connection to the server\n"));
+		return 0;
+	}
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE &&
+		conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		appendPQExpBufferStr(&conn->errorMessage,
+							 libpq_gettext("another command is already in progress\n"));
+		return false;
+	}
+
+	if (pqPutMsgStart('H', conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+	{
+		return 0;
+	}
+	/* XXX useless without a flush ...? */
+	pqFlush(conn);
+
+	return 1;
+}
+
 /*
  * pqPipelineFlush
  *
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index cc6032b15b..cf7cbe942e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -496,6 +496,7 @@ extern PGPing PQpingParams(const char *const *keywords,
 
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
+extern int	PQrequestFlush(PGconn *conn);
 
 /*
  * "Fast path" interface --- not really recommended for application
-- 
2.30.2

>From 035781482d139face53c81f70ca4591b5292c4b4 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 25 Jun 2021 13:26:43 -0400
Subject: [PATCH v2 3/4] test nosync

---
 .../modules/libpq_pipeline/libpq_pipeline.c   | 92 +++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71eedb6dbb..00d6b9f401 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -230,6 +230,95 @@ test_multi_pipelines(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+	int		numqueries = 0;
+	int		results = 0;
+	int		sock = PQsocket(conn);
+
+	fprintf(stderr, "nosync... ");
+
+	if (sock < 0)
+		pg_fatal("invalid socket");
+
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("could not enter pipeline mode");
+	for (;;)
+	{
+		fd_set		input_mask;
+		struct timeval	tv;
+
+		if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+							  0, NULL, NULL, NULL, NULL, 0) != 1)
+			pg_fatal("error sending select: %s", PQerrorMessage(conn));
+		PQflush(conn);
+
+		numqueries++;
+
+		/*
+		 * If the server has written anything to us, read (some of) it now.
+		 */
+		FD_ZERO(&input_mask);
+		FD_SET(sock, &input_mask);
+		tv.tv_sec = 0;
+		tv.tv_usec = 0;
+		if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+		{
+			fprintf(stderr, "select() failed: %s\n", strerror(errno));
+			exit_nicely(conn);
+		}
+		if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+			pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+
+		if (numqueries >= 500)
+			break;
+	}
+
+	PQrequestFlush(conn);
+
+	/* Now read all results */
+	for (;;)
+	{
+		PGresult *res;
+
+		res = PQgetResult(conn);
+
+		/* NULL results are only expected after a TUPLES_OK */
+		if (res == NULL)
+			pg_fatal("got unexpected NULL result after %d results", results);
+
+		/* We expect exacly one TUPLES_OK result for each query we sent */
+		if (PQresultStatus(res) == PGRES_TUPLES_OK)
+		{
+			PGresult *res2;
+
+			/* and one NULL result should follow each */
+			res2 = PQgetResult(conn);
+			if (res2 != NULL)
+				pg_fatal("expected NULL, got %s",
+						 PQresStatus(PQresultStatus(res2)));
+			PQclear(res);
+			results++;
+
+			/* if we're done, we're done */
+			if (results == numqueries)
+				break;
+
+			continue;
+		}
+
+		/* anything else is unexpected */
+		pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+	}
+
+	fprintf(stderr, "ok\n");
+}
+
 /*
  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
  * still have to get results for each pipeline item, but the item will just be
@@ -1237,6 +1326,7 @@ print_test_list(void)
 {
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
+	printf("nosync\n");
 	printf("pipeline_abort\n");
 	printf("pipelined_insert\n");
 	printf("prepared\n");
@@ -1334,6 +1424,8 @@ main(int argc, char **argv)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
+	else if (strcmp(testname, "nosync") == 0)
+		test_nosync(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
 	else if (strcmp(testname, "pipelined_insert") == 0)
-- 
2.30.2

>From 4e26c887512609935aa3251cc6b31c234507c855 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 24 Jun 2021 18:24:04 -0400
Subject: [PATCH v2 4/4] Fix libpq state machine

---
 src/interfaces/libpq/fe-exec.c | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 00d744eaa7..cb5734c3cd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1375,8 +1375,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -1513,8 +1512,7 @@ PQsendPrepare(PGconn *conn,
 
 	pqAppendCmdQueueEntry(conn, entry);
 
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	/*
 	 * Give the data a push (in pipeline mode, only if we're past the size
@@ -1817,8 +1815,7 @@ PQsendQueryGuts(PGconn *conn,
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -2448,8 +2445,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
-	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
-		conn->asyncStatus = PGASYNC_BUSY;
+	conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
@@ -3084,12 +3080,7 @@ PQpipelineSync(PGconn *conn)
 	 */
 	if (PQflush(conn) < 0)
 		goto sendFailed;
-
-	/*
-	 * Call pqPipelineProcessQueue so the user can call start calling
-	 * PQgetResult.
-	 */
-	pqPipelineProcessQueue(conn);
+	conn->asyncStatus = PGASYNC_BUSY;
 
 	return 1;
 
-- 
2.30.2

Reply via email to