From db2df233fb2a0db44a2beb884d68cf783435822f Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Wed, 9 Jun 2021 06:38:07 -0400
Subject: [PATCH v3] Add support for two-phase decoding in pg_recvlogical.

Modified streamutils to pass in two-phase option when calling
CREATE_REPLICATION_SLOT. Added new option --two-phase in pg_recvlogical
to allow decoding of two-phase transactions.
---
 doc/src/sgml/logicaldecoding.sgml      | 20 ++++++++++++++++++--
 doc/src/sgml/ref/pg_recvlogical.sgml   | 16 ++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c  |  2 +-
 src/bin/pg_basebackup/pg_receivewal.c  |  2 +-
 src/bin/pg_basebackup/pg_recvlogical.c | 19 +++++++++++++++++--
 src/bin/pg_basebackup/streamutil.c     | 11 +++++++++--
 src/bin/pg_basebackup/streamutil.h     |  2 +-
 7 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index d2c6e15..9212984 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -144,14 +144,14 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot');
 </programlisting>
 
    <para>
-    The following example shows how logical decoding is controlled over the
+    The following examples shows how logical decoding is controlled over the
     streaming replication protocol, using the
     program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
     distribution.  This requires that client authentication is set up to allow
     replication connections
     (see <xref linkend="streaming-replication-authentication"/>) and
     that <varname>max_wal_senders</varname> is set sufficiently high to allow
-    an additional connection.
+    an additional connection. The second example enables two-phase decoding.
    </para>
 <programlisting>
 $ pg_recvlogical -d postgres --slot=test --create-slot
@@ -164,8 +164,24 @@ table public.data: INSERT: id[integer]:4 data[text]:'4'
 COMMIT 693
 <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
 $ pg_recvlogical -d postgres --slot=test --drop-slot
+
+$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
+$ pg_recvlogical -d postgres --slot=test --start -f -
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
+$ fg
+BEGIN 694
+table public.data: INSERT: id[integer]:5 data[text]:'5'
+PREPARE TRANSACTION 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "COMMIT PREPARED 'test';"
+$ fg
+COMMIT PREPARED 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
+$ pg_recvlogical -d postgres --slot=test --drop-slot
 </programlisting>
 
+
   <para>
   The following example shows SQL interface that can be used to decode prepared
   transactions. Before you use two-phase commit commands, you must set
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index 6b1d98d..57c7e1b 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -65,6 +65,11 @@ PostgreSQL documentation
         <option>--plugin</option>, for the database specified
         by <option>--dbname</option>.
        </para>
+
+       <para>
+        The <option>--two-phase</option> can be specified with
+        <option>--create-slot</option> to enable two-phase decoding.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -265,6 +270,17 @@ PostgreSQL documentation
        </para>
        </listitem>
      </varlistentry>
+
+     <varlistentry>
+       <term><option>-t</option></term>
+       <term><option>--two-phase</option></term>
+       <listitem>
+       <para>
+        Enables two-phase decoding. This option should only be used with
+        <option>--create-slot</option>
+       </para>
+       </listitem>
+     </varlistentry>
     </variablelist>
    </para>
 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 16d8929..8bb0acf 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 	if (temp_replication_slot || create_slot)
 	{
 		if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
-								   temp_replication_slot, true, true, false))
+								   temp_replication_slot, true, true, false, false))
 			exit(1);
 
 		if (verbose)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 0d15012..c1334fa 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -741,7 +741,7 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
-								   slot_exists_ok))
+								   slot_exists_ok, false))
 			exit(1);
 		exit(0);
 	}
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5efec16..729082b 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -35,6 +35,7 @@
 /* Global Options */
 static char *outfile = NULL;
 static int	verbose = 0;
+static bool two_phase = false;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -94,6 +95,7 @@ usage(void)
 			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
 	printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
+	printf(_("  -t, --two-phase        enable two-phase decoding when creating a slot\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
 		{"fsync-interval", required_argument, NULL, 'F'},
 		{"no-loop", no_argument, NULL, 'n'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"two-phase", no_argument, NULL, 't'},
 		{"version", no_argument, NULL, 'V'},
 		{"help", no_argument, NULL, '?'},
 /* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+	while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 't':
+				two_phase = true;
+				break;
 /* connection options */
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (two_phase && !do_create_slot)
+	{
+		pg_log_error("--two-phase may only be specified with --create-slot");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+
 #ifndef WIN32
 	pqsignal(SIGINT, sigint_handler);
 	pqsignal(SIGHUP, sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
-								   false, false, slot_exists_ok))
+								   false, false, slot_exists_ok, two_phase))
 			exit(1);
 		startpos = InvalidXLogRecPtr;
 	}
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 99daf0e..a437533 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 					  bool is_temporary, bool is_physical, bool reserve_wal,
-					  bool slot_exists_ok)
+					  bool slot_exists_ok, bool two_phase)
 {
 	PQExpBuffer query;
 	PGresult   *res;
@@ -495,12 +495,14 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 
 	Assert((is_physical && plugin == NULL) ||
 		   (!is_physical && plugin != NULL));
+	Assert(!(two_phase && is_physical));
 	Assert(slot_name != NULL);
 
 	/* Build query */
 	appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
 	if (is_temporary)
 		appendPQExpBufferStr(query, " TEMPORARY");
+
 	if (is_physical)
 	{
 		appendPQExpBufferStr(query, " PHYSICAL");
@@ -509,7 +511,12 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 	}
 	else
 	{
-		appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+		appendPQExpBuffer(query, " LOGICAL", plugin);
+		if (two_phase)
+			appendPQExpBufferStr(query, " TWO_PHASE ");
+
+		appendPQExpBuffer(query, "\"%s\"", plugin);
+
 		if (PQserverVersion(conn) >= 100000)
 			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
 			appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 10f87ad..504803b 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
 								  const char *plugin, bool is_temporary,
 								  bool is_physical, bool reserve_wal,
-								  bool slot_exists_ok);
+								  bool slot_exists_ok, bool two_phase);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
-- 
1.8.3.1

