From 1f7366853599622f6c34a36fe24f2aa9ac39058c Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Fri, 5 Jan 2024 15:29:41 +0100
Subject: [PATCH v4 7/7] Add protcol message to change protocol extension
 parameters

Originally it was only possible to set protocol extension paramters,
after that their values would stay the same until the end of the
session. This extends the PostgreSQL protocol with a new message type
that can be used to change protocol extension parameters on the fly: the
ParameterSet message.

An important use case for this is connection poolers. Without this
change connection poolers are unable to share server connections across
clients with different values for their protocol extension parameters.
Since it would be impossible to change the protocol parameter to the
value that the client expects, before assigning the server connection to
the client.
---
 doc/src/sgml/config.sgml                      |   6 +-
 doc/src/sgml/libpq.sgml                       |  52 ++++
 doc/src/sgml/protocol.sgml                    |  98 +++++++
 src/backend/tcop/postgres.c                   |  26 ++
 src/backend/utils/misc/guc.c                  |  14 +-
 src/backend/utils/misc/guc_tables.c           |   1 +
 src/include/libpq/protocol.h                  |   2 +
 src/include/utils/guc.h                       |   1 +
 src/interfaces/libpq/exports.txt              |   2 +
 src/interfaces/libpq/fe-exec.c                |  76 ++++++
 src/interfaces/libpq/fe-protocol3.c           |  22 ++
 src/interfaces/libpq/fe-trace.c               |  21 ++
 src/interfaces/libpq/libpq-fe.h               |   3 +
 src/interfaces/libpq/libpq-int.h              |   3 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 254 ++++++++++++++++++
 .../libpq_pipeline/t/001_libpq_pipeline.pl    |   2 +-
 16 files changed, 577 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 8308cfdba2c..f22cfbff4db 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -10984,7 +10984,8 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
      protocol extension parameters. These types of parameters are different
      from all other runtime parameters in a few important ways. First, they all
      start with a <literal>_pq_.</literal> prefix. Secondly, they can only be
-     set at the protocol level using the StartupMessage.
+     set at the protocol level using the StartupMessage and ParamaterSet
+     messages.
      It's not possible to set them in any other way (not through command line
      arguments, configuration files, SET, etc). Finally, if you configure one
      of them in the StartupMessage, but the server doesn't support the
@@ -11009,7 +11010,8 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
        <para>
         This parameter can be used to "upgrade" regular runtime parameters to a
         protocol extension parameter. Thus disallowing it to be set in any
-        other way than through the StartupMessage.
+        other way than through the StartupMessage and ParameterSet protocol
+        messages.
         This can be useful to limit the power of an attacker with arbitrary SQL
         execution. For example, if you set
         <literal>_pq_.protocol_managed_params</literal> to
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 46f97bd69a3..dc78d98f03e 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3446,6 +3446,37 @@ PGresult *PQclosePortal(PGconn *conn, const char *portalName);
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry id="libpq-PQparameterSet">
+      <term><function>PQparameterSet</function><indexterm><primary>PQparameterSet</primary></indexterm></term>
+
+      <listitem>
+       <para>
+        Submits a request to change a protocol parameter, and waits for completion.
+<synopsis>
+PGresult *PQparameterSet(PGconn *conn, const char *parameter, const char *value);
+</synopsis>
+       </para>
+
+       <para>
+        <xref linkend="libpq-PQparameterSet"/> allows a client to change
+        protocol extension parameters on the current connection. Protocol
+        parameters start with <literal>_pq_.</literal>, or should be listed in
+        <xref linkend="guc-pq-protocol-managed-params"/>. Making changes to
+        these parameters using this function is non transactional, and this
+        function will return an error if it's called while a transaction is
+        active (or aborted).
+       </para>
+
+       <para>
+        <parameter>parameter</parameter> is the name of the parameter to
+        change, and <parameter>value</parameter> is the value to which to
+        change it. On success, a <structname>PGresult</structname> with status
+        <literal>PGRES_COMMAND_OK</literal> is returned.
+       </para>
+      </listitem>
+     </varlistentry>
+
     </variablelist>
    </para>
 
@@ -5138,6 +5169,27 @@ int PQsendClosePortal(PGconn *conn, const char *portalName);
      </listitem>
     </varlistentry>
 
