Hello,

Thanks for the feedback!

On 07/11/2023 09:23, Jelte Fennema-Nio wrote:
> But I think it's looking at the situation from the wrong direction. [...] we should look at it as an addition to our current list of PQsend functions for a new packet type. And none of those PQsend functions ever needed a flag. Which makes sense, because they are the lowest level building blocks that make sense from a user perspective: They send a message type over the socket and don't do anything else.

Yes, that is quite close to my thinking when I wrote the original version of the patch. Also, the protocol specification defines the Sync message as having no parameters.

Since nobody else who has chimed in on this thread has commented, I will assume that there is consensus (we are doing a U-turn on the implementation approach, after all), so here is the updated version of the patch.
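
To illustrate the system call saving the patch is after, here is a minimal toy model, not libpq code. Every name in it (ToyConn, toy_queue, toy_flush, toy_send_sync, toy_sync, toy_run) is hypothetical, the 65536-byte threshold is an assumption, and the message sizes are approximations read off the test trace plus one type byte per message. It only counts how many flushes happen when each pipeline ends with a flushing sync versus a non-flushing one followed by a single explicit flush.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model only: none of these names are libpq APIs. */
enum { TOY_FLUSH_THRESHOLD = 65536 };	/* assumed size threshold */

typedef struct ToyConn
{
	size_t		pending;	/* bytes queued in the send buffer */
	int			flushes;	/* simulated write system calls */
} ToyConn;

/* Stand-in for PQflush(): one simulated system call if data is queued. */
static void
toy_flush(ToyConn *c)
{
	if (c->pending > 0)
	{
		c->flushes++;
		c->pending = 0;
	}
}

/* Queue a message; push only once the buffer reaches the threshold. */
static void
toy_queue(ToyConn *c, size_t len)
{
	c->pending += len;
	if (c->pending >= TOY_FLUSH_THRESHOLD)
		toy_flush(c);
}

/* Stand-in for PQsendPipelineSync(): queue a Sync, no forced flush. */
static void
toy_send_sync(ToyConn *c)
{
	toy_queue(c, 5);
}

/* Stand-in for PQpipelineSync(): queue a Sync, then always flush. */
static void
toy_sync(ToyConn *c)
{
	toy_send_sync(c);
	toy_flush(c);
}

/* Run n small pipelines and return the number of simulated flushes. */
static int
toy_run(int n, int flush_each)
{
	ToyConn		c = {0, 0};

	for (int i = 0; i < n; i++)
	{
		toy_queue(&c, 59);	/* roughly Parse + Bind + Describe + Execute */
		if (flush_each)
			toy_sync(&c);
		else
			toy_send_sync(&c);
	}
	toy_flush(&c);			/* the caller's explicit final flush */
	assert(c.pending == 0);
	return c.flushes;
}
```

In this model, toy_run(3, 1) counts one flush per pipeline, while toy_run(3, 0) counts a single flush for all three. Real savings depend on buffer sizes and on how the application schedules its PQflush() calls.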

Best wishes,
Anton Kirilov
From b752269b2763f8d66bcfc79faf751e52226c344b Mon Sep 17 00:00:00 2001
From: Anton Kirilov <antonvkiri...@gmail.com>
Date: Wed, 22 Mar 2023 20:39:57 +0000
Subject: [PATCH v5] Add PQsendPipelineSync() to libpq

This new function is equivalent to PQpipelineSync(), except
that it does not flush anything to the server; the user must
subsequently call PQflush() instead. Its purpose is to reduce
the system call overhead of pipeline mode.
---
 doc/src/sgml/libpq.sgml                       | 45 ++++++++++++++++---
 src/interfaces/libpq/exports.txt              |  1 +
 src/interfaces/libpq/fe-exec.c                | 17 +++++--
 src/interfaces/libpq/libpq-fe.h               |  1 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 37 +++++++++++++++
 .../traces/multi_pipelines.trace              | 11 +++++
 6 files changed, 102 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 64b2910fee..61bee82a54 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3547,8 +3547,9 @@ ExecStatusType PQresultStatus(const PGresult *res);
           <listitem>
            <para>
             The <structname>PGresult</structname> represents a
-            synchronization point in pipeline mode, requested by
-            <xref linkend="libpq-PQpipelineSync"/>.
+            synchronization point in pipeline mode, requested by either
+            <xref linkend="libpq-PQpipelineSync"/> or
+            <xref linkend="libpq-PQsendPipelineSync"/>.
             This status occurs only when pipeline mode has been selected.
            </para>
           </listitem>
@@ -5122,7 +5123,8 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
        <xref linkend="libpq-PQsendDescribePrepared"/>,
        <xref linkend="libpq-PQsendDescribePortal"/>,
        <xref linkend="libpq-PQsendClosePrepared"/>,
-       <xref linkend="libpq-PQsendClosePortal"/>, or
+       <xref linkend="libpq-PQsendClosePortal"/>,
+       <xref linkend="libpq-PQsendPipelineSync"/>, or
        <xref linkend="libpq-PQpipelineSync"/>
        call, and returns it.
        A null pointer is returned when the command is complete and there
@@ -5506,8 +5508,9 @@ int PQflush(PGconn *conn);
      client sends them.  The server will begin executing the commands in the
      pipeline immediately, not waiting for the end of the pipeline.
      Note that results are buffered on the server side; the server flushes
-     that buffer when a synchronization point is established with
-     <function>PQpipelineSync</function>, or when
+     that buffer when a synchronization point is established with either
+     <function>PQpipelineSync</function> or
+     <function>PQsendPipelineSync</function>, or when
      <function>PQsendFlushRequest</function> is called.
      If any statement encounters an error, the server aborts the current
      transaction and does not execute any subsequent command in the queue
@@ -5564,7 +5567,8 @@ int PQflush(PGconn *conn);
      <type>PGresult</type> types <literal>PGRES_PIPELINE_SYNC</literal>
      and <literal>PGRES_PIPELINE_ABORTED</literal>.
      <literal>PGRES_PIPELINE_SYNC</literal> is reported exactly once for each
-     <function>PQpipelineSync</function> at the corresponding point
+     <function>PQpipelineSync</function> or
+     <function>PQsendPipelineSync</function> at the corresponding point
      in the pipeline.
      <literal>PGRES_PIPELINE_ABORTED</literal> is emitted in place of a normal
      query result for the first error and all subsequent results
@@ -5602,7 +5606,8 @@ int PQflush(PGconn *conn);
      <function>PQresultStatus</function> will report a
      <literal>PGRES_PIPELINE_ABORTED</literal> result for each remaining queued
      operation in an aborted pipeline. The result for
-     <function>PQpipelineSync</function> is reported as
+     <function>PQpipelineSync</function> or
+     <function>PQsendPipelineSync</function> is reported as
      <literal>PGRES_PIPELINE_SYNC</literal> to signal the end of the aborted pipeline
      and resumption of normal result processing.
     </para>
@@ -5834,6 +5839,32 @@ int PQsendFlushRequest(PGconn *conn);
        </para>
       </listitem>
      </varlistentry>
+
+    <varlistentry id="libpq-PQsendPipelineSync">
+     <term><function>PQsendPipelineSync</function><indexterm><primary>PQsendPipelineSync</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Marks a synchronization point in a pipeline by sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       without flushing the send buffer. This serves as
+       the delimiter of an implicit transaction and an error recovery
+       point; see <xref linkend="libpq-pipeline-errors"/>.
+
+<synopsis>
+int PQsendPipelineSync(PGconn *conn);
+</synopsis>
+      </para>
+      <para>
+       Returns 1 for success. Returns 0 if the connection is not in
+       pipeline mode or sending a
+       <link linkend="protocol-flow-ext-query">sync message</link>
+       failed.
+       Note that the message is not itself flushed to the server automatically;
+       use <function>PQflush</function> if necessary.
+      </para>
+     </listitem>
+    </varlistentry>
    </variablelist>
   </sect2>
 
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 850734ac96..f5aa641001 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -191,3 +191,4 @@ PQclosePrepared           188
 PQclosePortal             189
 PQsendClosePrepared       190
 PQsendClosePortal         191
+PQsendPipelineSync        192
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 04610ccf5e..c0dcb04282 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -3224,6 +3224,16 @@ pqPipelineProcessQueue(PGconn *conn)
  */
 int
 PQpipelineSync(PGconn *conn)
+{
+	return PQsendPipelineSync(conn) && pqFlush(conn) >= 0;
+}
+
+/*
+ * PQsendPipelineSync
+ *		Send a Sync message as part of a pipeline without flushing to server
+ */
+int
+PQsendPipelineSync(PGconn *conn)
 {
 	PGcmdQueueEntry *entry;
 
@@ -3267,10 +3277,11 @@ PQpipelineSync(PGconn *conn)
 		goto sendFailed;
 
 	/*
-	 * Give the data a push.  In nonblock mode, don't complain if we're unable
-	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 * Give the data a push if we're past the size threshold. In nonblock
+	 * mode, don't complain if we're unable to send it all; the caller is
+	 * expected to execute PQflush() at some point anyway.
 	 */
-	if (PQflush(conn) < 0)
+	if (pqPipelineFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 97762d56f5..b0d89a592d 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -474,6 +474,7 @@ extern int	PQenterPipelineMode(PGconn *conn);
 extern int	PQexitPipelineMode(PGconn *conn);
 extern int	PQpipelineSync(PGconn *conn);
 extern int	PQsendFlushRequest(PGconn *conn);
+extern int	PQsendPipelineSync(PGconn *conn);
 
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 3c009ee153..23fb80986b 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -169,6 +169,14 @@ test_multi_pipelines(PGconn *conn)
 	if (PQpipelineSync(conn) != 1)
 		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
 
+	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+						  dummy_params, NULL, NULL, 0) != 1)
+		pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+	/* Skip flushing once. */
+	if (PQsendPipelineSync(conn) != 1)
+		pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
 	if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
 						  dummy_params, NULL, NULL, 0) != 1)
 		pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
@@ -206,6 +214,35 @@ test_multi_pipelines(PGconn *conn)
 
 	/* second pipeline */
 
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+	PQclear(res);
+	res = NULL;
+
+	if (PQgetResult(conn) != NULL)
+		pg_fatal("PQgetResult returned something extra after first result");
+
+	if (PQexitPipelineMode(conn) != 0)
+		pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when sync result expected: %s",
+				 PQerrorMessage(conn));
+
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	PQclear(res);
+
+	/* third pipeline */
+
 	res = PQgetResult(conn);
 	if (res == NULL)
 		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
index 4b9ab07ca4..1ee21f61dc 100644
--- a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -8,6 +8,17 @@ F	19	Bind	 "" "" 0 1 1 '1' 1 0
 F	6	Describe	 P ""
 F	9	Execute	 "" 0
 F	4	Sync
+F	21	Parse	 "" "SELECT $1" 1 NNNN
+F	19	Bind	 "" "" 0 1 1 '1' 1 0
+F	6	Describe	 P ""
+F	9	Execute	 "" 0
+F	4	Sync
+B	4	ParseComplete
+B	4	BindComplete
+B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
+B	11	DataRow	 1 1 '1'
+B	13	CommandComplete	 "SELECT 1"
+B	5	ReadyForQuery	 I
 B	4	ParseComplete
 B	4	BindComplete
 B	33	RowDescription	 1 "?column?" NNNN 0 NNNN 4 -1 0
-- 
2.34.1
