From ec2dfa63e3be485d6700c56ff4fe0a64d47181ea Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Tue, 5 Nov 2024 10:26:54 +0100
Subject: Add pipelining support in psql

With \bind, \parse, \bind_named and \close, it is possible to issue
queries from psql using the extended protocol. However, it wasn't
possible to send those queries using pipelining and the only way to test
pipelined queries was through pgbench's TAP tests.

This patch adds additional psql meta-commands to support pipelining:
\startpipeline, \endpipeline and \syncpipeline, mirroring the existing
meta-commands in pgbench. Additional meta-commands allow flushing and
reading results of an ongoing pipeline: \flushrequest, \flush and
\getresults.

\startpipeline starts a new pipeline. All extended queries will be
queued until the end of the pipeline is reached.
\endpipeline ends an ongoing pipeline. All queued commands will be sent
to the server and all responses will be processed by psql.
\syncpipeline queues a synchronisation point without flushing the
commands to the server.
\flush calls PQflush on psql's connection.
\flushrequest queues a flush request.
\getresults reads the server's results. Unsent data is automatically
pushed when \getresults is called.

Those meta-commands will allow testing pipeline behaviour using
psql regression tests.
---
 doc/src/sgml/ref/psql-ref.sgml     |  96 +++++
 src/bin/psql/command.c             | 151 +++++++
 src/bin/psql/common.c              | 274 +++++++++++-
 src/bin/psql/help.c                |   7 +
 src/bin/psql/prompt.c              |  12 +
 src/bin/psql/settings.h            |  16 +-
 src/bin/psql/tab-complete.in.c     |   8 +-
 src/test/regress/expected/psql.out | 668 +++++++++++++++++++++++++++++
 src/test/regress/sql/psql.sql      | 399 +++++++++++++++++
 9 files changed, 1619 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml
index 72f3347e53d..3bc77d2bb0e 100644
--- a/doc/src/sgml/ref/psql-ref.sgml
+++ b/doc/src/sgml/ref/psql-ref.sgml
@@ -3562,6 +3562,82 @@ testdb=&gt; <userinput>\setenv LESS -imx4F</userinput>
         </listitem>
       </varlistentry>
 
+     <varlistentry id="app-psql-meta-command-pipeline">
+      <term><literal>\startpipeline</literal></term>
+      <term><literal>\syncpipeline</literal></term>
+      <term><literal>\endpipeline</literal></term>
+      <term><literal>\flushrequest</literal></term>
+      <term><literal>\flush</literal></term>
+      <term><literal>\getresults [ <replaceable class="parameter">number_results</replaceable> ]</literal></term>
+
+      <listitem>
+        <para>
+          This group of commands implements pipelining of SQL statements.
+          A pipeline must begin with a <command>\startpipeline</command>
+          and end with an <command>\endpipeline</command>. In between there
+          may be any number of <command>\syncpipeline</command> commands,
+          which send a <link linkend="protocol-flow-ext-query">sync message</link>
+          without ending the ongoing pipeline or flushing the send buffer.
+          In pipeline mode, statements are sent to the server without waiting
+          for the results of previous statements.  See
+          <xref linkend="libpq-pipeline-mode"/> for more details.
+       </para>
+
+        <para>
+          Pipeline mode requires the use of the extended query protocol. All queries need
+          to be sent using the meta-commands <literal>\bind</literal>,
+          <literal>\bind_named</literal>, <literal>\close</literal> or
+          <literal>\parse</literal>. While a pipeline is ongoing,
+          <literal>\g</literal> will append the current query buffer to the pipeline and
+          other meta-commands like <literal>\gx</literal> or <literal>\gdesc</literal>
+          are not allowed in pipeline mode.
+       </para>
+
+        <para>
+          <command>\flushrequest</command> appends a flush command to the pipeline,
+          allowing results to be read with <command>\getresults</command> without issuing
+          a sync or ending the pipeline. <command>\getresults</command> will automatically
+          push unsent data to the server. <command>\flush</command> can be used to manually
+          push unsent data.
+       </para>
+
+        <para>
+          <command>\getresults</command> accepts an optional
+          <replaceable class="parameter">number_results</replaceable> parameter. If provided,
+          only the first <replaceable class="parameter">number_results</replaceable> pending
+          results will be read. If not provided or 0, all pending results are read. The
+          commands <literal>\bind</literal>, <literal>\bind_named</literal>,
+          <literal>\close</literal>, <literal>\parse</literal> and <literal>\syncpipeline</literal>
+          each generate one result that can be fetched.
+       </para>
+
+        <para>
+          When pipeline mode is active, a dedicated prompt variable is
+          available to report the pipeline status. See
+          <xref linkend="app-psql-prompting-p-uc"/> for more details.
+       </para>
+
+       <para>
+        Example:
+<programlisting>
+\startpipeline
+SELECT 1 \bind \g
+SELECT $1 \parse stmt1
+\bind_named stmt1 1 \g
+SELECT pg_current_xact_id() \bind \g
+\flushrequest
+\getresults
+\syncpipeline
+SELECT pg_current_xact_id() \bind \g
+\flushrequest
+\getresults 2
+\close stmt1
+\endpipeline
+</programlisting></para>
+
+      </listitem>
+     </varlistentry>
+
 
       <varlistentry id="app-psql-meta-command-t-lc">
         <term><literal>\t</literal></term>
@@ -4719,6 +4795,26 @@ testdb=&gt; <userinput>INSERT INTO my_table VALUES (:'content');</userinput>
         </listitem>
       </varlistentry>
 
+      <varlistentry id="app-psql-prompting-p-uc">
+        <term><literal>%P</literal></term>
+        <listitem>
+        <para>
+        Pipeline status: an empty string when not in a pipeline,
+        or <literal>|piped_syncs,piped_commands,pending_results|</literal>
+        when in an ongoing pipeline, or <literal>+piped_syncs,piped_commands,pending_results+</literal>
+        when in an aborted pipeline. <literal>piped_syncs</literal> is the
+        number of syncs queued in the pipeline. <literal>piped_commands</literal>
+        is the number of commands generated by <literal>\bind</literal>,
+        <literal>\bind_named</literal>, <literal>\close</literal> or
+        <literal>\parse</literal> queued in the pipeline. <literal>pending_results</literal>
+        is the number of commands that were followed by either a
+        <command>\flushrequest</command> or a <command>\syncpipeline</command>,
+        forcing the server to send the results that can be retrieved with
+        <command>\getresults</command>.
+        </para>
+        </listitem>
+      </varlistentry>
+
       <varlistentry id="app-psql-prompting-r">
         <term><literal>%R</literal></term>
         <listitem>
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 5dd4c2d2687..47682353fc6 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -90,9 +90,12 @@ static backslashResult exec_command_else(PsqlScanState scan_state, ConditionalSt
 										 PQExpBuffer query_buf);
 static backslashResult exec_command_endif(PsqlScanState scan_state, ConditionalStack cstack,
 										  PQExpBuffer query_buf);
