From 65969be0ed60cc04334cbdf014eaefcb6611e026 Mon Sep 17 00:00:00 2001
From: Anthonin Bonnefoy <anthonin.bonnefoy@datadoghq.com>
Date: Mon, 25 Nov 2024 10:13:40 +0100
Subject: Consider pipeline implicit transaction as a transaction block

When using pipeline with implicit transaction, a transaction will
start from the first command and be committed with a Sync message.
Functions like IsInTransactionBlock and PreventInTransactionBlock
already assimilate this implicit transaction as a transaction block.

However, this is not the case in CheckTransactionBlock. This function is
called for things like warning when set local is used outside of a
transaction block.

This patch changes the transaction state to an implicit transaction
block when a new command is processed while pipelining was detected.
This remove the need to check for XACT_FLAGS_PIPELINING in
IsInTransactionBlock and PreventInTransactionBlock since the transaction
state now reflects the ongoing transaction block.

This allows to avoid warning when `set local` is called within a
pipeline, and makes the detection of transaction block coherent with
what's done in IsInTransactionBlock and PreventInTransactionBlock.

A known limitation of this patch: the first command won't be detected as
part of a transaction block. The implicit block only starts at the
second command so the `set local` warnings will still happen if called
first.
---
 doc/src/sgml/protocol.sgml                   |  21 +--
 src/backend/access/transam/xact.c            |  13 --
 src/backend/tcop/postgres.c                  |  18 +++
 src/bin/pgbench/t/001_pgbench_with_server.pl | 144 +++++++++++++++++++
 4 files changed, 173 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index d5a78694b99..cff0c4099e9 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1070,16 +1070,17 @@ SELCT 1/0;<!-- this typo is intentional -->
 
    <para>
     If the client has not issued an explicit <command>BEGIN</command>,
-    then each Sync ordinarily causes an implicit <command>COMMIT</command>
-    if the preceding step(s) succeeded, or an
-    implicit <command>ROLLBACK</command> if they failed.  However, there
-    are a few DDL commands (such as <command>CREATE DATABASE</command>)
-    that cannot be executed inside a transaction block.  If one of
-    these is executed in a pipeline, it will fail unless it is the first
-    command in the pipeline.  Furthermore, upon success it will force an
-    immediate commit to preserve database consistency.  Thus a Sync
-    immediately following one of these commands has no effect except to
-    respond with ReadyForQuery.
+    then an implicit transaction block is started and each Sync ordinarily
+    causes an implicit <command>COMMIT</command> if the preceding step(s)
+    succeeded, or an implicit <command>ROLLBACK</command> if they failed.
+    This implicit transaction block will only be detected by the server
+    when the first command ends without a sync.  There are a few DDL
+    commands (such as <command>CREATE DATABASE</command>) that cannot be
+    executed inside a transaction block. If one of these is executed in a
+    pipeline, it will fail unless it is the first command after a Sync.
+    Furthermore, upon success it will force an immediate commit to preserve
+    database consistency. Thus a Sync immediately following one of these
+    commands has no effect except to respond with ReadyForQuery.
    </para>
 
    <para>
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7ebcc2a557..1eccb78ddc4 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -3659,16 +3659,6 @@ PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
 				 errmsg("%s cannot run inside a subtransaction",
 						stmtType)));
 
-	/*
-	 * inside a pipeline that has started an implicit transaction?
-	 */
-	if (MyXactFlags & XACT_FLAGS_PIPELINING)
-		ereport(ERROR,
-				(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-		/* translator: %s represents an SQL statement name */
-				 errmsg("%s cannot be executed within a pipeline",
-						stmtType)));
-
 	/*
 	 * inside a function call?
 	 */
@@ -3780,9 +3770,6 @@ IsInTransactionBlock(bool isTopLevel)
 	if (IsSubTransaction())
 		return true;
 
