From 11b861098eb71f4a34397c33a40d71ca33479bcb Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 10 Jun 2021 01:50:07 -0400
Subject: [PATCH v4] Add support for two-phase decoding in pg_recvlogical.

Modified streamutils to pass in two-phase option when calling
CREATE_REPLICATION_SLOT. Added new option --two-phase in pg_recvlogical
to allow decoding of two-phase transactions.
---
 doc/src/sgml/logicaldecoding.sgml             | 20 ++++++++++--
 doc/src/sgml/ref/pg_recvlogical.sgml          | 16 ++++++++++
 src/bin/pg_basebackup/pg_basebackup.c         |  2 +-
 src/bin/pg_basebackup/pg_receivewal.c         |  2 +-
 src/bin/pg_basebackup/pg_recvlogical.c        | 19 +++++++++--
 src/bin/pg_basebackup/streamutil.c            |  7 +++-
 src/bin/pg_basebackup/streamutil.h            |  2 +-
 src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 46 ++++++++++++++++++++++++++-
 8 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index d2c6e15..9212984 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -144,14 +144,14 @@ postgres=# SELECT pg_drop_replication_slot('regression_slot');
 </programlisting>
 
    <para>
-    The following example shows how logical decoding is controlled over the
+    The following examples shows how logical decoding is controlled over the
     streaming replication protocol, using the
     program <xref linkend="app-pgrecvlogical"/> included in the PostgreSQL
     distribution.  This requires that client authentication is set up to allow
     replication connections
     (see <xref linkend="streaming-replication-authentication"/>) and
     that <varname>max_wal_senders</varname> is set sufficiently high to allow
-    an additional connection.
+    an additional connection. The second example enables two-phase decoding.
    </para>
 <programlisting>
 $ pg_recvlogical -d postgres --slot=test --create-slot
@@ -164,8 +164,24 @@ table public.data: INSERT: id[integer]:4 data[text]:'4'
 COMMIT 693
 <keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
 $ pg_recvlogical -d postgres --slot=test --drop-slot
+
+$ pg_recvlogical -d postgres --slot=test --create-slot --two-phase
+$ pg_recvlogical -d postgres --slot=test --start -f -
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "BEGIN;INSERT INTO data(data) VALUES('5');PREPARE TRANSACTION 'test';"
+$ fg
+BEGIN 694
+table public.data: INSERT: id[integer]:5 data[text]:'5'
+PREPARE TRANSACTION 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>Z</keycap></keycombo>
+$ psql -d postgres -c "COMMIT PREPARED 'test';"
+$ fg
+COMMIT PREPARED 'test', txid 694
+<keycombo action="simul"><keycap>Control</keycap><keycap>C</keycap></keycombo>
+$ pg_recvlogical -d postgres --slot=test --drop-slot
 </programlisting>
 
+
   <para>
   The following example shows SQL interface that can be used to decode prepared
   transactions. Before you use two-phase commit commands, you must set
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index 6b1d98d..57c7e1b 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -65,6 +65,11 @@ PostgreSQL documentation
         <option>--plugin</option>, for the database specified
         by <option>--dbname</option>.
        </para>
+
+       <para>
+        The <option>--two-phase</option> can be specified with
+        <option>--create-slot</option> to enable two-phase decoding.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -265,6 +270,17 @@ PostgreSQL documentation
        </para>
        </listitem>
      </varlistentry>
+
+     <varlistentry>
+       <term><option>-t</option></term>
+       <term><option>--two-phase</option></term>
+       <listitem>
+       <para>
+        Enables two-phase decoding. This option should only be used with
+        <option>--create-slot</option>
+       </para>
+       </listitem>
+     </varlistentry>
     </variablelist>
    </para>
 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 16d8929..8bb0acf 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -646,7 +646,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 	if (temp_replication_slot || create_slot)
 	{
 		if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL,
-								   temp_replication_slot, true, true, false))
+								   temp_replication_slot, true, true, false, false))
 			exit(1);
 
 		if (verbose)
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 0d15012..c1334fa 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -741,7 +741,7 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
-								   slot_exists_ok))
+								   slot_exists_ok, false))
 			exit(1);
 		exit(0);
 	}
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 5efec16..729082b 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -35,6 +35,7 @@
 /* Global Options */
 static char *outfile = NULL;
 static int	verbose = 0;
+static bool two_phase = false;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
@@ -94,6 +95,7 @@ usage(void)
 			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
 	printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