+static backslashResult exec_command_endpipeline(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_encoding(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_errverbose(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_f(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_flush(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_flushrequest(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_g(PsqlScanState scan_state, bool active_branch,
 									  const char *cmd);
 static backslashResult process_command_g_options(char *first_option,
@@ -103,6 +106,7 @@ static backslashResult exec_command_gdesc(PsqlScanState scan_state, bool active_
 static backslashResult exec_command_getenv(PsqlScanState scan_state, bool active_branch,
 										   const char *cmd);
 static backslashResult exec_command_gexec(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_getresults(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_gset(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_help(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_html(PsqlScanState scan_state, bool active_branch);
@@ -132,6 +136,8 @@ static backslashResult exec_command_setenv(PsqlScanState scan_state, bool active
 										   const char *cmd);
 static backslashResult exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
 										  const char *cmd, bool is_func);
+static backslashResult exec_command_startpipeline(PsqlScanState scan_state, bool active_branch);
+static backslashResult exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_t(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_T(PsqlScanState scan_state, bool active_branch);
 static backslashResult exec_command_timing(PsqlScanState scan_state, bool active_branch);
@@ -351,18 +357,26 @@ exec_command(const char *cmd,
 		status = exec_command_else(scan_state, cstack, query_buf);
 	else if (strcmp(cmd, "endif") == 0)
 		status = exec_command_endif(scan_state, cstack, query_buf);
+	else if (strcmp(cmd, "endpipeline") == 0)
+		status = exec_command_endpipeline(scan_state, active_branch);
 	else if (strcmp(cmd, "encoding") == 0)
 		status = exec_command_encoding(scan_state, active_branch);
 	else if (strcmp(cmd, "errverbose") == 0)
 		status = exec_command_errverbose(scan_state, active_branch);
 	else if (strcmp(cmd, "f") == 0)
 		status = exec_command_f(scan_state, active_branch);
+	else if (strcmp(cmd, "flush") == 0)
+		status = exec_command_flush(scan_state, active_branch);
+	else if (strcmp(cmd, "flushrequest") == 0)
+		status = exec_command_flushrequest(scan_state, active_branch);
 	else if (strcmp(cmd, "g") == 0 || strcmp(cmd, "gx") == 0)
 		status = exec_command_g(scan_state, active_branch, cmd);
 	else if (strcmp(cmd, "gdesc") == 0)
 		status = exec_command_gdesc(scan_state, active_branch);
 	else if (strcmp(cmd, "getenv") == 0)
 		status = exec_command_getenv(scan_state, active_branch, cmd);
+	else if (strcmp(cmd, "getresults") == 0)
+		status = exec_command_getresults(scan_state, active_branch);
 	else if (strcmp(cmd, "gexec") == 0)
 		status = exec_command_gexec(scan_state, active_branch);
 	else if (strcmp(cmd, "gset") == 0)
@@ -408,6 +422,10 @@ exec_command(const char *cmd,
 		status = exec_command_sf_sv(scan_state, active_branch, cmd, true);
 	else if (strcmp(cmd, "sv") == 0 || strcmp(cmd, "sv+") == 0)
 		status = exec_command_sf_sv(scan_state, active_branch, cmd, false);
+	else if (strcmp(cmd, "startpipeline") == 0)
+		status = exec_command_startpipeline(scan_state, active_branch);
+	else if (strcmp(cmd, "syncpipeline") == 0)
+		status = exec_command_syncpipeline(scan_state, active_branch);
 	else if (strcmp(cmd, "t") == 0)
 		status = exec_command_t(scan_state, active_branch);
 	else if (strcmp(cmd, "T") == 0)
@@ -1491,6 +1509,38 @@ exec_command_f(PsqlScanState scan_state, bool active_branch)
 	return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
 }
 
+/*
+ * \flush -- call PQflush on the connection
+ */
+static backslashResult
+exec_command_flush(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_FLUSH;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+/*
+ * \flushrequest -- send a flush request to the server
+ */
+static backslashResult
+exec_command_flushrequest(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_SEND_FLUSH_REQUEST;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
 /*
  * \g  [(pset-option[=pset-value] ...)] [filename/shell-command]
  * \gx [(pset-option[=pset-value] ...)] [filename/shell-command]
@@ -1526,6 +1576,13 @@ exec_command_g(PsqlScanState scan_state, bool active_branch, const char *cmd)
 
 	if (status == PSQL_CMD_SKIP_LINE && active_branch)
 	{
+		if (strcmp(cmd, "gx") == 0 && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+		{
+			pg_log_error("\\gx not allowed in pipeline mode");
+			clean_extended_state();
+			return PSQL_CMD_ERROR;
+		}
+
 		if (!fname)
 			pset.gfname = NULL;
 		else
@@ -1679,6 +1736,39 @@ exec_command_getenv(PsqlScanState scan_state, bool active_branch,
 	return success ? PSQL_CMD_SKIP_LINE : PSQL_CMD_ERROR;
 }
 
+/*
+ * \getresults -- read results
+ */
+static backslashResult
+exec_command_getresults(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		char	   *opt;
+		int			num_results;
+
+		pset.send_mode = PSQL_GET_RESULTS;
+		opt = psql_scan_slash_option(scan_state, OT_NORMAL, NULL, false);
+
+		pset.requested_results = 0;
+		if (opt != NULL)
+		{
+			num_results = atoi(opt);
+			if (num_results < 0)
+			{
+				pg_log_error("\\getresults: invalid number of requested results");
+				return PSQL_CMD_SKIP_LINE;
+			}
+			pset.requested_results = num_results;
+		}
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+
 /*
  * \gexec -- send query and execute each field of result
  */
@@ -1689,6 +1779,12 @@ exec_command_gexec(PsqlScanState scan_state, bool active_branch)
 
 	if (active_branch)
 	{
+		if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+		{
+			pg_log_error("\\gexec not allowed in pipeline mode");
+			clean_extended_state();
+			return PSQL_CMD_ERROR;
+		}
 		pset.gexec_flag = true;
 		status = PSQL_CMD_SEND;
 	}
@@ -1709,6 +1805,13 @@ exec_command_gset(PsqlScanState scan_state, bool active_branch)
 		char	   *prefix = psql_scan_slash_option(scan_state,
 													OT_NORMAL, NULL, false);
 
+		if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+		{
+			pg_log_error("\\gset not allowed in pipeline mode");
+			clean_extended_state();
+			return PSQL_CMD_ERROR;
+		}
+
 		if (prefix)
 			pset.gset_prefix = prefix;
 		else
@@ -2672,6 +2775,54 @@ exec_command_sf_sv(PsqlScanState scan_state, bool active_branch,
 	return status;
 }
 
+/*
+ * \startpipeline -- enter pipeline mode
+ */
+static backslashResult
+exec_command_startpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_START_PIPELINE_MODE;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+/*
+ * \syncpipeline -- send a sync message to an active pipeline
+ */
+static backslashResult
+exec_command_syncpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_SEND_PIPELINE_SYNC;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
+/*
+ * \endpipeline -- end pipeline mode
+ */
+static backslashResult
+exec_command_endpipeline(PsqlScanState scan_state, bool active_branch)
+{
+	if (active_branch)
+	{
+		pset.send_mode = PSQL_END_PIPELINE_MODE;
+	}
+	else
+		ignore_slash_options(scan_state);
+
+	return PSQL_CMD_SEND;
+}
+
 /*
  * \t -- turn off table headers and row count
  */
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index f1a5291c13b..e1b58f08c59 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -121,6 +121,18 @@ CloseGOutput(FILE *gfile_fout, bool is_pipe)
 	}
 }
 
+/*
+ * Reset pset pipeline state
+ */
+static void
+pipelineReset(void)
+{
+	pset.piped_syncs = 0;
+	pset.piped_commands = 0;
+	pset.available_results = 0;
+	pset.requested_results = 0;
+}
+
 /*
  * setQFout
  * -- handler for -o command line option and \o command
@@ -354,6 +366,7 @@ CheckConnection(void)
 
 		fprintf(stderr, _("The connection to the server was lost. Attempting reset: "));
 		PQreset(pset.db);
+		pipelineReset();
 		OK = ConnectionUp();
 		if (!OK)
 		{
@@ -415,10 +428,12 @@ AcceptResult(const PGresult *result, bool show_error)
 			case PGRES_EMPTY_QUERY:
 			case PGRES_COPY_IN:
 			case PGRES_COPY_OUT:
+			case PGRES_PIPELINE_SYNC:
 				/* Fine, do nothing */
 				OK = true;
 				break;
 
+			case PGRES_PIPELINE_ABORTED:
 			case PGRES_BAD_RESPONSE:
 			case PGRES_NONFATAL_ERROR:
 			case PGRES_FATAL_ERROR:
@@ -1050,6 +1065,7 @@ PrintQueryResult(PGresult *result, bool last,
 			success = true;
 			break;
 
+		case PGRES_PIPELINE_ABORTED:
 		case PGRES_BAD_RESPONSE:
 		case PGRES_NONFATAL_ERROR:
 		case PGRES_FATAL_ERROR:
@@ -1418,6 +1434,60 @@ DescribeQuery(const char *query, double *elapsed_msec)
 	return OK;
 }
 
+/*
+ * Read and discard all results in an aborted pipeline.
+ *
+ * If a synchronisation point is found, we can stop discarding results as
+ * the pipeline will switch back to an OK state. If no synchronisation point
+ * is available, we need to stop when there are no more pending results,
+ * otherwise calling PQgetResult will block.
+ */
+static PGresult *
+discardAbortedPipelineResults(void)
+{
+	for (;;)
+	{
+		PGresult   *res = PQgetResult(pset.db);
+		ExecStatusType result_status = PQresultStatus(res);
+
+		if (result_status == PGRES_PIPELINE_SYNC)
+		{
+			/*
+			 * Found a synchronisation point. Decrementing the sync counter
+			 * will be done by the caller
+			 */
+			return res;
+		}
+		else if (res == NULL)
+		{
+			/* A query was processed, decrement the counters */
+			Assert(pset.available_results > 0);
+			Assert(pset.requested_results > 0);
+			pset.available_results--;
+			pset.requested_results--;
+		}
+
+		if (pset.requested_results == 0)
+			/* We've read all requested results, exit */
+			return res;
+
+		if (pset.available_results == 0 && pset.piped_syncs == 0)
+
+			/*
+			 * There are no more results to get and there is no synchronisation
+			 * point to stop at. This will leave the pipeline in an aborted
+			 * state.
+			 */
+			return res;
+
+		/*
+		 * An aborted pipeline will have either NULL results or results in a
+		 * PGRES_PIPELINE_ABORTED status
+		 */
+		Assert(res == NULL || result_status == PGRES_PIPELINE_ABORTED);
+		PQclear(res);
+	}
+}
 
 /*
  * ExecQueryAndProcessResults: utility function for use by SendQuery()
@@ -1451,6 +1521,7 @@ ExecQueryAndProcessResults(const char *query,
 	bool		timing = pset.timing;
 	bool		success = false;
 	bool		return_early = false;
+	bool		end_pipeline = false;
 	instr_time	before,
 				after;
 	PGresult   *result;
@@ -1466,9 +1537,13 @@ ExecQueryAndProcessResults(const char *query,
 	{
 		case PSQL_SEND_EXTENDED_CLOSE:
 			success = PQsendClosePrepared(pset.db, pset.stmtName);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+				pset.piped_commands++;
 			break;
 		case PSQL_SEND_EXTENDED_PARSE:
 			success = PQsendPrepare(pset.db, pset.stmtName, query, 0, NULL);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+				pset.piped_commands++;
 			break;
 		case PSQL_SEND_EXTENDED_QUERY_PARAMS:
 			Assert(pset.stmtName == NULL);
@@ -1476,6 +1551,8 @@ ExecQueryAndProcessResults(const char *query,
 										pset.bind_nparams, NULL,
 										(const char *const *) pset.bind_params,
 										NULL, NULL, 0);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+				pset.piped_commands++;
 			break;
 		case PSQL_SEND_EXTENDED_QUERY_PREPARED:
 			Assert(pset.stmtName != NULL);
@@ -1483,6 +1560,83 @@ ExecQueryAndProcessResults(const char *query,
 										  pset.bind_nparams,
 										  (const char *const *) pset.bind_params,
 										  NULL, NULL, 0);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+				pset.piped_commands++;
+			break;
+		case PSQL_START_PIPELINE_MODE:
+			success = PQenterPipelineMode(pset.db);
+			break;
+		case PSQL_END_PIPELINE_MODE:
+			success = PQpipelineSync(pset.db);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * End of the pipeline, all queued commands need to be
+				 * processed
+				 */
+				end_pipeline = true;
+				pset.piped_syncs++;
+
+				/*
+				 * The server will send a ReadyForQuery after a Sync is
+				 * processed, flushing all results to the client
+				 */
+				pset.available_results += pset.piped_commands;
+				pset.piped_commands = 0;
+				/* We want to read all results */
+				pset.requested_results = pset.available_results + pset.piped_syncs;
+			}
+			break;
+		case PSQL_SEND_PIPELINE_SYNC:
+			success = PQsendPipelineSync(pset.db);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+			{
+				pset.piped_syncs++;
+
+				/*
+				 * The server will send a ReadyForQuery after a Sync is
+				 * processed, flushing all results to the client
+				 */
+				pset.available_results += pset.piped_commands;
+				pset.piped_commands = 0;
+			}
+			break;
+		case PSQL_FLUSH:
+			success = PQflush(pset.db);
+			break;
+		case PSQL_SEND_FLUSH_REQUEST:
+			success = PQsendFlushRequest(pset.db);
+			if (success && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+			{
+				/*
+				 * With the flush request, all piped commands are pushed and
+				 * the server will forcefully flush the results to the client,
+				 * making them available
+				 */
+				pset.available_results += pset.piped_commands;
+				pset.piped_commands = 0;
+			}
+			break;
+		case PSQL_GET_RESULTS:
+			if (pset.available_results == 0 && pset.piped_syncs == 0)
+			{
+				/*
+				 * If no sync or flush request were sent, PQgetResult will
+				 * block. Forbid the call to \getresults to avoid staying
+				 * stuck
+				 */
+				pg_log_info("No pending results to get");
+				success = false;
+				pset.requested_results = 0;
+			}
+			else
+			{
+				success = true;
+				/* Cap requested_results to the maximum known results */
+				if (pset.requested_results == 0 ||
+					pset.requested_results > (pset.available_results + pset.piped_syncs))
+					pset.requested_results = pset.available_results + pset.piped_syncs;
+			}
 			break;
 		case PSQL_SEND_QUERY:
 			success = PQsendQuery(pset.db, query);
@@ -1501,6 +1655,16 @@ ExecQueryAndProcessResults(const char *query,
 		return -1;
 	}
 
+	if (pset.requested_results == 0 && !end_pipeline &&
+		PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+	{
+		/*
+		 * We're in a pipeline and haven't reached the pipeline end or there
+		 * was no request to read pipeline results, exit.
+		 */
+		return 1;
+	}
+
 	/*
 	 * Fetch the result in chunks if FETCH_COUNT is set, except when:
 	 *
@@ -1548,7 +1712,7 @@ ExecQueryAndProcessResults(const char *query,
 	{
 		ExecStatusType result_status;
 		bool		is_chunked_result = false;
-		PGresult   *next_result;
+		PGresult   *next_result = NULL;
 		bool		last;
 
 		if (!AcceptResult(result, false))
@@ -1571,6 +1735,9 @@ ExecQueryAndProcessResults(const char *query,
 			ClearOrSaveResult(result);
 			success = false;
 
+			if (result_status == PGRES_PIPELINE_ABORTED)
+				pg_log_info("Pipeline aborted, command didn't run");
+
 			/*
 			 * switch to next result
 			 */
@@ -1585,6 +1752,20 @@ ExecQueryAndProcessResults(const char *query,
 				 * ignore manually.
 				 */
 				result = NULL;
+			else if ((end_pipeline || pset.requested_results > 0)
+					 && PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF)
+
+				/*
+				 * We have an error within a pipeline. All commands are
+				 * aborted until the next synchronisation point. We need to
+				 * consume all results until this synchronisation point, or
+				 * stop when there's no more result to discard
+				 *
+				 * Checking pipeline status is necessary in case the
+				 * connection was reset. The new connection isn't in any kind
+				 * of pipeline state and thus has no result to discard
+				 */
+				result = discardAbortedPipelineResults();
 			else
 				result = PQgetResult(pset.db);
 
@@ -1771,12 +1952,66 @@ ExecQueryAndProcessResults(const char *query,
 			}
 		}
 
+		if (result_status == PGRES_PIPELINE_SYNC)
+		{
+			Assert(pset.piped_syncs > 0);
+
+			/*
+			 * We have a sync response, decrease the sync and
+			 * requested_results counters
+			 */
+			pset.piped_syncs--;
+			pset.requested_results--;
+
+			/*
+			 * After a synchronisation point, reset success state to print
+			 * possible successful results that will be processed after this
+			 */
+			success = true;
+
+			/*
+			 * If all syncs were processed and pipeline end was requested,
+			 * exit pipeline mode
+			 */
+			if (end_pipeline && pset.piped_syncs == 0)
+				success &= PQexitPipelineMode(pset.db);
+		}
+		else if (PQpipelineStatus(pset.db) != PQ_PIPELINE_OFF &&
+				 result_status != PGRES_PIPELINE_SYNC)
+		{
+			/*
+			 * We are in a pipeline and have a non sync response, decrease the
+			 * results counters
+			 */
+			pset.available_results--;
+			pset.requested_results--;
+		}
+
 		/*
 		 * Check PQgetResult() again.  In the typical case of a single-command
 		 * string, it will return NULL.  Otherwise, we'll have other results
 		 * to process.  We need to do that to check whether this is the last.
 		 */
-		next_result = PQgetResult(pset.db);
+		if (PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
+			next_result = PQgetResult(pset.db);
+		else
+		{
+			/*
+			 * In pipeline mode, a NULL result indicates the end of the
+			 * current query being processed. Call PQgetResult to consume this
+			 * NULL
+			 */
+			if (result_status != PGRES_PIPELINE_SYNC)
+			{
+				next_result = PQgetResult(pset.db);
+				Assert(next_result == NULL);
+			}
+
+			/* We can now get the next result in the pipeline */
+			if (pset.requested_results > 0)
+				next_result = PQgetResult(pset.db);
+		}
+
 		last = (next_result == NULL);
 
 		/*
@@ -1798,8 +2033,12 @@ ExecQueryAndProcessResults(const char *query,
 			*elapsed_msec = INSTR_TIME_GET_MILLISEC(after);
 		}
 
-		/* this may or may not print something depending on settings */
-		if (result != NULL)
+		/*
+		 * This may or may not print something depending on settings. A
+		 * pipeline sync will have a non null result but doesn't have anything
+		 * to print, thus ignore them
+		 */
+		if (result != NULL && result_status != PGRES_PIPELINE_SYNC)
 		{
 			/*
 			 * If results need to be printed into the file specified by \g,
@@ -1825,9 +2064,15 @@ ExecQueryAndProcessResults(const char *query,
 		ClearOrSaveResult(result);
 		result = next_result;
 
-		if (cancel_pressed)
+		if (cancel_pressed && PQpipelineStatus(pset.db) == PQ_PIPELINE_OFF)
 		{
-			/* drop this next result, as well as any others not yet read */
+			/*
+			 * Outside of a pipeline, drop this next result, as well as any
+			 * others not yet read
+			 *
+			 * Within a pipeline, we can let the outer loop handle this as an
+			 * aborted pipeline, which will discard all results
+			 */
 			ClearOrSaveResult(result);
 			ClearOrSaveAllResults();
 			break;
@@ -1837,6 +2082,17 @@ ExecQueryAndProcessResults(const char *query,
 	/* close \g file if we opened it */
 	CloseGOutput(gfile_fout, gfile_is_pipe);
 
+	if (end_pipeline)
+	{
+		/* After a pipeline is processed, pipeline piped_syncs should be 0 */
+		Assert(pset.piped_syncs == 0);
+		/* all commands were processed */
+		Assert(pset.piped_commands == 0);
+		/* and all results were read */
+		Assert(pset.available_results == 0);
+	}
+	Assert(pset.requested_results == 0);
+
 	/* may need this to recover from conn loss during COPY */
 	if (!CheckConnection())
 		return -1;
@@ -2296,7 +2552,13 @@ clean_extended_state(void)
 			free(pset.stmtName);
 			pset.bind_params = NULL;
 			break;
+		case PSQL_START_PIPELINE_MODE:	/* \startpipeline */
+		case PSQL_END_PIPELINE_MODE:	/* \endpipeline */
+		case PSQL_SEND_PIPELINE_SYNC:	/* \syncpipeline */
 		case PSQL_SEND_QUERY:
+		case PSQL_FLUSH:		/* \flush */
+		case PSQL_GET_RESULTS:	/* \getresults */
+		case PSQL_SEND_FLUSH_REQUEST:	/* \flushrequest */
 			break;
 	}
 
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index fda83465efa..92df53ffd67 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -167,15 +167,22 @@ slashUsage(unsigned short int pager)
 	HELP0("  \\close STMT_NAME       close an existing prepared statement\n");
 	HELP0("  \\copyright             show PostgreSQL usage and distribution terms\n");
 	HELP0("  \\crosstabview [COLUMNS] execute query and display result in crosstab\n");
+	HELP0("  \\endpipeline           exit pipeline mode\n");
 	HELP0("  \\errverbose            show most recent error message at maximum verbosity\n");
+	HELP0("  \\flush                 push unsent data to the server\n");
+	HELP0("  \\flushrequest          send a flushrequest command\n");
 	HELP0("  \\g [(OPTIONS)] [FILE]  execute query (and send result to file or |pipe);\n"
 		  "                         \\g with no arguments is equivalent to a semicolon\n");
 	HELP0("  \\gdesc                 describe result of query, without executing it\n");
+	HELP0("  \\getresults [NUM_RES]  read NUM_RES pending results. All pending results are\n"
+		  "                         read if no argument is provided\n");
 	HELP0("  \\gexec                 execute query, then execute each value in its result\n");
 	HELP0("  \\gset [PREFIX]         execute query and store result in psql variables\n");
 	HELP0("  \\gx [(OPTIONS)] [FILE] as \\g, but forces expanded output mode\n");
 	HELP0("  \\parse STMT_NAME       create a prepared statement\n");
 	HELP0("  \\q                     quit psql\n");
+	HELP0("  \\startpipeline         enter pipeline mode\n");
+	HELP0("  \\syncpipeline          add a synchronisation point to an ongoing pipeline\n");
 	HELP0("  \\watch [[i=]SEC] [c=N] [m=MIN]\n"
 		  "                         execute query every SEC seconds, up to N times,\n"
 		  "                         stop if less than MIN rows are returned\n");
diff --git a/src/bin/psql/prompt.c b/src/bin/psql/prompt.c
index 08a14feb3c3..d8e271b82a1 100644
--- a/src/bin/psql/prompt.c
+++ b/src/bin/psql/prompt.c
@@ -31,6 +31,9 @@
  *		sockets, "[local:/dir/name]" if not default
  * %m - like %M, but hostname only (before first dot), or always "[local]"
  * %p - backend pid
+ * %P - pipeline status: no pipeline: empty
+ * 		ongoing pipeline: |piped_syncs,piped_commands,pending_results|
+ * 		aborted pipeline: +piped_syncs,piped_commands,pending_results+
  * %> - database server port number
  * %n - database user name
  * %s - service
@@ -181,7 +184,16 @@ get_prompt(promptStatus_t status, ConditionalStack cstack)
 							snprintf(buf, sizeof(buf), "%d", pid);
 					}
 					break;
+				case 'P':
+					{
+						PGpipelineStatus status = PQpipelineStatus(pset.db);
 
+						if (status == PQ_PIPELINE_ON)
+							snprintf(buf, sizeof(buf), "|%d,%d,%d|", pset.piped_syncs, pset.piped_commands, pset.available_results);
+						else if (status == PQ_PIPELINE_ABORTED)
+							snprintf(buf, sizeof(buf), "+%d,%d,%d+", pset.piped_syncs, pset.piped_commands, pset.available_results);
+						break;
+					}
 				case '0':
 				case '1':
 				case '2':
diff --git a/src/bin/psql/settings.h b/src/bin/psql/settings.h
index 2a8fe12eb55..89219e159e3 100644
--- a/src/bin/psql/settings.h
+++ b/src/bin/psql/settings.h
@@ -23,8 +23,8 @@
 #define DEFAULT_EDITOR_LINENUMBER_ARG "+"
 #endif
 
-#define DEFAULT_PROMPT1 "%/%R%x%# "
-#define DEFAULT_PROMPT2 "%/%R%x%# "
+#define DEFAULT_PROMPT1 "%/%R%P%x%# "
+#define DEFAULT_PROMPT2 "%/%R%P%x%# "
 #define DEFAULT_PROMPT3 ">> "
 
 /*
@@ -69,6 +69,12 @@ typedef enum
 	PSQL_SEND_EXTENDED_PARSE,
 	PSQL_SEND_EXTENDED_QUERY_PARAMS,
 	PSQL_SEND_EXTENDED_QUERY_PREPARED,
+	PSQL_SEND_PIPELINE_SYNC,
+	PSQL_START_PIPELINE_MODE,
+	PSQL_END_PIPELINE_MODE,
+	PSQL_FLUSH,
+	PSQL_SEND_FLUSH_REQUEST,
+	PSQL_GET_RESULTS,
 } PSQL_SEND_MODE;
 
 typedef enum
@@ -111,6 +117,12 @@ typedef struct _psqlSettings
 	char	  **bind_params;	/* parameters for extended query protocol call */
 	char	   *stmtName;		/* prepared statement name used for extended
 								 * query protocol commands */
+	int			piped_commands; /* number of piped commands */
+	int			piped_syncs;	/* number of piped syncs */
+	int			available_results;	/* number of results available to get */
+	int			requested_results;	/* number of requested results, including
+									 * sync messages; used to read a limited
+									 * subset of the available results */
 	bool		crosstab_flag;	/* one-shot request to crosstab result */
 	char	   *ctv_args[4];	/* \crosstabview arguments */
 
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 81cbf10aa28..bd3a78d909d 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -1867,9 +1867,9 @@ psql_completion(const char *text, int start, int end)
 		"\\drds", "\\drg", "\\dRs", "\\dRp", "\\ds",
 		"\\dt", "\\dT", "\\dv", "\\du", "\\dx", "\\dX", "\\dy",
 		"\\echo", "\\edit", "\\ef", "\\elif", "\\else", "\\encoding",
-		"\\endif", "\\errverbose", "\\ev",
-		"\\f",
-		"\\g", "\\gdesc", "\\getenv", "\\gexec", "\\gset", "\\gx",
+		"\\endif", "\\endpipeline", "\\errverbose", "\\ev",
+		"\\f", "\\flush", "\\flushrequest",
+		"\\g", "\\gdesc", "\\getenv", "\\getresults", "\\gexec", "\\gset", "\\gx",
 		"\\help", "\\html",
 		"\\if", "\\include", "\\include_relative", "\\ir",
 		"\\list", "\\lo_import", "\\lo_export", "\\lo_list", "\\lo_unlink",
@@ -1877,7 +1877,7 @@ psql_completion(const char *text, int start, int end)
 		"\\parse", "\\password", "\\print", "\\prompt", "\\pset",
 		"\\qecho", "\\quit",
 		"\\reset",
-		"\\s", "\\set", "\\setenv", "\\sf", "\\sv",
+		"\\s", "\\set", "\\setenv", "\\sf", "\\startpipeline", "\\sv", "\\syncpipeline",
 		"\\t", "\\T", "\\timing",
 		"\\unset",
 		"\\x",
diff --git a/src/test/regress/expected/psql.out b/src/test/regress/expected/psql.out
index 36dc31c16c4..ded510d50bc 100644
--- a/src/test/regress/expected/psql.out
+++ b/src/test/regress/expected/psql.out
@@ -6828,3 +6828,671 @@ CREATE TABLE defprivs (a int);
 
 \pset null ''
 DROP TABLE defprivs;
+-- pipelining
+CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
+-- single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+-- Test \flush
+\startpipeline
+\flush
+SELECT $1 \bind 'val1' \g
+\flush
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+-- send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val2     | val3
+(1 row)
+
+ ?column? | ?column? 
+----------+----------
+ val4     | val5
+(1 row)
+
+-- startpipeline shouldn't have any effect if already in a pipeline
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- Convert an implicit tx block to an explicit tx block
+\startpipeline
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
+ROLLBACK \bind \g
+\endpipeline
+-- Multiple explicit transactions
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+COMMIT \bind \g
+\endpipeline
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- COPY FROM STDIN with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\flushrequest
+\getresults
+ ?column? 
+----------
+ val1
+(1 row)
+
+message type 0x5a arrived from server while idle
+\endpipeline
+-- COPY FROM STDIN with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\syncpipeline
+\getresults
+ ?column? 
+----------
+ val1
+(1 row)
+
+\endpipeline
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+1	\N
+2	test2
+3	test3
+4	test4
+-- COPY TO STDOUT with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\flushrequest
+\getresults
+ ?column? 
+----------
+ val1
+(1 row)
+
+1	\N
+2	test2
+3	test3
+4	test4
+\endpipeline
+-- COPY TO STDOUT with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\syncpipeline
+\getresults
+ ?column? 
+----------
+ val1
+(1 row)
+
+1	\N
+2	test2
+3	test3
+4	test4
+\endpipeline
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+ERROR:  could not determine data type of parameter $1
+-- \getresults displays all results preceding a \flushrequest
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+ ?column? 
+----------
+ 1
+(1 row)
+
+ ?column? 
+----------
+ 2
+(1 row)
+
+\endpipeline
+-- \getresults displays all results preceding a sync
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\syncpipeline
+\getresults
+ ?column? 
+----------
+ 1
+(1 row)
+
+ ?column? 
+----------
+ 2
+(1 row)
+
+\endpipeline
+-- \getresults immediately returns if there's no result to fetch
+\startpipeline
+\getresults
+No pending results to get
+SELECT $1 \bind 2 \g
+\getresults
+No pending results to get
+\flushrequest
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+\getresults
+No pending results to get
+-- \getresults only fetch results preceding a flushrequest
+\startpipeline
+SELECT $1 \bind 2 \g
+\flushrequest
+SELECT $1 \bind 2 \g
+\getresults
+ ?column? 
+----------
+ 2
+(1 row)
+
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+-- \getresults only fetch results preceding a sync message
+\startpipeline
+SELECT $1 \bind 2 \g
+\syncpipeline
+SELECT $1 \bind 2 \g
+\getresults
+ ?column? 
+----------
+ 2
+(1 row)
+
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+-- use pipeline with chunked results with both \getresults and \endpipeline
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+ ?column? 
+----------
+ 2
+(1 row)
+
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+\unset FETCH_COUNT
+-- \getresults with specific number of requested results
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 1
+ ?column? 
+----------
+ 1
+(1 row)
+
+SELECT $1 \bind 4 \g
+\getresults 3
+ ?column? 
+----------
+ 2
+(1 row)
+
+ ?column? 
+----------
+ 3
+(1 row)
+
+\endpipeline
+ ?column? 
+----------
+ 4
+(1 row)
+
+-- \syncpipeline count as one command to fetch for \getresults
+\startpipeline
+\syncpipeline
+\syncpipeline
+SELECT $1 \bind 1 \g
+\flushrequest
+\getresults 2
+\getresults 1
+ ?column? 
+----------
+ 1
+(1 row)
+
+\endpipeline
+-- \getresults 0 should get all results
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 0
+ ?column? 
+----------
+ 1
+(1 row)
+
+ ?column? 
+----------
+ 2
+(1 row)
+
+ ?column? 
+----------
+ 3
+(1 row)
+
+\endpipeline
+-- pipelining errors
+-- endpipeline outside of pipeline should fail
+\endpipeline
+cannot send pipeline when not in pipeline mode
+-- Query using simple protocol should not be sent and should leave the pipeline usable
+\startpipeline
+SELECT 1;
+PQsendQuery not allowed in pipeline mode
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ ?column? 
+----------
+ val1
+(1 row)
+
+-- After an aborted pipeline, commands after a sync should be displayed
+\startpipeline
+SELECT $1 \bind \g
+\syncpipeline
+SELECT $1 \bind 1 \g
+\endpipeline
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+ ?column? 
+----------
+ 1
+(1 row)
+
+-- Incorrect number of parameters, the pipeline will be aborted and following queries won't be executed
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+ERROR:  bind message supplies 1 parameters, but prepared statement "" requires 0
+-- An explicit transaction with an error needs to be rollbacked after the pipeline
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+\endpipeline
+ERROR:  duplicate key value violates unique constraint "psql_pipeline_pkey"
+DETAIL:  Key (a)=(1) already exists.
+ROLLBACK;
+-- \watch sends a simple query which won't be allowed within a pipeline
+\startpipeline
+SELECT \bind \g
+\watch 1
+PQsendQuery not allowed in pipeline mode
+
+\endpipeline
+--
+(1 row)
+
+-- \gdesc should fail as synchronous commands are not allowed in pipeline, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+synchronous command execution functions are not allowed in pipeline mode
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+-- \gset is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\gset not allowed in pipeline mode
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\gset not allowed in pipeline mode
+\bind_named '' 1 2 \g
+\endpipeline
+ i | j 
+---+---
+ 1 | 2
+(1 row)
+
+-- \gx is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gx
+\gx not allowed in pipeline mode
+\reset
+SELECT $1 \bind 1 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+-- \gx warning should be emitted in an aborted pipeline
+\startpipeline
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+SELECT $1 \bind 1 \gx
+\gx not allowed in pipeline mode
+\endpipeline
+-- \gexec is not allowed, pipeline should still be usable
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\gexec not allowed in pipeline mode
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+                          ?column?                          
+------------------------------------------------------------
+ INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)
+(1 row)
+
+ count 
+-------
+     4
+(1 row)
+
+-- After an error, pipeline is aborted and requires a sync to be reusable
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+-- Pipeline is aborted
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+-- Sync allows pipeline to recover
+\syncpipeline
+\getresults
+Pipeline aborted, command didn't run
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+ ?column? 
+----------
+ 1
+(1 row)
+
+ ?column? 
+----------
+ 1
+(1 row)
+
+\endpipeline
+-- In an aborted pipeline, \getresults 1 aborted commands one at a time
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\syncpipeline
+\getresults 1
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+\getresults 1
+Pipeline aborted, command didn't run
+\getresults 1
+Pipeline aborted, command didn't run
+\getresults 1
+Pipeline aborted, command didn't run
+\getresults 1
+\endpipeline
+-- Test chunked results with an aborted pipeline
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+ERROR:  bind message supplies 0 parameters, but prepared statement "" requires 1
+SELECT $1 \bind \g
+\endpipeline
+fetching results in chunked mode failed
+Pipeline aborted, command didn't run
+\unset FETCH_COUNT
+-- \getresults returns an error when an incorrect number is provided
+\startpipeline
+\getresults -1
+\getresults: invalid number of requested results
+\endpipeline
+-- \getresults when there's no result shouldn't impact the following query
+\getresults 1
+No pending results to get
+select 1;
+ ?column? 
+----------
+        1
+(1 row)
+
+-- pipelining and transaction block behaviour
+-- set local will issue a warning when modifying a GUC outside of a transaction block
+-- The change will still be valid as a pipeline runs within an implicit transaction block
+-- Sending a sync will commit the implicit transaction block. The first command after a sync
+-- won't be seen as belonging to a pipeline.
+\startpipeline
+SET LOCAL statement_timeout='1h' \bind \g
+SHOW statement_timeout \bind \g
+\syncpipeline
+SHOW statement_timeout \bind \g
+SET LOCAL statement_timeout='2h' \bind \g
+SHOW statement_timeout \bind \g
+\endpipeline
+WARNING:  SET LOCAL can only be used in transaction blocks
+ statement_timeout 
+-------------------
+ 1h
+(1 row)
+
+ statement_timeout 
+-------------------
+ 0
+(1 row)
+
+ statement_timeout 
+-------------------
+ 2h
+(1 row)
+
+-- Reindex concurrently is forbidden in the middle of a pipeline
+\startpipeline
+SELECT $1 \bind 1 \g
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+ERROR:  REINDEX CONCURRENTLY cannot run inside a transaction block
+-- Reindex concurrently will work if it's the first command of a pipeline
+\startpipeline
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 2
+(1 row)
+
+-- subtransactions are not allowed in pipeline mode
+\startpipeline
+SAVEPOINT a \bind \g
+SELECT $1 \bind 1 \g
+ROLLBACK TO SAVEPOINT a \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ERROR:  SAVEPOINT can only be used in transaction blocks
+-- Lock command will fail as first pipeline command is not seen as a transaction block
+\startpipeline
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ERROR:  LOCK TABLE can only be used in transaction blocks
+-- Lock command will succeed after the first command as pipeline will be seen as an implicit transaction block
+\startpipeline
+SELECT $1 \bind 1 \g
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+ ?column? 
+----------
+ 1
+(1 row)
+
+ ?column? 
+----------
+ 2
+(1 row)
+
+-- Vacuum command will work as the first command
+\startpipeline
+VACUUM psql_pipeline \bind \g
+\endpipeline
+-- Vacuum command will fail within pipeline implicit transaction
+\startpipeline
+SELECT 1 \bind \g
+VACUUM psql_pipeline \bind \g
+\endpipeline
+ ?column? 
+----------
+        1
+(1 row)
+
+ERROR:  VACUUM cannot run inside a transaction block
+-- Vacuum command will work after a sync
+\startpipeline
+SELECT 1 \bind \g
+\syncpipeline
+VACUUM psql_pipeline \bind \g
+\endpipeline
+ ?column? 
+----------
+        1
+(1 row)
+
diff --git a/src/test/regress/sql/psql.sql b/src/test/regress/sql/psql.sql
index c5021fc0b13..33b204edd7f 100644
--- a/src/test/regress/sql/psql.sql
+++ b/src/test/regress/sql/psql.sql
@@ -1929,3 +1929,402 @@ CREATE TABLE defprivs (a int);
 \z defprivs
 \pset null ''
 DROP TABLE defprivs;
+
+-- pipelining
+CREATE TABLE psql_pipeline(a INTEGER PRIMARY KEY, s TEXT);
+
+-- single query
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- multiple queries
+\startpipeline
+SELECT $1 \bind 'val1' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+
+-- Test \flush
+\startpipeline
+\flush
+SELECT $1 \bind 'val1' \g
+\flush
+SELECT $1, $2 \bind 'val2' 'val3' \g
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\endpipeline
+
+-- send multiple syncs
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\syncpipeline
+\syncpipeline
+SELECT $1, $2 \bind 'val2' 'val3' \g
+\syncpipeline
+SELECT $1, $2 \bind 'val4' 'val5' \g
+\endpipeline
+
+-- startpipeline shouldn't have any effect if already in a pipeline
+\startpipeline
+\startpipeline
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- Convert an implicit tx block to an explicit tx block
+\startpipeline
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 2 \g
+ROLLBACK \bind \g
+\endpipeline
+
+-- Multiple explicit transactions
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+COMMIT \bind \g
+\endpipeline
+
+-- COPY FROM STDIN
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\endpipeline
+2	test2
+\.
+
+-- COPY FROM STDIN with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\flushrequest
+\getresults
+3	test3
+\.
+\endpipeline
+
+-- COPY FROM STDIN with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+COPY psql_pipeline FROM STDIN \bind \g
+\syncpipeline
+\getresults
+4	test4
+\.
+\endpipeline
+
+-- COPY TO STDOUT
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\endpipeline
+
+-- COPY TO STDOUT with \flushrequest + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\flushrequest
+\getresults
+\endpipeline
+
+-- COPY TO STDOUT with \syncpipeline + \getresults
+\startpipeline
+SELECT $1 \bind 'val1' \g
+copy psql_pipeline TO STDOUT \bind \g
+\syncpipeline
+\getresults
+\endpipeline
+
+-- Use \parse and \bind_named
+\startpipeline
+SELECT $1 \parse ''
+SELECT $1, $2 \parse ''
+SELECT $2 \parse pipeline_1
+\bind_named '' 1 2 \g
+\bind_named pipeline_1 2 \g
+\endpipeline
+
+-- \getresults displays all results preceding a \flushrequest
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+\endpipeline
+
+-- \getresults displays all results preceding a sync
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+\syncpipeline
+\getresults
+\endpipeline
+
+-- \getresults immediately returns if there's no result to fetch
+\startpipeline
+\getresults
+SELECT $1 \bind 2 \g
+\getresults
+\flushrequest
+\endpipeline
+\getresults
+
+-- \getresults only fetch results preceding a flushrequest
+\startpipeline
+SELECT $1 \bind 2 \g
+\flushrequest
+SELECT $1 \bind 2 \g
+\getresults
+\endpipeline
+
+-- \getresults only fetch results preceding a sync message
+\startpipeline
+SELECT $1 \bind 2 \g
+\syncpipeline
+SELECT $1 \bind 2 \g
+\getresults
+\endpipeline
+
+-- use pipeline with chunked results with both \getresults and \endpipeline
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind 2 \g
+\flushrequest
+\getresults
+SELECT $1 \bind 2 \g
+\endpipeline
+\unset FETCH_COUNT
+
+-- \getresults with specific number of requested results
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 1
+SELECT $1 \bind 4 \g
+\getresults 3
+\endpipeline
+
+-- \syncpipeline count as one command to fetch for \getresults
+\startpipeline
+\syncpipeline
+\syncpipeline
+SELECT $1 \bind 1 \g
+\flushrequest
+\getresults 2
+\getresults 1
+\endpipeline
+
+-- \getresults 0 should get all results
+\startpipeline
+SELECT $1 \bind 1 \g
+SELECT $1 \bind 2 \g
+SELECT $1 \bind 3 \g
+\syncpipeline
+\getresults 0
+\endpipeline
+
+-- pipelining errors
+
+-- endpipeline outside of pipeline should fail
+\endpipeline
+
+-- Query using simple protocol should not be sent and should leave the pipeline usable
+\startpipeline
+SELECT 1;
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- After an aborted pipeline, commands after a sync should be displayed
+\startpipeline
+SELECT $1 \bind \g
+\syncpipeline
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- Incorrect number of parameters, the pipeline will be aborted and following queries won't be executed
+\startpipeline
+SELECT \bind 'val1' \g
+SELECT $1 \bind 'val1' \g
+\endpipeline
+
+-- An explicit transaction with an error needs to be rollbacked after the pipeline
+\startpipeline
+BEGIN \bind \g
+INSERT INTO psql_pipeline VALUES ($1) \bind 1 \g
+ROLLBACK \bind \g
+\endpipeline
+ROLLBACK;
+
+-- \watch sends a simple query which won't be allowed within a pipeline
+\startpipeline
+SELECT \bind \g
+\watch 1
+\endpipeline
+
+-- \gdesc should fail as synchronous commands are not allowed in pipeline, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gdesc
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gset is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 as i, $2 as j \parse ''
+SELECT $1 as k, $2 as l \parse 'second'
+\bind_named '' 1 2 \gset
+\bind_named second 1 2 \gset pref02_ \echo :pref02_i :pref02_j
+\bind_named '' 1 2 \g
+\endpipeline
+
+-- \gx is not allowed, pipeline should still be usable
+\startpipeline
+SELECT $1 \bind 1 \gx
+\reset
+SELECT $1 \bind 1 \g
+\endpipeline
+
+-- \gx warning should be emitted in an aborted pipeline
+\startpipeline
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+SELECT $1 \bind 1 \gx
+\endpipeline
+
+-- \gexec is not allowed, pipeline should still be usable
+\startpipeline
+SELECT 'INSERT INTO psql_pipeline(a) SELECT generate_series(1, 10)' \parse 'insert_stmt'
+\bind_named insert_stmt \gexec
+\bind_named insert_stmt \g
+SELECT COUNT(*) FROM psql_pipeline \bind \g
+\endpipeline
+
+-- After an error, pipeline is aborted and requires a sync to be reusable
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+-- Pipeline is aborted
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+-- Sync allows pipeline to recover
+\syncpipeline
+\getresults
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\close a
+\flushrequest
+\getresults
+\endpipeline
+
+-- In an aborted pipeline, \getresults 1 aborted commands one at a time
+\startpipeline
+SELECT $1 \bind \g
+SELECT $1 \bind 1 \g
+SELECT $1 \parse a
+\bind_named a 1 \g
+\syncpipeline
+\getresults 1
+\getresults 1
+\getresults 1
+\getresults 1
+\getresults 1
+\endpipeline
+
+-- Test chunked results with an aborted pipeline
+\startpipeline
+\set FETCH_COUNT 10
+SELECT $1 \bind \g
+\flushrequest
+\getresults
+SELECT $1 \bind \g
+\endpipeline
+\unset FETCH_COUNT
+
+-- \getresults returns an error when an incorrect number is provided
+\startpipeline
+\getresults -1
+\endpipeline
+
+-- \getresults when there's no result shouldn't impact the following query
+\getresults 1
+select 1;
+
+-- pipelining and transaction block behaviour
+
+-- set local will issue a warning when modifying a GUC outside of a transaction block
+-- The change will still be valid as a pipeline runs within an implicit transaction block
+-- Sending a sync will commit the implicit transaction block. The first command after a sync
+-- won't be seen as belonging to a pipeline.
+\startpipeline
+SET LOCAL statement_timeout='1h' \bind \g
+SHOW statement_timeout \bind \g
+\syncpipeline
+SHOW statement_timeout \bind \g
+SET LOCAL statement_timeout='2h' \bind \g
+SHOW statement_timeout \bind \g
+\endpipeline
+
+-- Reindex concurrently is forbidden in the middle of a pipeline
+\startpipeline
+SELECT $1 \bind 1 \g
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Reindex concurrently will work if it's the first command of a pipeline
+\startpipeline
+REINDEX TABLE CONCURRENTLY psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- subtransactions are not allowed in pipeline mode
+\startpipeline
+SAVEPOINT a \bind \g
+SELECT $1 \bind 1 \g
+ROLLBACK TO SAVEPOINT a \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Lock command will fail as first pipeline command is not seen as a transaction block
+\startpipeline
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Lock command will succeed after the first command as pipeline will be seen as an implicit transaction block
+\startpipeline
+SELECT $1 \bind 1 \g
+LOCK psql_pipeline \bind \g
+SELECT $1 \bind 2 \g
+\endpipeline
+
+-- Vacuum command will work as the first command
+\startpipeline
+VACUUM psql_pipeline \bind \g
+\endpipeline
+
+-- Vacuum command will fail within pipeline implicit transaction
+\startpipeline
+SELECT 1 \bind \g
+VACUUM psql_pipeline \bind \g
+\endpipeline
+
+-- Vacuum command will work after a sync
+\startpipeline
+SELECT 1 \bind \g
+\syncpipeline
+VACUUM psql_pipeline \bind \g
+\endpipeline
-- 
2.39.5 (Apple Git-154)

