From fbcc9bbc36d002a9e20edd76a5eac0eca0434ed0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 22 Sep 2020 07:12:45 -0400
Subject: [PATCH v5] pgoutput output plugin support for logical decoding of 2pc

---
 src/backend/access/transam/twophase.c       |  31 ++++++
 src/backend/replication/logical/proto.c     |  90 ++++++++++++++-
 src/backend/replication/logical/worker.c    | 147 ++++++++++++++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c |  54 ++++++++-
 src/include/access/twophase.h               |   1 +
 src/include/replication/logicalproto.h      |  37 ++++++-
 src/test/subscription/t/020_twophase.pl     | 163 ++++++++++++++++++++++++++++
 7 files changed, 514 insertions(+), 9 deletions(-)
 create mode 100644 src/test/subscription/t/020_twophase.pl

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index ef4f998..bed87d5 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -548,6 +548,37 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
 }
 
 /*
+ * LookupGXact
+ *		Check if the prepared transaction with the given GID is	around
+ */
+bool
+LookupGXact(const char *gid)
+{
+	int			i;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs */
+		if (!gxact->valid)
+			continue;
+		if (strcmp(gxact->gid, gid) != 0)
+			continue;
+
+		LWLockRelease(TwoPhaseStateLock);
+
+		return true;
+	}
+
+	LWLockRelease(TwoPhaseStateLock);
+
+	return false;
+}
+
+/*
  * LockGXact
  *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
  */
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index eb19142..291ed10 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -72,12 +72,17 @@ logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
  */
 void
 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
-						XLogRecPtr commit_lsn)
+						XLogRecPtr commit_lsn, bool is_commit)
 {
 	uint8		flags = 0;
 
 	pq_sendbyte(out, 'C');		/* sending COMMIT */
 
+	if (is_commit)
+		flags |= LOGICALREP_IS_COMMIT;
+	else
+		flags |= LOGICALREP_IS_ABORT;
+
 	/* send the flags field (unused for now) */
 	pq_sendbyte(out, flags);
 
@@ -88,16 +93,20 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 }
 
 /*
- * Read transaction COMMIT from the stream.
+ * Read transaction COMMIT|ABORT from the stream.
  */
 void
 logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 {
-	/* read flags (unused for now) */
+	/* read flags */
 	uint8		flags = pq_getmsgbyte(in);
 
-	if (flags != 0)
-		elog(ERROR, "unrecognized flags %u in commit message", flags);
+	if (!CommitFlagsAreValid(flags))
+		elog(ERROR, "unrecognized flags %u in commit|abort message",
+			 flags);
+
+	/* the flag is either commit or abort */
+	commit_data->is_commit = (flags == LOGICALREP_IS_COMMIT);
 
 	/* read fields */
 	commit_data->commit_lsn = pq_getmsgint64(in);
@@ -106,6 +115,77 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 }
 
 /*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						 XLogRecPtr prepare_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'P');		/* sending PREPARE protocol */
+
+	/*
+	 * This should only ever happen for 2PC transactions. In which case we
+	 * expect to have a non-empty GID.
+	 */
+	Assert(rbtxn_prepared(txn));
+	Assert(strlen(txn->gid) > 0);
+
+	/*
+	 * Flags are determined from the state of the transaction. We know we
+	 * always get PREPARE first and then [COMMIT|ROLLBACK] PREPARED, so if
+	 * it's already marked as committed then it has to be COMMIT PREPARED (and
+	 * likewise for abort / ROLLBACK PREPARED).
+	 */
+	if (rbtxn_commit_prepared(txn))
+		flags |= LOGICALREP_IS_COMMIT_PREPARED;
+	else if (rbtxn_rollback_prepared(txn))
+		flags |= LOGICALREP_IS_ROLLBACK_PREPARED;
+	else
+		flags |= LOGICALREP_IS_PREPARE;
+
+	/* Make sure exactly one of the expected flags is set. */
+	if (!PrepareFlagsAreValid(flags))
+		elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPrepareData * prepare_data)
+{
+	/* read flags */
+	uint8		flags = pq_getmsgbyte(in);
+
+	if (!PrepareFlagsAreValid(flags))
+		elog(ERROR, "unrecognized flags %u in prepare message", flags);
+
+	/* set the action (reuse the constants used for the flags) */
+	prepare_data->prepare_type = flags;
+
+	/* read fields */
+	prepare_data->prepare_lsn = pq_getmsgint64(in);
+	prepare_data->end_lsn = pq_getmsgint64(in);
+	prepare_data->preparetime = pq_getmsgint64(in);
+
+	/* read gid (copy it into a pre-allocated buffer) */
+	strcpy(prepare_data->gid, pq_getmsgstring(in));
+}
+
+/*
  * Write ORIGIN to the output stream.
  */
 void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d239d28..62c571e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -729,7 +729,11 @@ apply_handle_commit(StringInfo s)
 		replorigin_session_origin_lsn = commit_data.end_lsn;
 		replorigin_session_origin_timestamp = commit_data.committime;
 
