From 762e19dc2db535393cefabeea97ed3aad1c1f5c3 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Tue, 5 Nov 2024 10:26:54 +0100
Subject: Add pipeline support in psql

With \bind, \parse, \bind_named and \close, it is possible to issue
queries from psql using the extended protocol. However, it wasn't
possible to send those queries using pipelining and the only way to test
pipelined queries was through pgbench's tap tests.

This patch adds additional psql meta-commands to support pipelining:
\startpipeline, \endpipeline and \syncpipeline, mirroring the existing
meta-commands in pgbench.

\startpipeline starts a new pipeline. All extended queries will be
queued until the end of the pipeline is reached.
\endpipeline ends an ongoing pipeline. All queued commands will be sent
to the server and all responses will be processed by the psql.
\syncpipeline queue a synchronisation point without flushing the
commands to the server

Those meta-commands will allow to test pipelined query behaviour using
psql regression tests.
---
 doc/src/sgml/ref/psql-ref.sgml     |  50 ++++++
 src/bin/psql/command.c             |  74 ++++++++
 src/bin/psql/common.c              |  69 +++++++-
 src/bin/psql/help.c                |   3 +
 src/bin/psql/settings.h            |   4 +
 src/bin/psql/tab-complete.in.c     |   4 +-
 src/test/regress/expected/psql.out | 266 +++++++++++++++++++++++++++++
 src/test/regress/sql/psql.sql      | 156 +++++++++++++++++
 8 files changed, 622 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml
index e42073ed748..13dadc72615 100644
--- a/doc/src/sgml/ref/psql-ref.sgml
+++ b/doc/src/sgml/ref/psql-ref.sgml
@@ -3562,6 +3562,56 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
         </listitem>
       </varlistentry>
 
+     <varlistentry id="app-psql-meta-command-pipeline">
+      <term><literal>\startpipeline</literal></term>
+      <term><literal>\syncpipeline</literal></term>
+      <term><literal>\endpipeline</literal></term>
+
+      <listitem>
+        <para>
+          This group of commands implements pipelining of SQL statements.
+          A pipeline must begin with a <command>\startpipeline</command>
+          and end with an <command>\endpipeline</command>. In between there
+          may be any number of <command>\syncpipeline</command> commands,
+          which sends a <link linkend="protocol-flow-ext-query">sync message</link>
+          without ending the ongoing pipeline and flushing the send buffer.
+          In pipeline mode, statements are sent to the server without waiting
+          for the results of previous statements.  See
+          <xref linkend="libpq-pipeline-mode"/> for more details.
+       </para>
+
+        <para>
+          Pipeline mode requires the use of extended query protocol. All queries need
+          to be sent using the meta-commands <literal>\bind</literal>,
+          <literal>\bind_named</literal>, <literal>\close</literal> or
+          <literal>\parse</literal>. While a pipeline is ongoing,
+          <literal>\g</literal> will append the current query buffer to the pipeline and
+          other meta-commands like <literal>\gx</literal> or <literal>\gdesc</literal>
+          are not allowed in pipeline mode.
+       </para>
+
+        <para>
+       </para>
+
+       <para>
+        Example:
+<programlisting>
+\startpipeline
+SELECT pg_current_xact_id() \bind \g
+SELECT $1 \parse stmt1
+SELECT $1, $2 \parse stmt2
+\bind_named stmt1 1 \g
+\syncpipeline
+\bind_named stmt2 1, 2 \g
+SELECT pg_current_xact_id() \bind \g
+\close stmt1
+\close stmt2
+\endpipeline
+</programlisting></para>
+
+      </listitem>
+     </varlistentry>
+
 
       <varlistentry id="app-psql-meta-command-t-lc">
         <term><literal>\t</literal></term>
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index ca69912905f..ab8f06a5883 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -90,6 +90,7 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
 										 PQExpBuffer query_buf);
 static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
 										  PQExpBuffer query_buf);
+static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
@@ -132,6 +133,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
 										   const char *cmd);
 static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
 										  const char *cmd, bool is_func);
