Hello,

On 05/07/2023 21:45, Daniel Gustafsson wrote:
> Please rebase and send an updated version.

Here it is (including the warning fix).

Thanks,
Anton Kirilov
From 3c2e064a151568830e4d9e4bf238739b458350b4 Mon Sep 17 00:00:00 2001
From: Anton Kirilov <antonvkiri...@gmail.com>
Date: Wed, 22 Mar 2023 20:39:57 +0000
Subject: [PATCH v4] Add PQpipelinePutSync() to libpq

This new function is equivalent to PQpipelineSync(),
except that it lets the caller choose whether anything
is flushed to the server. Its purpose is to reduce the
system call overhead of pipeline mode.
---
 doc/src/sgml/libpq.sgml                       | 52 ++++++++++++++++---
 src/interfaces/libpq/exports.txt              |  1 +
 src/interfaces/libpq/fe-exec.c                | 30 ++++++++---
 src/interfaces/libpq/libpq-fe.h               |  6 +++
 .../modules/libpq_pipeline/libpq_pipeline.c   | 37 +++++++++++++
 .../traces/multi_pipelines.trace              | 11 ++++
 6 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index b6f5aba1b1..0bc88b1569 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3547,8 +3547,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
           <listitem>
            <para>
             The <structname>PGresult</structname> represents a
-            synchronization point in pipeline mode, requested by
-            <xref linkend="libpq-PQpipelineSync"/>.
+            synchronization point in pipeline mode, requested by either
+            <xref linkend="libpq-PQpipelineSync"/> or
+            <xref linkend="libpq-PQpipelinePutSync"/>.
             This status occurs only when pipeline mode has been selected.
            </para>
           </listitem>
@@ -5122,7 +5123,8 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendDescribePrepared"/>,
        <xref linkend="libpq-PQsendDescribePortal"/>,
        <xref linkend="libpq-PQsendClosePrepared"/>,
-       <xref linkend="libpq-PQsendClosePortal"/>, or
+       <xref linkend="libpq-PQsendClosePortal"/>,
+       <xref linkend="libpq-PQpipelinePutSync"/>, or
        <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5490,7 +5492,8 @@ int PQflush(PGconn *conn);
      or its prepared-query sibling
      <xref linkend="libpq-PQsendQueryPrepared"/>.
      These requests are queued on the client-side until flushed to the server;
-     this occurs when <xref linkend="libpq-PQpipelineSync"/> is used to
+     this occurs when <xref linkend="libpq-PQpipelineSync"/> (or optionally
+     <xref linkend="libpq-PQpipelinePutSync"/>) is used to
      establish a synchronization point in the pipeline,
      or when <xref linkend="libpq-PQflush"/> is called.
      The functions <xref linkend="libpq-PQsendPrepare"/>,
@@ -5506,8 +5509,9 @@ int PQflush(PGconn *conn);
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
      Note that results are buffered on the server side; the server flushes
-     that buffer when a synchronization point is established with
-     <function>PQpipelineSync</function>, or when
+     that buffer when a synchronization point is established with either
+     <function>PQpipelineSync</function> or
+     <function>PQpipelinePutSync</function>, or when
      <function>PQsendFlushRequest</function> is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
@@ -5564,7 +5568,8 @@ int PQflush(PGconn *conn);
      <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
      and <literal>PGRES_PIPELINE_ABORTED</literal>.
      <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
-     <function>PQpipelineSync</function> at the corresponding point
+     <function>PQpipelineSync</function> or
+     <function>PQpipelinePutSync</function> at the corresponding point
      in the pipeline.
      <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5602,7 +5607,8 @@ int PQflush(PGconn *conn);
      <function>PQresultStatus</function> will report a
      <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
      operation in an aborted pipeline. The result for
-     <function>PQpipelineSync</function> is reported as
+     <function>PQpipelineSync</function> or
+     <function>PQpipelinePutSync</function> is reported as
      <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
      and resumption of normal result processing.
     </para>
@@ -5834,6 +5840,36 @@ int PQsendFlushRequest(PGconn *conn);
        </para>
       </listitem>
      </varlistentry>