+    <varlistentry id="libpq-PQsendParameterSet">
+     <term><function>PQsendParameterSet</function><indexterm><primary>PQsendParameterSet</primary></indexterm></term>
+
+     <listitem>
+      <para>
+        Submits a request to change a protocol parameter, without waiting for
+        completion.
+<synopsis>
+int PQsendParameterSet(PGconn *conn, const char *portalName);
+</synopsis>
+
+       This is an asynchronous version of <xref linkend="libpq-PQparameterSet"/>:
+       it returns 1 if it was able to dispatch the request, and 0 if not.
+       After a successful call, call <xref linkend="libpq-PQgetResult"/> to
+       obtain the results.  The function's parameters are handled
+       identically to <xref linkend="libpq-PQparameterSet"/>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+
     <varlistentry id="libpq-PQgetResult">
      <term><function>PQgetResult</function><indexterm><primary>PQgetResult</primary></indexterm></term>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 6c3e8a631d7..1a43a90616e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1368,6 +1368,36 @@ SELCT 1/0;<!-- this typo is intentional -->
    </note>
   </sect2>
 
+  <sect2 id="protocol-changing-backend-parameters">
+   <title>Changing backend parameters</title>
+   <para>
+    The ParameterSet message can be used to change the value of a protocol
+    extension parameter. It's not allowed to change protocol extension
+    parameters using a SET command. So this message is essentially the SET
+    equivalent for protocol extension parameters. There are a few other
+    differences between SET and ParameterSet. First, ParameterSet is
+    not allowed to be sent while a transaction is in progress.
+    Second, the ParameterSet message is
+    easier for connection poolers to intercept. Finally, using the
+    _pq_.protocol_managed_params protocol extension it's possible to change a
+    normal parameter to a protocol extension parameter. Thus allowing it not to
+    be modified using SET anymore.
+   </para>
+
+   <note>
+    <para>
+     While ParameterSet is not itself part of any transaction, it is possible
+     to execute it as part of a pipeline, but only if no transaction has been
+     started. For most usecases this effectively means that ParameterSet
+     messages need to be the only (or at least the first) messages in a
+     pipeline because any regular query will open an implicit transaction.
+     Just like with other messages in a pipeline, if an error occurs while
+     processing ParameterSet, the following messages in the pipeline are
+     ignored. Regular processing only continues when a Sync is received.
+    </para>
+   </note>
+  </sect2>
+
   <sect2 id="protocol-flow-canceling-requests">
    <title>Canceling Requests in Progress</title>
 
@@ -5271,6 +5301,74 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
     </listitem>
    </varlistentry>
 
+   <varlistentry id="protocol-message-formats-ParameterSet">
+    <term>ParameterSet (F)</term>
+    <listitem>
+     <variablelist>
+      <varlistentry>
+       <term>Byte1('U')</term>
+       <listitem>
+        <para>
+         Identifies the message as a run-time parameter change.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term>Int32</term>
+       <listitem>
+        <para>
+         Length of message contents in bytes, including self.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term>String</term>
+       <listitem>
+        <para>
+         The name of the run-time parameter to change.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term>String</term>
+       <listitem>
+        <para>
+         The new value of the parameter.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry id="protocol-message-formats-ParameterSetComplete">
+    <term>ParameterSetComplete (B)</term>
+    <listitem>
+     <variablelist>
+      <varlistentry>
+       <term>Byte1('U')</term>
+       <listitem>
+        <para>
+         Identifies the message as a ParamaterSet-complete indicator.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
+       <term>Int32(4)</term>
+       <listitem>
+        <para>
+         Length of message contents in bytes, including self.
+        </para>
+       </listitem>
+      </varlistentry>
+     </variablelist>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="protocol-message-formats-Parse">
     <term>Parse (F)</term>
     <listitem>
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1eaaf3c6c58..370b9c409d8 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -427,6 +427,7 @@ SocketBackend(StringInfo inBuf)
 		case PqMsg_Describe:
 		case PqMsg_Execute:
 		case PqMsg_Flush:
+		case PqMsg_ParameterSet:
 			maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
 			doing_extended_query_message = true;
 			break;
@@ -4851,6 +4852,31 @@ PostgresMain(const char *dbname, const char *username)
 				send_ready_for_query = true;
 				break;
 