+static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
@@ -353,6 +356,8 @@ exec_command(const char *cmd,
 		status = exec_command_else(scan_state, cstack, query_buf);
 	else if (strcmp(cmd, "endif") == 0)
 		status = exec_command_endif(scan_state, cstack, query_buf);
+	else if (strcmp(cmd, "endpipeline") == 0)
+		status = exec_command_endpipeline(scan_state, active_branch);
 	else if (strcmp(cmd, "encoding") == 0)
 		status = exec_command_encoding(scan_state, active_branch);
 	else if (strcmp(cmd, "errverbose") == 0)
@@ -410,6 +415,10 @@ exec_command(const char *cmd,
 		status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
 	else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
 		status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
+	else if (strcmp(cmd, "startpipeline") == 0)
+		status = exec_command_startpipeline(scan_state, active_branch);
+	else if (strcmp(cmd, "syncpipeline") == 0)
+		status = exec_command_syncpipeline(scan_state, active_branch);
 	else if (strcmp(cmd, "t") == 0)
 		status = exec_command_t(scan_state, active_branch);
 	else if (strcmp(cmd, "T") == 0)
@@ -1537,6 +1546,12 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
 		}
 		if (strcmp(cmd, "gx") == 0)
 		{
+			if (PQpipelineStatus(pset.db) == PQ_PIPELINE_ON)
+			{
+				pg_log_error("\\gx not allowed in pipeline mode");
+				free(fname);
+				return PSQL_CMD_ERROR;
+			}
 			/* save settings if not done already, then force expanded=on */
 			if (pset.gsavepopt == NULL)
 				pset.gsavepopt = savePsetInfo(&pset.popt);
@@ -1691,6 +1706,11 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
 
 	if (active_branch)
 	{
+		if (PQpipelineStatus(pset.db) == PQ_PIPELINE_ON)
+		{
+			pg_log_error("\\gexec not allowed in pipeline mode");
+			return PSQL_CMD_ERROR;
+		}
 		pset.gexec_flag = true;
 		status = PSQL_CMD_SEND;
 	}
@@ -1711,6 +1731,12 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
 		char	   *prefix = psql_scan_slash_option(scan_state,
 													OT_NORMAL, NULL, false);
 
+		if (PQpipelineStatus(pset.db) == PQ_PIPELINE_ON)
+		{
+			pg_log_error("\\gset not allowed in pipeline mode");
+			return PSQL_CMD_ERROR;
+		}
+
 		if (prefix)
 			pset.gset_prefix = prefix;
 		else
@@ -2674,6 +2700,54 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
 	return status;
 }
 
+/*
+ * \startpipeline -- enter pipeline mode
+ */
+static backslashResult
+exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_START_PIPELINE_MODE;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+/*
+ * \syncpipeline -- send a sync message to an active pipeline
+ */
+static backslashResult
+exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+/*
+ * \endpipeline -- end pipeline mode
+ */
+static backslashResult
+exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_END_PIPELINE_MODE;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
 /*
  * \t -- turn off table headers and row count
  */
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 8a9211db41a..c9d0db92e16 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -415,6 +415,8 @@ AcceptResult(const PGresult *result, bool show_error)
 			case PGRES_EMPTY_QUERY:
 			case PGRES_COPY_IN:
 			case PGRES_COPY_OUT:
+			case PGRES_PIPELINE_SYNC:
+			case PGRES_PIPELINE_ABORTED:
 				/* Fine, do nothing */
 				OK = true;
 				break;
@@ -1451,6 +1453,7 @@ ExecQueryAndProcessResults(const char *query,
 	bool		timing = pset.timing;
 	bool		success = false;
 	bool		return_early = false;
+	bool		process_pipeline = false;
 	instr_time	before,
 				after;
 	PGresult   *result;
@@ -1484,6 +1487,21 @@ ExecQueryAndProcessResults(const char *query,
 										  (const char *const *) pset.bind_params,
 										  NULL, NULL, 0);
 			break;
+		case PSQL_START_PIPELINE_MODE:
+			success = PQenterPipelineMode(pset.db);
+			break;
+		case PSQL_END_PIPELINE_MODE:
+			success = PQpipelineSync(pset.db);
+			/* End of the pipeline, all queued commands need to be processed */
+			process_pipeline = true;
+			if (success)
+				pset.num_syncs++;
+			break;
+		case PSQL_SEND_PIPELINE_SYNC:
+			success = PQsendPipelineSync(pset.db);
+			if (success)
+				pset.num_syncs++;
+			break;
 		case PSQL_SEND_QUERY:
 			success = PQsendQuery(pset.db, query);
 			break;
@@ -1501,6 +1519,12 @@ ExecQueryAndProcessResults(const char *query,
 		return -1;
 	}
 
+	if (!process_pipeline && PQpipelineStatus(pset.db) == PQ_PIPELINE_ON)
+	{
+		/* We're in a pipeline and haven't received a pipeline end yet, exit */
+		return 0;
+	}
+
 	/*
 	 * Fetch the result in chunks if FETCH_COUNT is set, except when:
 	 *
@@ -1585,6 +1609,17 @@ ExecQueryAndProcessResults(const char *query,
 				 * ignore manually.
 				 */
 				result = NULL;
+			else if (process_pipeline)
+			{
+				/*
+				 * In pipeline mode, a NULL result is returned to notify the
+				 * next query is being processed. We need to consume it and
+				 * get the next result.
+				 */
+				result = PQgetResult(pset.db);
+				Assert(result == NULL);
+				result = PQgetResult(pset.db);
+			}
 			else
 				result = PQgetResult(pset.db);
 
@@ -1771,12 +1806,32 @@ ExecQueryAndProcessResults(const char *query,
 			}
 		}
 
+		if (result_status == PGRES_PIPELINE_SYNC)
+		{
+			/* We have a sync response, decrease the sync counter */
+			pset.num_syncs--;
+			/* If all syncs were processed, exit pipeline mode */
+			if (pset.num_syncs <= 0)
+				success &= PQexitPipelineMode(pset.db);
+		}
+
 		/*
 		 * Check PQgetResult() again.  In the typical case of a single-command
 		 * string, it will return NULL.  Otherwise, we'll have other results
 		 * to process.  We need to do that to check whether this is the last.
 		 */
 		next_result = PQgetResult(pset.db);
+		if (process_pipeline && result_status != PGRES_PIPELINE_SYNC)
+		{
+			/*
+			 * In pipeline mode, a NULL result indicates the end of the
+			 * current query being processed. We need to call PQgetResult a
+			 * second time to move to the next response.
+			 */
+			Assert(next_result == NULL);
+			next_result = PQgetResult(pset.db);
+		}
+
 		last = (next_result == NULL);
 
 		/*
@@ -1798,8 +1853,12 @@ ExecQueryAndProcessResults(const char *query,
 			*elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
 		}
 
-		/* this may or may not print something depending on settings */
-		if (result != NULL)
+		/*
+		 * This may or may not print something depending on settings. A
+		 * pipeline sync will have a non null result but doesn't have anything
+		 * to print, thus we ignore them
+		 */
+		if (result != NULL && result_status != PGRES_PIPELINE_SYNC)
 		{
 			/*
 			 * If results need to be printed into the file specified by \g,
@@ -1837,6 +1896,9 @@ ExecQueryAndProcessResults(const char *query,
 	/* close \g file if we opened it */
 	CloseGOutput(gfile_fout, gfile_is_pipe);
 
+	/* After query process, pipeline numsyncs should be 0 */
+	Assert(pset.num_syncs == 0);
+
 	/* may need this to recover from conn loss during COPY */
 	if (!CheckConnection())
 		return -1;
@@ -2296,6 +2358,9 @@ clean_extended_state(void)
 			free(pset.stmtName);
 			pset.bind_params = NULL;
 			break;
+		case PSQL_START_PIPELINE_MODE:	/* \startpipeline */
+		case PSQL_END_PIPELINE_MODE:	/* \endpipeline */
+		case PSQL_SEND_PIPELINE_SYNC:	/* \syncpipeline */
 		case PSQL_SEND_QUERY:
 			break;
 	}
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 3f4afc2d141..e550a29297c 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -167,6 +167,7 @@ slashUsage(unsigned short int pager)
 	HELP0("  \\close STMT_NAME       close an existing prepared statement\n");
 	HELP0("  \\copyright             show PostgreSQL usage and distribution terms\n");
 	HELP0("  \\crosstabview [COLUMNS] execute query and display result in crosstab\n");
+	HELP0("  \\endpipeline           exit pipeline mode\n");
 	HELP0("  \\errverbose            show most recent error message at maximum verbosity\n");
 	HELP0("  \\g [(OPTIONS)] [FILE]  execute query (and send result to file or |pipe);\n"
 		  "                         \\g with no arguments is equivalent to a semicolon\n");
@@ -176,6 +177,8 @@ slashUsage(unsigned short int pager)
 	HELP0("  \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
 	HELP0("  \\parse STMT_NAME       create a prepared statement\n");
 	HELP0("  \\q                     quit psql\n");
+	HELP0("  \\startpipeline         enter pipeline mode\n");
+	HELP0("  \\syncpipeline          add a synchronisation point to an ongoing pipeline\n");
 	HELP0("  \\watch [[i=]SEC] [c=N] [m=MIN]\n"
 		  "                         execute query every SEC seconds, up to N times,\n"
 		  "                         stop if less than MIN rows are returned\n");
diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h
index a22de8ef78e..55247c4dc33 100644
--- a/src/bin/psql/settings.h
+++ b/src/bin/psql/settings.h
@@ -69,6 +69,9 @@ typedef enum
 	PSQL_SEND_EXTENDED_PARSE,
 	PSQL_SEND_EXTENDED_QUERY_PARAMS,
 	PSQL_SEND_EXTENDED_QUERY_PREPARED,
+	PSQL_SEND_PIPELINE_SYNC,
+	PSQL_START_PIPELINE_MODE,
+	PSQL_END_PIPELINE_MODE,
 } PSQL_SEND_MODE;
 
 typedef enum
@@ -108,6 +111,7 @@ typedef struct _psqlSettings
 	PSQL_SEND_MODE send_mode;	/* one-shot request to send query with normal
 								 * or extended query protocol */
 	int			bind_nparams;	/* number of parameters */
+	int			num_syncs;		/* number of ongoing syncs */
 	char	  **bind_params;	/* parameters for extended query protocol call */
 	char	   *stmtName;		/* prepared statement name used for extended
 								 * query protocol commands */
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index fad2277991d..742d2627afc 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -1867,7 +1867,7 @@ psql_completion(const char *text, int start, int end)
 		"\\drds", "\\drg", "\\dRs", "\\dRp", "\\ds",
 		"\\dt", "\\dT", "\\dv", "\\du", "\\dx", "\\dX", "\\dy",
 		"\\echo", "\\edit", "\\ef", "\\elif", "\\else", "\\encoding",
-		"\\endif", "\\errverbose", "\\ev",
+		"\\endif", "\\endpipeline", "\\errverbose", "\\ev",
 		"\\f",
 		"\\g", "\\gdesc", "\\getenv", "\\gexec", "\\gset", "\\gx",
 		"\\help", "\\html",
@@ -1877,7 +1877,7 @@ psql_completion(const char *text, int start, int end)
 		"\\parse", "\\password", "\\print", "\\prompt", "\\pset",
 		"\\qecho", "\\quit",
 		"\\reset",
-		"\\s", "\\set", "\\setenv", "\\sf", "\\sv",
+		"\\s", "\\set", "\\setenv", "\\sf", "\\startpipeline", "\\sv", "\\syncpipeline",
 		"\\t", "\\T", "\\timing",
 		"\\unset",
 		"\\x",
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 3c51af09086..8bd9b26a257 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -6835,3 +6835,269 @@ CREATE TABLE defprivs (a int);
 
 \pset null ''
 DROP TABLE defprivs;
+-- pipelining
+CREATE TABLE psql_pipeline(a INTEGER, s TEXT);
+-- single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+-- send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val4     | val5
+(1 row)
+
+-- startpipeline shouldn't have any effect if already in a pipeline
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+SELECT $1 \bind 'val2' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+1	test
+2	test2
+ ?column? 
+----------
+ val2
+(1 row)
+
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+ERROR:  could not determine data type of parameter $1
+-- pipelining errors
+-- endpipeline outside of pipeline should fail
+\endpipeline
+cannot send pipeline when not in pipeline mode
+-- Query using simple protocol should not be sent and should leave the pipeline usable
+\startpipeline
+SELECT 1;
+PQsendQuery not allowed in pipeline mode
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- Incorrect number of parameters, the pipeline will be aborted and following queries won't be executed
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ERROR:  bind message supplies 1 parameters, but prepared statement "" requires 0
+-- watch sends query as a simple query which won't be allowed within a pipeline
+\startpipeline
+SELECT \bind \g
+\watch 1
+PQsendQuery not allowed in pipeline mode
+
+\endpipeline
+--
+(1 row)
+
+-- \gdesc should fail as synchronous commands are not allowed in pipeline, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+synchronous command execution functions are not allowed in pipeline mode
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+-- \gset is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\gset not allowed in pipeline mode
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\gset not allowed in pipeline mode
+\bind_named '' 1 2 \g
+\endpipeline
+ i | j 
+---+---
+ 1 | 2
+(1 row)
+
+-- \gx is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gx
+\gx not allowed in pipeline mode
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+-- \gexec is not allowed, pipeline should still be usable
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\gexec not allowed in pipeline mode
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+                          ?column?                          
+------------------------------------------------------------
+ INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)
+(1 row)
+
+ count 
+-------
+     2
+(1 row)
+
+-- pipelining and transaction block behaviour
+-- set local will issue a warning when modifying a GUC outside of a transaction block
+-- The change will still be valid as a pipeline runs within an implicit transaction block
+-- Sending a sync will commit the implicit transaction block.
+\startpipeline
+SET LOCAL statement_timeout='1h' \bind \g
+SHOW statement_timeout \bind \g
+\syncpipeline
+SHOW statement_timeout \bind \g
+SET LOCAL statement_timeout='2h' \bind \g
+SHOW statement_timeout \bind \g
+\endpipeline
+WARNING:  SET LOCAL can only be used in transaction blocks
+ statement_timeout 
+-------------------
+ 1h
+(1 row)
+
+WARNING:  SET LOCAL can only be used in transaction blocks
+ statement_timeout 
+-------------------
+ 0
+(1 row)
+
+ statement_timeout 
+-------------------
+ 2h
+(1 row)
+
+-- Reindex concurrently is forbidden in the middle of a pipeline
+\startpipeline
+SELECT $1 \bind 1 \g
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+ERROR:  REINDEX CONCURRENTLY cannot be executed within a pipeline
+-- Reindex concurrently will work if it's the first command of a pipeline
+\startpipeline
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+-- subtransactions are not allowed in pipeline mode
+\startpipeline
+SAVEPOINT a \bind \g
+SELECT $1 \bind 1 \g
+ROLLBACK TO SAVEPOINT a \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ERROR:  SAVEPOINT can only be used in transaction blocks
+-- Lock command will fail as pipeline is not seen as a transaction block
+\startpipeline
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ERROR:  LOCK TABLE can only be used in transaction blocks
+-- Lock command will fail in the middle of the pipeline as it's not seen as a transaction block
+\startpipeline
+SELECT $1 \bind 1 \g
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+ERROR:  LOCK TABLE can only be used in transaction blocks
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index 13342958f81..ba613a6d435 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1933,3 +1933,159 @@ CREATE TABLE defprivs (a int);
 \z defprivs
 \pset null ''
 DROP TABLE defprivs;