+	printf(_("  -t, --two-phase        enable two-phase decoding when creating a slot\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
@@ -678,6 +680,7 @@ main(int argc, char **argv)
 		{"fsync-interval", required_argument, NULL, 'F'},
 		{"no-loop", no_argument, NULL, 'n'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"two-phase", no_argument, NULL, 't'},
 		{"version", no_argument, NULL, 'V'},
 		{"help", no_argument, NULL, '?'},
 /* connection options */
@@ -726,7 +729,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
+	while ((c = getopt_long(argc, argv, "E:f:F:nvtd:h:p:U:wWI:o:P:s:S:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -749,6 +752,9 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 't':
+				two_phase = true;
+				break;
 /* connection options */
 			case 'd':
 				dbname = pg_strdup(optarg);
@@ -920,6 +926,15 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (two_phase && !do_create_slot)
+	{
+		pg_log_error("--two-phase may only be specified with --create-slot");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+
 #ifndef WIN32
 	pqsignal(SIGINT, sigint_handler);
 	pqsignal(SIGHUP, sighup_handler);
@@ -976,7 +991,7 @@ main(int argc, char **argv)
 			pg_log_info("creating replication slot \"%s\"", replication_slot);
 
 		if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
-								   false, false, slot_exists_ok))
+								   false, false, slot_exists_ok, two_phase))
 			exit(1);
 		startpos = InvalidXLogRecPtr;
 	}
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 99daf0e..ad92b91 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -486,7 +486,7 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
 bool
 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 					  bool is_temporary, bool is_physical, bool reserve_wal,
-					  bool slot_exists_ok)
+					  bool slot_exists_ok, bool two_phase)
 {
 	PQExpBuffer query;
 	PGresult   *res;
@@ -495,12 +495,14 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 
 	Assert((is_physical && plugin == NULL) ||
 		   (!is_physical && plugin != NULL));
+	Assert(!(two_phase && is_physical));
 	Assert(slot_name != NULL);
 
 	/* Build query */
 	appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
 	if (is_temporary)
 		appendPQExpBufferStr(query, " TEMPORARY");
+
 	if (is_physical)
 	{
 		appendPQExpBufferStr(query, " PHYSICAL");
@@ -510,6 +512,9 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
 	else
 	{
 		appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
+		if (two_phase && PQserverVersion(conn) >= 140000)
+			appendPQExpBufferStr(query, " TWO_PHASE");
+
 		if (PQserverVersion(conn) >= 100000)
 			/* pg_recvlogical doesn't use an exported snapshot, so suppress */
 			appendPQExpBufferStr(query, " NOEXPORT_SNAPSHOT");
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 10f87ad..504803b 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -34,7 +34,7 @@ extern PGconn *GetConnection(void);
 extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name,
 								  const char *plugin, bool is_temporary,
 								  bool is_physical, bool reserve_wal,
-								  bool slot_exists_ok);
+								  bool slot_exists_ok, bool two_phase);
 extern bool DropReplicationSlot(PGconn *conn, const char *slot_name);
 extern bool RunIdentifySystem(PGconn *conn, char **sysid,
 							  TimeLineID *starttli,
diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
index 53f4181..956dd96 100644
--- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
+++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 15;
+use Test::More tests => 20;
 
 program_help_ok('pg_recvlogical');
 program_version_ok('pg_recvlogical');
@@ -22,6 +22,7 @@ max_replication_slots = 4
 max_wal_senders = 4
 log_min_messages = 'debug1'
 log_error_verbosity = verbose
+max_prepared_transactions = 10
 });
 $node->dump_info;
 $node->start;
@@ -63,3 +64,46 @@ $node->command_ok(
 		'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
 	],
 	'replayed a transaction');
+
+$node->command_ok(
+	[
+		'pg_recvlogical',           '-S',
+		'test',                     '-d',
+		$node->connstr('postgres'), '--drop-slot'
+	],
+	'slot dropped');
+
+#test with two-phase option enabled
+$node->command_ok(
+	[
+		'pg_recvlogical',           '-S',
+		'test',                     '-d',
+		$node->connstr('postgres'), '--create-slot', '--two-phase'
+	],
+	'slot with two-phase created');
+
+$slot = $node->slot('test');
+isnt($slot->{'restart_lsn'}, '', 'restart lsn is defined for new slot');
+
+$node->safe_psql('postgres',
+	"BEGIN;INSERT INTO test_table values (11); PREPARE TRANSACTION 'test'");
+$node->safe_psql('postgres',
+	"COMMIT PREPARED 'test'");
+$nextlsn =
+  $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()');
+chomp($nextlsn);
+
+$node->command_fails(
+	[
+		'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+		'--start', '--endpos', "$nextlsn", '--two-phase', '--no-loop', '-f', '-'
+	],
+	'incorrect usage');
+
+$node->command_ok(
+	[
+		'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'),
+		'--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-'
+	],
+	'replayed a two-phase transaction');
+
-- 
1.8.3.1