+			case PqMsg_ParameterSet:
+				{
+					const char *parameter_name;
+					const char *parameter_value;
+
+					forbidden_in_wal_sender(firstchar);
+					if (IsTransactionOrTransactionBlock())
+						ereport(ERROR,
+								(errcode(ERRCODE_PROTOCOL_VIOLATION),
+								 errmsg("ParameterSet message is not allowed within a transaction")));
+
+					parameter_name = pq_getmsgstring(&input_message);
+					parameter_value = pq_getmsgstring(&input_message);
+					pq_getmsgend(&input_message);
+
+					SetConfigOption(
+									parameter_name,
+									parameter_value,
+									PGC_USERSET,
+									PGC_S_PROTOCOL);
+					if (whereToSendOutput == DestRemote)
+						pq_putemptymessage(PqMsg_ParameterSetComplete);
+				}
+				break;
+
 				/*
 				 * 'X' means that the frontend is closing down the socket. EOF
 				 * means unexpected loss of frontend connection. Either way,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 07fa42ab073..4adc551c750 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3575,11 +3575,21 @@ set_config_with_handle(const char *name, config_handle *handle,
 			break;
 	}
 
-	if (record->flags & GUC_PROTOCOL_EXTENSION && source != PGC_S_CLIENT)
+	if (record->flags & GUC_PROTOCOL_EXTENSION)
+	{
+		if (source != PGC_S_CLIENT && source != PGC_S_PROTOCOL)
+		{
+			ereport(elevel,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("parameter can only be set at the protocol level \"%s\"", name)));
+			return 0;
+		}
+	}
+	else if (source == PGC_S_PROTOCOL)
 	{
 		ereport(elevel,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-				 errmsg("parameter can only be set at the protocol level \"%s\"", name)));
+				 errmsg("parameter cannot be set at the protocol level \"%s\"", name)));
 		return 0;
 	}
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index e17170d5943..a84e08f0c70 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -659,6 +659,7 @@ const char *const GucSource_Names[] =
 	 /* PGC_S_CLIENT */ "client",
 	 /* PGC_S_OVERRIDE */ "override",
 	 /* PGC_S_INTERACTIVE */ "interactive",
+	 /* PGC_S_PROTOCOL */ "protocol",
 	 /* PGC_S_TEST */ "test",
 	 /* PGC_S_SESSION */ "session"
 };
diff --git a/src/include/libpq/protocol.h b/src/include/libpq/protocol.h
index 4b8d4403656..c4040f99a66 100644
--- a/src/include/libpq/protocol.h
+++ b/src/include/libpq/protocol.h
@@ -25,6 +25,7 @@
 #define PqMsg_Parse					'P'
 #define PqMsg_Query					'Q'
 #define PqMsg_Sync					'S'
+#define PqMsg_ParameterSet			'U'
 #define PqMsg_Terminate				'X'
 #define PqMsg_CopyFail				'f'
 #define PqMsg_GSSResponse			'p'
@@ -52,6 +53,7 @@
 #define PqMsg_RowDescription		'T'
 #define PqMsg_FunctionCallResponse	'V'
 #define PqMsg_CopyBothResponse		'W'
+#define PqMsg_ParameterSetComplete	'U'
 #define PqMsg_ReadyForQuery			'Z'
 #define PqMsg_NoData				'n'
 #define PqMsg_PortalSuspended		's'
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index aae7334ccd5..4196d9a30e9 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -118,6 +118,7 @@ typedef enum
 	PGC_S_CLIENT,				/* from client connection request */
 	PGC_S_OVERRIDE,				/* special case to forcibly set default */
 	PGC_S_INTERACTIVE,			/* dividing line for error reporting */
+	PGC_S_PROTOCOL,				/* from a ParameterSet message */
 	PGC_S_TEST,					/* test per-database or per-user setting */
 	PGC_S_SESSION,				/* SET command */
 } GucSource;
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 849617cb9b2..9df8281965a 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -192,3 +192,5 @@ PQclosePortal             189
 PQsendClosePrepared       190
 PQsendClosePortal         191
 PQunsupportedProtocolExtensions 192
+PQparameterSet            193
+PQsendParameterSet        194
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 106d14e6eed..1f7e9cc5614 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -3346,6 +3346,82 @@ PQsendFlushRequest(PGconn *conn)
 	return 1;
 }
 