+
+-- pipelining
+CREATE TABLE psql_pipeline(a INTEGER, s TEXT);
+
+-- single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+
+-- send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+
+-- startpipeline shouldn't have any effect if already in a pipeline
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+1	test
+2	test2
+\.
+
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+SELECT $1 \bind 'val2' \g
+\endpipeline
+
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+
+-- pipelining errors
+
+-- endpipeline outside of pipeline should fail
+\endpipeline
+
+-- Query using simple protocol should not be sent and should leave the pipeline usable
+\startpipeline
+SELECT 1;
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- Incorrect number of parameters, the pipeline will be aborted and following queries won't be executed
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- watch sends query as a simple query which won't be allowed within a pipeline
+\startpipeline
+SELECT \bind \g
+\watch 1
+\endpipeline
+
+-- \gdesc should fail as synchronous commands are not allowed in pipeline, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gset is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\bind_named '' 1 2 \g
+\endpipeline
+
+-- \gx is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gx
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gexec is not allowed, pipeline should still be usable
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+
+-- pipelining and transaction block behaviour
+
+-- set local will issue a warning when modifying a GUC outside of a transaction block
+-- The change will still be valid as a pipeline runs within an implicit transaction block
+-- Sending a sync will commit the implicit transaction block.
+\startpipeline
+SET LOCAL statement_timeout='1h' \bind \g
+SHOW statement_timeout \bind \g
+\syncpipeline
+SHOW statement_timeout \bind \g
+SET LOCAL statement_timeout='2h' \bind \g
+SHOW statement_timeout \bind \g
+\endpipeline
+
+-- Reindex concurrently is forbidden in the middle of a pipeline
+\startpipeline
+SELECT $1 \bind 1 \g
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Reindex concurrently will work if it's the first command of a pipeline
+\startpipeline
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- subtransactions are not allowed in pipeline mode
+\startpipeline
+SAVEPOINT a \bind \g
+SELECT $1 \bind 1 \g
+ROLLBACK TO SAVEPOINT a \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Lock command will fail as pipeline is not seen as a transaction block
+\startpipeline
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Lock command will fail in the middle of the pipeline as it's not seen as a transaction block
+\startpipeline
+SELECT $1 \bind 1 \g
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
-- 
2.39.5 (Apple Git-154)

