From 426c80f427ebd9d5817d10a1e557c543cedf631d Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 15 Jun 2021 01:52:39 -0400
Subject: [PATCH v5] Add option to set two-phase in CREATE_REPLICATION_SLOT
 command.

CREATE_REPLICATION_SLOT modified to support two-phase encoding in the slot.
This will allow the decoding of commands like PREPARE TRANSACTION,
COMMIT PREPARED and ROLLBACK PREPARED for slots created with this option.
---
 doc/src/sgml/protocol.sgml             | 16 +++++++++++++++-
 src/backend/replication/repl_gram.y    | 12 ++++++++++++
 src/backend/replication/repl_scanner.l |  1 +
 src/backend/replication/walsender.c    | 18 +++++++++++++++---
 4 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index bc2a2fe..205fbd2 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1914,7 +1914,7 @@ The commands accepted in replication mode are:
   </varlistentry>
 
   <varlistentry id="protocol-replication-create-slot" xreflabel="CREATE_REPLICATION_SLOT">
-   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> ] }
+   <term><literal>CREATE_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> [ <literal>TEMPORARY</literal> ] { <literal>PHYSICAL</literal> [ <literal>RESERVE_WAL</literal> ] | <literal>LOGICAL</literal> <replaceable class="parameter">output_plugin</replaceable> [ <literal>EXPORT_SNAPSHOT</literal> | <literal>NOEXPORT_SNAPSHOT</literal> | <literal>USE_SNAPSHOT</literal> | <literal>TWO_PHASE</literal> ] }
      <indexterm><primary>CREATE_REPLICATION_SLOT</primary></indexterm>
     </term>
     <listitem>
@@ -1956,6 +1956,20 @@ The commands accepted in replication mode are:
       </varlistentry>
 
       <varlistentry>
+       <term><literal>TWO_PHASE</literal></term>
+       <listitem>
+        <para>
+         Specify that this logical replication slot supports decoding of two-phase
+         transactions. With this option, two-phase commands like
+         <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+         and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+         The transaction will be decoded and transmitted at
+         <literal>PREPARE TRANSACTION</literal> time.
+        </para>
+       </listitem>
+      </varlistentry>
+
+      <varlistentry>
        <term><literal>RESERVE_WAL</literal></term>
        <listitem>
         <para>
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a8..eead144 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_TWO_PHASE
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
@@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot var_name
 %type <boolval>	opt_temporary
+%type <boolval>	opt_two_phase
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
@@ -283,6 +285,11 @@ create_slot_opt:
 				  $$ = makeDefElem("reserve_wal",
 								   (Node *)makeInteger(true), -1);
 				}
+			| K_TWO_PHASE
+				{
+				  $$ = makeDefElem("two_phase",
+								   (Node *)makeInteger(true), -1);
+				}
 			;
 
 /* DROP_REPLICATION_SLOT slot */
@@ -365,6 +372,11 @@ opt_temporary:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_two_phase:
+			K_TWO_PHASE						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3f..c038a63 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
+TWO_PHASE			{ return K_TWO_PHASE; }
 EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3224536..92c755f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -863,11 +863,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 						   bool *reserve_wal,
-						   CRSSnapshotAction *snapshot_action)
+						   CRSSnapshotAction *snapshot_action,
+						   bool *two_phase)
 {
 	ListCell   *lc;
 	bool		snapshot_action_given = false;
 	bool		reserve_wal_given = false;
+	bool		two_phase_given = false;
 
 	/* Parse options */
 	foreach(lc, cmd->options)
@@ -905,6 +907,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 			reserve_wal_given = true;
 			*reserve_wal = true;
 		}
+		else if (strcmp(defel->defname, "two_phase") == 0)
+		{
+			if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			two_phase_given = true;
+			*two_phase = true;
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -920,6 +931,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	char		xloc[MAXFNAMELEN];
 	char	   *slot_name;
 	bool		reserve_wal = false;
+	bool		two_phase = false;
 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
 	DestReceiver *dest;
 	TupOutputState *tstate;
@@ -929,7 +941,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
+	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
 
 	/* setup state for WalSndSegmentOpen */
 	sendTimeLineIsHistoric = false;
@@ -954,7 +966,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  false);
+							  two_phase);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
-- 
1.8.3.1

