From 5b3f7306852d5245c2876439f00add49a9810454 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Wed, 9 Jun 2021 06:32:30 -0400
Subject: [PATCH v3] Add option to set two-phase in CREATE_REPLICATION_SLOT
 command.

CREATE_REPLICATION_SLOT modified to support two-phase encoding in the slot.
This will allow the decoding of commands like PREPARE TRANSACTION,
COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option.
---
 doc/src/sgml/protocol.sgml             | 14 ++++++++++++++
 src/backend/replication/repl_gram.y    | 16 ++++++++++++----
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/walsender.c    |  2 +-
 src/include/nodes/replnodes.h          |  1 +
 5 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 2f4dde3..3c66efc 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1956,6 +1956,20 @@ The commands accepted in replication mode are:
       </varlistentry>
 
       <varlistentry>
+       <term><literal>TWO_PHASE</literal></term>
+       <listitem>
+        <para>
+         Specify that this replication slot supports decode of two-phase
+         transactions. With this option, two-phase commands like
+         <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+         and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+         The transaction will be  decoded and transmitted at
+         <literal>PREPARE TRANSACTION</literal> time.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
        <term><literal>RESERVE_WAL</literal></term>
        <listitem>
         <para>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a8..5c2164e 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_TWO_PHASE
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
@@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot var_name
 %type <boolval>	opt_temporary
+%type <boolval>	opt_two_phase
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
@@ -241,16 +243,17 @@ create_replication_slot:
 					cmd->options = $5;
 					$$ = (Node *) cmd;
 				}
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL TWO_PHASE plugin */
+			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL opt_two_phase IDENT create_slot_opt_list
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->temporary = $3;
-					cmd->plugin = $5;
-					cmd->options = $6;
+					cmd->two_phase = $5;
+					cmd->plugin = $6;
+					cmd->options = $7;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -365,6 +368,11 @@ opt_temporary:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_two_phase:
+			K_TWO_PHASE						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3f..c038a63 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
+TWO_PHASE			{ return K_TWO_PHASE; }
 EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723..e94069c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -954,7 +954,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  false);
+							  cmd->two_phase);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a25..ebc43a0 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		temporary;
+	bool		two_phase;
 	List	   *options;
 } CreateReplicationSlotCmd;
 
-- 
1.8.3.1