-	if (MyXactFlags & XACT_FLAGS_PIPELINING)
-		return true;
-
 	if (!isTopLevel)
 		return true;
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 184b8301687..bd424767176 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2780,6 +2780,17 @@ start_xact_command(void)
 
 		xact_started = true;
 	}
+	else if (MyXactFlags & XACT_FLAGS_PIPELINING)
+	{
+		/*
+		 * When the first Execute message is completed, following commands
+		 * will be done in an implicit transaction block created via
+		 * pipelining. The transaction state needs to be updated to an
+		 * implicit block if we're not already in a transaction block (like
+		 * one started by an explicit BEGIN).
+		 */
+		BeginImplicitTransactionBlock();
+	}
 
 	/*
 	 * Start statement timeout if necessary.  Note that this'll intentionally
@@ -4991,6 +5002,13 @@ PostgresMain(const char *dbname, const char *username)
 
 			case PqMsg_Sync:
 				pq_getmsgend(&input_message);
+
+				/*
+				 * If pipelining was used, we may be in an implicit
+				 * transaction block. Close it before calling
+				 * finish_xact_command.
+				 */
+				EndImplicitTransactionBlock();
 				finish_xact_command();
 				valgrind_report_error_query("SYNC message");
 				send_ready_for_query = true;
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 956e290f3ef..8506e472fdc 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -968,6 +968,150 @@ $node->pgbench(
 }
 	});
 
+# set local as first pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[],
+	[qr{WARNING:  SET LOCAL can only be used in transaction blocks}],
+	'set local outside of pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_1' => q{
+\startpipeline
+SET LOCAL statement_timeout='1h';
+\endpipeline
+}
+	});
+
+# set local as second pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[],
+	[qr{^$}],
+	'set local within pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_2' => q{
+\startpipeline
+SELECT 1;
+SET LOCAL statement_timeout='1h';
+\endpipeline
+}
+	});
+
+# Reindex concurrently as first pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[],
+	[],
+	'Reindex concurrently outside pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_3' => q{
+\startpipeline
+REINDEX TABLE CONCURRENTLY pgbench_accounts;
+SELECT 1;
+\endpipeline
+}
+	});
+
+# Reindex concurrently as second pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[],
+	'error: reindex concurrently within pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_4' => q{
+\startpipeline
+SELECT 1;
+REINDEX TABLE CONCURRENTLY pgbench_accounts;
+\endpipeline
+}
+	});
+
+# Subtransactions are not allowed in pipeline
+$node->pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[],
+	'error: subtransactions are not allowed in pipeline',
+	{
+		'001_pgbench_pipeline_tx_block_5' => q{
+\startpipeline
+SAVEPOINT a;
+SELECT 1;
+ROLLBACK TO SAVEPOINT a;
+SELECT 2;
+\endpipeline
+}
+	});
+
+# Lock table as first pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[],
+	'error: lock table outside of pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_6' => q{
+\startpipeline
+LOCK pgbench_accounts;
+SELECT 1;
+\endpipeline
+}
+	});
+
+# Lock table as second pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[],
+	[],
+	'lock table within pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_7' => q{
+\startpipeline
+SELECT 1;
+LOCK pgbench_accounts;
+\endpipeline
+}
+	});
+
+# Vacuum table as first pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	0,
+	[],
+	[],
+	'vacuum table outside pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_8' => q{
+\startpipeline
+VACUUM pgbench_accounts;
+\endpipeline
+}
+	});
+
+# Vacuum table as second pipeline command
+$node->pgbench(
+	'-t 1 -n -M extended',
+	2,
+	[],
+	[],
+	'error: vacuum table within pipeline implicit transaction block',
+	{
+		'001_pgbench_pipeline_tx_block_9' => q{
+\startpipeline
+SELECT 1;
+VACUUM pgbench_accounts;
+\endpipeline
+}
+	});
+
 # Working \startpipeline in prepared query mode with serializable
 $node->pgbench(
 	'-c4 -t 10 -n -M prepared',
-- 
2.39.5 (Apple Git-154)