+
+    <varlistentry id="libpq-PQpipelinePutSync">
+     <term><function>PQpipelinePutSync</function><indexterm><primary>PQpipelinePutSync</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>.
+       This serves as the delimiter of an implicit transaction and
+       an error recovery point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQpipelinePutSync(PGconn *conn, int flags);
+</synopsis>
+      </para>
+      <para>
+       Returns 0 for success. Returns -1 if the connection is not in
+       pipeline mode, <parameter>flags</parameter> is invalid, or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed. Returns 1 if it was unable to send all the data in
+       the send queue yet (this case can only occur if the connection
+       is nonblocking and output data is flushed to the server).
+       The <parameter>flags</parameter> argument is a bitwise OR of
+       option flags. <literal>PG_PIPELINEPUTSYNC_FLUSH</literal>
+       specifies that any queued output data is flushed to the
+       server. Otherwise, nothing is flushed automatically;
+       use <function>PQflush</function> if necessary.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
   </sect2>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 850734ac96..18e7b31539 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -191,3 +191,4 @@ PQclosePrepared           188
 PQclosePortal             189
 PQsendClosePrepared       190
 PQsendClosePortal         191
+PQpipelinePutSync         192
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 01f8efabbe..823bd74267 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -3225,16 +3225,28 @@ pqPipelineProcessQueue(PGconn *conn)
  */
 int
 PQpipelineSync(PGconn *conn)
+{
+	return PQpipelinePutSync(conn, PG_PIPELINEPUTSYNC_FLUSH) >= 0;
+}
+
+/*
+ * PQpipelinePutSync
+ *		Send a Sync message as part of a pipeline,
+ *		and optionally flush to server
+ */
+int
+PQpipelinePutSync(PGconn *conn, int flags)
 {
 	PGcmdQueueEntry *entry;
+	int ret;
 
-	if (!conn)
-		return 0;
+	if (!conn || flags & ~PG_PIPELINEPUTSYNC_FLUSH)
+		return -1;
 
 	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
 	{
 		libpq_append_conn_error(conn, "cannot send pipeline when not in pipeline mode");
-		return 0;
+		return -1;
 	}
 
 	switch (conn->asyncStatus)
@@ -3245,7 +3257,7 @@ PQpipelineSync(PGconn *conn)
 			/* should be unreachable */
 			appendPQExpBufferStr(&conn->errorMessage,
 								 "internal error: cannot send pipeline while in COPY\n");
-			return 0;
+			return -1;
 		case PGASYNC_READY:
 		case PGASYNC_READY_MORE:
 		case PGASYNC_BUSY:
@@ -3257,7 +3269,7 @@ PQpipelineSync(PGconn *conn)
 
 	entry = pqAllocCmdQueueEntry(conn);
 	if (entry == NULL)
-		return 0;				/* error msg already set */
+		return -1;				/* error msg already set */
 
 	entry->queryclass = PGQUERY_SYNC;
 	entry->query = NULL;
@@ -3271,18 +3283,20 @@ PQpipelineSync(PGconn *conn)
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (PQflush(conn) < 0)
+	ret = flags & PG_PIPELINEPUTSYNC_FLUSH ? PQflush(conn) : 0;
+
+	if (ret < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
 	pqAppendCmdQueueEntry(conn, entry);
 
-	return 1;
+	return ret;
 
 sendFailed:
 	pqRecycleCmdQueueEntry(conn, entry);
 	/* error message should be set up already */
-	return 0;
+	return -1;
 }
 
 /*
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 97762d56f5..eaa09696bb 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -47,6 +47,11 @@ extern "C"
 #define PG_COPYRES_EVENTS		  0x04
 #define PG_COPYRES_NOTICEHOOKS	  0x08
 
+/*
+ * Option flags for PQpipelinePutSync
+ */
+#define PG_PIPELINEPUTSYNC_FLUSH  1
+
 /* Application-visible enum types */
 
 /*
@@ -474,6 +479,7 @@ extern int	PQenterPipelineMode(PGconn *conn);
 extern int	PQexitPipelineMode(PGconn *conn);
 extern int	PQpipelineSync(PGconn *conn);
 extern int	PQsendFlushRequest(PGconn *conn);
+extern int	PQpipelinePutSync(PGconn *conn, int flags);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 9907bc8600..bf2d333ad0 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -169,6 +169,14 @@ test_multi_pipelines(PGconn *conn)
 	if (PQpipelineSync(conn) != 1)
 		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
 
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	/* Skip flushing once. */
+	if (PQpipelinePutSync(conn, 0) < 0)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
 						  dummy_params, NULL, NULL, 0) != 1)
 		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
@@ -206,6 +214,35 @@ test_multi_pipelines(PGconn *conn)
 
 	/* second pipeline */
 
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* third pipeline */
+
 	res = PQgetResult(conn);
 	if (res == NULL)
 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca4..1ee21f61dc 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F	19	Bind	 "" "" 0 1 1 '1' 1 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
 F	4	Sync
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
 B	4	ParseComplete
 B	4	BindComplete
 B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
-- 
2.34.1

Reply via email to