+/*
+ * PQparameterSet
+ *	  Send a request for the server to change a run-time parameter setting.
+ *
+ * If the query was not even sent, return NULL; conn->errorMessage is set to
+ * a relevant message.
+ * If the query was sent, a new PGresult is returned (which could indicate
+ * either success or failure).  On success, the PGresult contains status
+ * PGRES_COMMAND_OK. The user is responsible for freeing the PGresult via
+ * PQclear() when done with it.
+ */
+PGresult *
+PQparameterSet(PGconn *conn, const char *parameter, const char *value)
+{
+	if (!PQexecStart(conn))
+		return NULL;
+	if (!PQsendParameterSet(conn, parameter, value))
+		return NULL;
+	return PQexecFinish(conn);
+}
+
+/*
+ * PQsendParameterSet
+ *	 Send a request for the server to change a run-time parameter setting.
+ *
+ * Returns 1 on success and 0 on failure.
+ */
+int
+PQsendParameterSet(PGconn *conn, const char *parameter, const char *value)
+{
+	PGcmdQueueEntry *entry = NULL;
+
+	if (!PQsendQueryStart(conn, true))
+		return 0;
+
+	entry = pqAllocCmdQueueEntry(conn);
+	if (entry == NULL)
+		return 0;				/* error msg already set */
+
+	/* construct the Close message */
+	if (pqPutMsgStart(PqMsg_ParameterSet, conn) < 0 ||
+		pqPuts(parameter, conn) < 0 ||
+		pqPuts(value, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/* construct the Sync message */
+	if (conn->pipelineStatus == PQ_PIPELINE_OFF)
+	{
+		if (pqPutMsgStart(PqMsg_Sync, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
+
+	entry->queryclass = PGQUERY_PARAMETER_SET;
+
+	/*
+	 * Give the data a push (in pipeline mode, only if we're past the size
+	 * threshold).  In nonblock mode, don't complain if we're unable to send
+	 * it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (pqPipelineFlush(conn) < 0)
+		goto sendFailed;
+
+	/* OK, it's launched! */
+	pqAppendCmdQueueEntry(conn, entry);
+
+	return 1;
+
+sendFailed:
+	pqRecycleCmdQueueEntry(conn, entry);
+	/* error message should be set up already */
+	return 0;
+}
+
+
 /* ====== accessor funcs for PGresult ======== */
 
 ExecStatusType
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 6076c99f053..13e88be0d8f 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -297,6 +297,28 @@ pqParseInput3(PGconn *conn)
 						conn->asyncStatus = PGASYNC_READY;
 					}
 					break;
+				case PqMsg_ParameterSetComplete:
+
+					/*
+					 * If we're doing PQsendParameterSet, we're done; else
+					 * ignore
+					 */
+					if (conn->cmd_queue_head &&
+						conn->cmd_queue_head->queryclass == PGQUERY_PARAMETER_SET)
+					{
+						if (!pgHavePendingResult(conn))
+						{
+							conn->result = PQmakeEmptyPGresult(conn,
+															   PGRES_COMMAND_OK);
+							if (!conn->result)
+							{
+								libpq_append_conn_error(conn, "out of memory");
+								pqSaveErrorResult(conn);
+							}
+						}
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					break;
 				case PqMsg_ParameterStatus:
 					if (getParameterStatus(conn))
 						return;
diff --git a/src/interfaces/libpq/fe-trace.c b/src/interfaces/libpq/fe-trace.c
index c9932fc8a6b..b69ecbd597d 100644
--- a/src/interfaces/libpq/fe-trace.c
+++ b/src/interfaces/libpq/fe-trace.c
@@ -514,6 +514,23 @@ pqTraceOutputW(FILE *f, const char *message, int *cursor, int length)
 		pqTraceOutputInt16(f, message, cursor);
 }
 
+/* ParameterSet(F) or ParameterSetComplete(B) */
+static void
+pqTraceOutputU(FILE *f, bool toServer, const char *message, int *cursor)
+{
+	if (toServer)
+	{
+		fprintf(f, "ParameterSet\t");
+		pqTraceOutputString(f, message, cursor, false);
+		pqTraceOutputString(f, message, cursor, false);
+	}
+	else
+	{
+		fprintf(f, "ParameterSetComplete");
+		/* No message content */
+	}
+}
+
 /* ReadyForQuery */
 static void
 pqTraceOutputZ(FILE *f, const char *message, int *cursor)
@@ -589,6 +606,10 @@ pqTraceOutputMessage(PGconn *conn, const char *message, bool toServer)
 			Assert(PqMsg_Close == PqMsg_CommandComplete);
 			pqTraceOutputC(conn->Pfdebug, toServer, message, &logCursor);
 			break;
+		case PqMsg_ParameterSet:
+			Assert(PqMsg_ParameterSet == PqMsg_ParameterSetComplete);
+			pqTraceOutputU(conn->Pfdebug, toServer, message, &logCursor);
+			break;
 		case PqMsg_CopyData:
 			/* Drop COPY data to reduce the overhead of logging. */
 			break;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index c60842c8174..dbe6b51542e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -555,6 +555,9 @@ extern PGresult *PQclosePortal(PGconn *conn, const char *portal);
 extern int	PQsendClosePrepared(PGconn *conn, const char *stmt);
 extern int	PQsendClosePortal(PGconn *conn, const char *portal);
 
+extern PGresult *PQparameterSet(PGconn *conn, const char *parameter, const char *value);
+extern int	PQsendParameterSet(PGconn *conn, const char *parameter, const char *value);
+
 /* Delete a PGresult */
 extern void PQclear(PGresult *res);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 95ed3f0b0c7..58c34fe2c21 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -322,7 +322,8 @@ typedef enum
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
 	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
 	PGQUERY_SYNC,				/* Sync (at end of a pipeline) */
-	PGQUERY_CLOSE				/* Close Statement or Portal */
+	PGQUERY_CLOSE,				/* Close Statement or Portal */
+	PGQUERY_PARAMETER_SET,		/* Set a server parameter */
 } PGQueryClass;
 
 /*
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 71cd04c5f23..1e4d1e7f4f6 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -1046,6 +1046,257 @@ test_prepared(PGconn *conn)
 	fprintf(stderr, "ok\n");
 }
 
+static void
+test_parameter_set(PGconn *conn)
+{
+	PGresult   *res = NULL;
+	const char *val;
+
+	res = PQparameterSet(conn, "work_mem", "42MB");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to set non protocol parameters using ParameterSet");
+
+	/* Outside of a pipeline */
+	res = PQexec(conn, "SHOW _pq_.protocol_managed_params");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "") != 0)
+		pg_fatal("expected work_mem, got %s", val);
+
+	if (PQsendParameterSet(conn, "_pq_.protocol_managed_params", "work_mem") != 1)
+		pg_fatal("PQsendParameterSet failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+
+	res = PQexec(conn, "SHOW _pq_.protocol_managed_params");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "work_mem") != 0)
+		pg_fatal("expected work_mem, got %s", val);
+
+	if (PQsendParameterSet(conn, "work_mem", "42MB") != 1)
+		pg_fatal("PQsendParameterSet failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+	PQclear(res);
+
+	res = PQexec(conn, "SHOW work_mem");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "42MB") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+	PQclear(res);
+
+	res = PQexec(conn, "RESET ALL");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+	res = PQexec(conn, "SHOW _pq_.protocol_managed_params");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "work_mem") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+
+	res = PQexec(conn, "SHOW work_mem");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "42MB") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+	PQclear(res);
+
+	/* In a pipeline with other queries */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendParameterSet(conn, "work_mem", "10MB") != 1)
+		pg_fatal("PQsendParameterSet failed: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+	PQclear(res);
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+
+	/*
+	 * This is fine, but it opens an implicit transaction which should cause
+	 * the next ParameterSet message to fail
+	 */
+	if (PQsendQueryParams(conn, "SHOW work_mem", 0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+				 PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("unexpected result code %s from first pipeline item",
+				 PQresStatus(PQresultStatus(res)));
+
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "10MB") != 0)
+		pg_fatal("expected 10MB, got %s", val);
+	PQclear(res);
+
+	if (PQsendParameterSet(conn, "work_mem", "10MB") != 1)
+		pg_fatal("PQsendParameterSet failed: %s", PQerrorMessage(conn));
+	PQsendFlushRequest(conn);
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQsendParameterSet should not be allowed in a transaction");
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Sync should fail too");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+	/* In blocking mode */
+	res = PQparameterSet(conn, "work_mem", "42MB");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+	PQclear(res);
+	res = PQexec(conn, "SHOW work_mem");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQntuples(res) != 1)
+		pg_fatal("expected 1 result, got %d", PQntuples(res));
+
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "42MB") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+	PQclear(res);
+
+	/* In transaction */
+
+	res = PQexec(conn, "BEGIN");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
+
+	res = PQparameterSet(conn, "work_mem", "30MB");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQparameterSet should have failed in a transaction");
+
+	res = PQexec(conn, "SELECT 1");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQexec should have failed with 'current transaction is aborted'");
+
+	res = PQexec(conn, "ROLLBACK");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+
+	res = PQexec(conn, "SHOW work_mem");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "42MB") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+	PQclear(res);
+
+	/* In a failed pipeline */
+	if (PQenterPipelineMode(conn) != 1)
+		pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+	if (PQsendQueryParams(conn, "SELECT 0/0", 0, NULL, NULL, NULL, NULL, 0) != 1)
+		pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+	if (PQsendParameterSet(conn, "work_mem", "12MB") != 1)
+		pg_fatal("PQsendParameterSet failed: %s", PQerrorMessage(conn));
+	if (PQpipelineSync(conn) != 1)
+		pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("PQexec should have failed with division by zero");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
+		pg_fatal("pipeline was not aborted");
+	res = PQgetResult(conn);
+	if (res != NULL)
+		pg_fatal("did not receive terminating NULL");
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+		pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	if (PQexitPipelineMode(conn) != 1)
+		pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+	res = PQexec(conn, "SHOW work_mem");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("Expected tuples, got %s: %s",
+				 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+	val = PQgetvalue(res, 0, 0);
+	if (strcmp(val, "42MB") != 0)
+		pg_fatal("expected 42MB, got %s", val);
+	PQclear(res);
+
+	res = PQparameterSet(conn, "_pq_.protocol_managed_params", "role,session_authorization");
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("failed to set parameter: %s", PQerrorMessage(conn));
+
+	res = PQparameterSet(conn, "work_mem", "42MB");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to set work_mem anymore using ParameterSet");
+
+	res = PQexec(conn, "SET SESSION AUTHORIZATION 'postgres'");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to set work_mem anymore using ParameterSet");
+
+	res = PQexec(conn, "SET ROLE 'postgres'");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to set work_mem anymore using ParameterSet");
+	res = PQparameterSet(conn, "_pq_.protocol_managed_params", "doesnotexist");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to use an unknown GUC");
+
+	res = PQparameterSet(conn, "_pq_.protocol_managed_params", "work_mem,_pq_.protocol_managed_params");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to use a protocol extension");
+
+	res = PQparameterSet(conn, "_pq_.protocol_managed_params", "\"");
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal("Should not be allowed to use invalid list syntax");
+}
+
 /* Notice processor: print notices, and count how many we got */
 static void
 notice_processor(void *arg, const char *message)
@@ -1749,6 +2000,7 @@ print_test_list(void)
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
 	printf("nosync\n");
+	printf("parameter_set\n");
 	printf("pipeline_abort\n");
 	printf("pipeline_idle\n");
 	printf("pipelined_insert\n");
@@ -1853,6 +2105,8 @@ main(int argc, char **argv)
 		test_multi_pipelines(conn);
 	else if (strcmp(testname, "nosync") == 0)
 		test_nosync(conn);
+	else if (strcmp(testname, "parameter_set") == 0)
+		test_parameter_set(conn);
 	else if (strcmp(testname, "pipeline_abort") == 0)
 		test_pipeline_abort(conn);
 	else if (strcmp(testname, "pipeline_idle") == 0)
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
index f9e6d07fc0b..eec5d954bf0 100644
--- a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -37,7 +37,7 @@ for my $testname (@tests)
 {
 	my @extraargs = ('-r', $numrows);
 	my $cmptrace = grep(/^$testname$/,
-		qw(simple_pipeline nosync multi_pipelines prepared singlerow
+		qw(simple_pipeline nosync multi_pipelines parameter_set prepared singlerow
 		  pipeline_abort pipeline_idle transaction
 		  disallowed_in_pipeline)) > 0;
 
-- 
2.34.1