-		CommitTransactionCommand();
+		if (commit_data.is_commit)
+			CommitTransactionCommand();
+		else
+			AbortCurrentTransaction();
+
 		pgstat_report_stat(false);
 
 		store_flush_position(commit_data.end_lsn);
@@ -749,6 +753,141 @@ apply_handle_commit(StringInfo s)
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+static void
+apply_handle_prepare_txn(LogicalRepPrepareData * prepare_data)
+{
+	Assert(prepare_data->prepare_lsn == remote_final_lsn);
+
+	/* The synchronization worker runs in single transaction. */
+	if (IsTransactionState() && !am_tablesync_worker())
+	{
+		/* End the earlier transaction and start a new one */
+		BeginTransactionBlock();
+		CommitTransactionCommand();
+		StartTransactionCommand();
+
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = prepare_data->end_lsn;
+		replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+		PrepareTransactionBlock(prepare_data->gid);
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+
+		store_flush_position(prepare_data->end_lsn);
+	}
+	else
+	{
+		/* Process any invalidation messages that might have accumulated. */
+		AcceptInvalidationMessages();
+		maybe_reread_subscription();
+	}
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_commit_prepared_txn(LogicalRepPrepareData * prepare_data)
+{
+	/* there is no transaction when COMMIT PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data->end_lsn;
+	replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+	FinishPreparedTransaction(prepare_data->gid, true);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_rollback_prepared_txn(LogicalRepPrepareData * prepare_data)
+{
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data->end_lsn;
+	replorigin_session_origin_timestamp = prepare_data->preparetime;
+
+	/*
+	 * During logical decoding, on the apply side, it's possible that a
+	 * prepared transaction got aborted while decoding. In that case, we stop
+	 * the decoding and abort the transaction immediately. However the
+	 * ROLLBACK prepared processing still reaches the subscriber. In that case
+	 * it's ok to have a missing gid
+	 */
+	if (LookupGXact(prepare_data->gid))
+	{
+		/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
+		ensure_transaction();
+		FinishPreparedTransaction(prepare_data->gid, false);
+		CommitTransactionCommand();
+	}
+
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+	LogicalRepPrepareData prepare_data;
+
+	logicalrep_read_prepare(s, &prepare_data);
+
+	switch (prepare_data.prepare_type)
+	{
+		case LOGICALREP_IS_PREPARE:
+			apply_handle_prepare_txn(&prepare_data);
+			break;
+
+		case LOGICALREP_IS_COMMIT_PREPARED:
+			apply_handle_commit_prepared_txn(&prepare_data);
+			break;
+
+		case LOGICALREP_IS_ROLLBACK_PREPARED:
+			apply_handle_rollback_prepared_txn(&prepare_data);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("unexpected type of prepare message: %d",
+							prepare_data.prepare_type)));
+	}
+}
+
 /*
  * Handle ORIGIN message.
  *
@@ -1909,10 +2048,14 @@ apply_dispatch(StringInfo s)
 		case 'B':
 			apply_handle_begin(s);
 			break;
-			/* COMMIT */
+			/* COMMIT/ABORT */
 		case 'C':
 			apply_handle_commit(s);
 			break;
+			/* PREPARE and [COMMIT|ROLLBACK] PREPARED */
+		case 'P':
+			apply_handle_prepare(s);
+			break;
 			/* INSERT */
 		case 'I':
 			apply_handle_insert(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index eb1f230..729b655 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -47,6 +47,12 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx,
+							ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 								  ReorderBufferTXN *txn);
 static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
@@ -143,6 +149,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pgoutput_change;
 	cb->truncate_cb = pgoutput_truncate;
 	cb->commit_cb = pgoutput_commit_txn;
+
+	cb->prepare_cb = pgoutput_prepare_txn;
+	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+	cb->abort_prepared_cb = pgoutput_abort_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
@@ -373,7 +383,49 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginUpdateProgress(ctx);
 
 	OutputPluginPrepareWrite(ctx, true);
-	logicalrep_write_commit(ctx->out, txn, commit_lsn);
+	logicalrep_write_commit(ctx->out, txn, commit_lsn, true);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					 XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
 }
 
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 2ca71c3..b2628ea 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -44,6 +44,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
 extern void StartPrepare(GlobalTransaction gxact);
 extern void EndPrepare(GlobalTransaction gxact);
 extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
