From b8c81c4d8cfa4fb94d274c6b5722d4e8705d0ffd Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 4 Jun 2021 01:43:43 -0400
Subject: [PATCH v2] Add option to set two-phase in CREATE_REPLICATION_SLOT
 command.

CREATE_REPLICATION_SLOT modified to support two-phase encoding in the slot.
This will allow the decoding of commands like PREPARE TRANSACTION,
COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option.
---
 src/backend/replication/repl_gram.y    | 16 ++++++++++++----
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/walsender.c    |  2 +-
 src/include/nodes/replnodes.h          |  1 +
 4 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a8..8c1f353 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_TWO_PHASE
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
@@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot var_name
 %type <boolval>	opt_temporary
+%type <boolval>	opt_two_phase
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
@@ -241,16 +243,17 @@ create_replication_slot:
 					cmd->options = $5;
 					$$ = (Node *) cmd;
 				}
-			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
+			/* CREATE_REPLICATION_SLOT slot TEMPORARY TWO_PHASE LOGICAL plugin */
+			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary opt_two_phase K_LOGICAL IDENT create_slot_opt_list
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->temporary = $3;
-					cmd->plugin = $5;
-					cmd->options = $6;
+					cmd->two_phase = $4;
+					cmd->plugin = $6;
+					cmd->options = $7;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -365,6 +368,11 @@ opt_temporary:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_two_phase:
+			K_TWO_PHASE						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3f..c038a63 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
+TWO_PHASE			{ return K_TWO_PHASE; }
 EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723..e94069c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -954,7 +954,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  false);
+							  cmd->two_phase);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a25..ebc43a0 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		temporary;
+	bool		two_phase;
 	List	   *options;
 } CreateReplicationSlotCmd;
 
-- 
1.8.3.1

