From 682b0de2827d1f55c4e471c3129eb687ae0825a5 Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke <nikhils@2ndQuadrant.com>
Date: Wed, 13 Jun 2018 16:32:16 +0530
Subject: [PATCH 5/5] Additional test case to demonstrate decoding/rollback
 interlocking

Introduce a decode-delay parameter in the test_decoding plugin. Based
on the value provided in the plugin, sleep for those many seconds while
inside the "decode change" plugin call. A concurrent rollback is fired
off which aborts that transaction in the meanwhile. A subsequent
systable access will error out causing the logical decoding to abort.
---
 contrib/test_decoding/Makefile          |  5 +-
 contrib/test_decoding/t/001_twophase.pl | 98 +++++++++++++++++++++++++++++++++
 contrib/test_decoding/test_decoding.c   | 24 ++++++++
 3 files changed, 126 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/t/001_twophase.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index afcab930f7..3f0b1c6ebd 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -26,7 +26,7 @@ installcheck:;
 # installation, allow to do so, but only if requested explicitly.
 installcheck-force: regresscheck-install-force isolationcheck-install-force
 
-check: regresscheck isolationcheck
+check: regresscheck isolationcheck 2pc-check
 
 submake-regress:
 	$(MAKE) -C $(top_builddir)/src/test/regress all
@@ -67,3 +67,6 @@ isolationcheck-install-force: all | submake-isolation submake-test_decoding temp
 	isolationcheck isolationcheck-install-force
 
 temp-install: EXTRA_INSTALL=contrib/test_decoding
+
+2pc-check: temp-install
+	$(prove_check)
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100644
index 0000000000..3e68bac3f4
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,98 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+        'postgresql.conf', qq(
+        max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding');");
+
+# This test is specifically for testing concurrent abort while logical decode
+# is ongoing. The decode-delay value will allow for each change decode to
+# sleep for those many seconds. We will fire off a ROLLBACK from another
+# session when this delayed decode is ongoing.
+#
+# We will stop decoding immediately post this and the next
+# pg_logical_slot_get_changes call should show only a few records decoded
+# from the entire two phase transaction
+#
+# We use two slots to test multiple decoding backends here
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13,14);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# start decoding the above with decode-delay in the background.
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should only decode 2 INSERT records and should include
+# an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1', 'decode-delay', '3');\" \&");
+
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1', 'decode-delay', '3');\" \&");
+
+# sleep for a little while (shorter than decode-delay)
+$node_logical->safe_psql('postgres', "select pg_sleep(1)");
+
+# rollback the prepared transaction whose first record is being decoded
+# after sleeping for decode-delay time
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# wait for decoding to stop
+$node_logical->psql('postgres', "select pg_sleep(4)");
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+# check for occurrence of log about stopping decoding
+my $output_file = slurp_file($node_logical->logfile());
+my $abort_str = "stopping decoding of test_prepared_tab ";
+like($output_file, qr/$abort_str/, "ABORT found in server log");
+
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13, 11);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2');");
+$node_logical->stop('fast');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 140010a8b1..7762a290f9 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -37,6 +37,7 @@ typedef struct
 	bool		xact_wrote_changes;
 	bool		only_local;
 	bool		enable_twophase;
+	bool		decode_delay; /* seconds to sleep after every change record */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -123,6 +124,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->skip_empty_xacts = false;
 	data->only_local = false;
 	data->enable_twophase = false;
+	data->decode_delay = 0;
 
 	ctx->output_plugin_private = data;
 
@@ -214,6 +216,21 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "decode-delay") == 0)
+		{
+			if (elem->arg == NULL)
+				data->decode_delay = 2; /* default to 2 seconds */
+			else
+				data->decode_delay = pg_atoi(strVal(elem->arg),
+											 sizeof(int), 0);
+
+			if (data->decode_delay <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Specify positive value for parameter \"%s\","
+								" you specified \"%s\"",
+								elem->defname, strVal(elem->arg))));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -553,6 +570,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/* if decode_delay is specified */
+	if (data->decode_delay > 0)
+	{
+		elog(LOG, "sleeping for %d seconds", data->decode_delay);
+		pg_usleep(data->decode_delay * 1000000L);
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
-- 
2.15.2 (Apple Git-101.1)