+extern bool LookupGXact(const char *gid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 												 int *nxids_p);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 607a728..fb07580 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -85,20 +85,55 @@ typedef struct LogicalRepBeginData
 	TransactionId xid;
 } LogicalRepBeginData;
 
+/* Commit (and abort) information */
 typedef struct LogicalRepCommitData
 {
+	bool        is_commit;
 	XLogRecPtr	commit_lsn;
 	XLogRecPtr	end_lsn;
 	TimestampTz committime;
 } LogicalRepCommitData;
 
+/* types of the commit protocol message */
+#define LOGICALREP_IS_COMMIT			0x01
+#define LOGICALREP_IS_ABORT				0x02
+
+/* commit message is COMMIT or ABORT, and there is nothing else */
+#define CommitFlagsAreValid(flags) \
+	((flags == LOGICALREP_IS_COMMIT) || (flags == LOGICALREP_IS_ABORT))
+
+/* Prepare protocol information */
+typedef struct LogicalRepPrepareData
+{
+	uint8		prepare_type;
+	XLogRecPtr	prepare_lsn;
+	XLogRecPtr	end_lsn;
+	TimestampTz preparetime;
+	char		gid[GIDSIZE];
+}			LogicalRepPrepareData;
+
+/* types of the prepare protocol message */
+#define LOGICALREP_IS_PREPARE			0x01
+#define LOGICALREP_IS_COMMIT_PREPARED	0x02
+#define LOGICALREP_IS_ROLLBACK_PREPARED	0x04
+
+/* prepare can be exactly one of PREPARE, [COMMIT|ABORT] PREPARED*/
+#define PrepareFlagsAreValid(flags) \
+	((flags == LOGICALREP_IS_PREPARE) || \
+	 (flags == LOGICALREP_IS_COMMIT_PREPARED) || \
+	 (flags == LOGICALREP_IS_ROLLBACK_PREPARED))
+
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
 extern void logicalrep_read_begin(StringInfo in,
 								  LogicalRepBeginData *begin_data);
 extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
-									XLogRecPtr commit_lsn);
+									XLogRecPtr commit_lsn, bool is_commit);
 extern void logicalrep_read_commit(StringInfo in,
 								   LogicalRepCommitData *commit_data);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						 XLogRecPtr prepare_lsn);
+extern void logicalrep_read_prepare(StringInfo in,
+						LogicalRepPrepareData * prepare_data);
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
 									XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl
new file mode 100644
index 0000000..c7f373d
--- /dev/null
+++ b/src/test/subscription/t/020_twophase.pl
@@ -0,0 +1,163 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 12;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+        'postgresql.conf', qq(
+        max_prepared_transactions = 10
+        ));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+        'postgresql.conf', qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full SELECT generate_series(1,10)");
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+"ALTER PUBLICATION tap_pub ADD TABLE tab_full, tab_full2"
+);
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
+);
+
+# Wait for subscriber to finish initialization
+my $caughtup_query =
+"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres',
+	"BEGIN;INSERT INTO tab_full VALUES (11);PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+my $result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+   is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets committed on subscriber
+$node_publisher->safe_psql('postgres',
+	"COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is committed on subscriber
+$result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;");
+   is($result, qq(1), 'Row inserted via 2PC has committed on subscriber');
+$result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+   is($result, qq(0), 'transaction is committed on subscriber');
+
+# check that 2PC gets replicated to subscriber
+$node_publisher->safe_psql('postgres',
+	"BEGIN;INSERT INTO tab_full VALUES (12);PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is in prepared state on subscriber
+$result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+   is($result, qq(1), 'transaction is prepared on subscriber');
+
+# check that 2PC gets aborted on subscriber
+$node_publisher->safe_psql('postgres',
+	"ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# check that transaction is aborted on subscriber
+$result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;");
+   is($result, qq(0), 'Row inserted via 2PC is not present on subscriber');
+
+$result =
+   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts where gid = 'test_prepared_tab_full';");
+   is($result, qq(0), 'transaction is aborted on subscriber');
+
+# Check that commit prepared is decoded properly on crash restart
+$node_publisher->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab_full VALUES (12);
+    INSERT INTO tab_full VALUES (13);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->poll_query_until('postgres', $caughtup_query)
+  or die "Timed out while waiting for subscriber to catch up";
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_full where a IN (11,12);");
+is($result, qq(2), 'Rows inserted via 2PC are visible on the subscriber');
+
+# TODO add test cases involving DDL. This can be added after we add functionality
+# to replicate DDL changes to subscriber.
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+	'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
1.8.3.1

