From 9a18a13eb20ba383f09208789a4ca2802284c773 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Tue, 20 Jul 2021 12:36:09 +1000
Subject: [PATCH v98] Add prepare API support for streaming transactions.

* Permits the combination of "streaming" and "two_phase" subscription options.

* Adds the prepare API for streaming transactions which will apply the changes
accumulated in the spool-file at prepare time.

* Adds new subscription TAP tests, and new subscription.sql regression tests.

* Updates PG documentation.
---
 doc/src/sgml/logicaldecoding.sgml                  |  11 +-
 doc/src/sgml/protocol.sgml                         |  68 +++-
 doc/src/sgml/ref/create_subscription.sgml          |  10 -
 src/backend/commands/subscriptioncmds.c            |  25 --
 src/backend/replication/logical/proto.c            |  60 +++
 src/backend/replication/logical/worker.c           | 138 ++++++-
 src/backend/replication/pgoutput/pgoutput.c        |  33 +-
 src/include/replication/logicalproto.h             |  10 +-
 src/test/regress/expected/subscription.out         |  24 +-
 src/test/regress/sql/subscription.sql              |  12 +-
 src/test/subscription/t/023_twophase_stream.pl     | 453 +++++++++++++++++++++
 .../subscription/t/024_twophase_cascade_stream.pl  | 271 ++++++++++++
 12 files changed, 1032 insertions(+), 83 deletions(-)
 create mode 100644 src/test/subscription/t/023_twophase_stream.pl
 create mode 100644 src/test/subscription/t/024_twophase_cascade_stream.pl

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 002efc8..f6832c2 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -1199,6 +1199,9 @@ OutputPluginWrite(ctx, true);
     <function>stream_abort_cb</function>, <function>stream_commit_cb</function>
     and <function>stream_change_cb</function>) and two optional callbacks
     (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+    Also, if streaming of two-phase commands is to be supported, then additional
+    callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
+    for details.)
    </para>
 
    <para>
@@ -1237,7 +1240,13 @@ stream_start_cb(...);   &lt;-- start of second block of changes
   stream_change_cb(...);
 stream_stop_cb(...);    &lt;-- end of second block of changes
 
-stream_commit_cb(...);  &lt;-- commit of the streamed transaction
+
+[a. when using normal commit]
+stream_commit_cb(...);    &lt;-- commit of the streamed transaction
+
+[b. when using two-phase commit]
+stream_prepare_cb(...);   &lt;-- prepare the streamed transaction
+commit_prepared_cb(...);  &lt;-- commit of the prepared transaction
 </programlisting>
    </para>
 
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index e8cb78f..221589a 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2881,7 +2881,7 @@ The commands accepted in replication mode are:
    Begin Prepare and Prepare messages belong to the same transaction.
    It also sends changes of large in-progress transactions between a pair of
    Stream Start and Stream Stop messages. The last stream of such a transaction 
-   contains a Stream Commit or Stream Abort message.
+   contains a Stream Prepare, Stream Commit or Stream Abort message.
   </para>
 
   <para>
@@ -7398,7 +7398,7 @@ Stream Abort
 </variablelist>
 
 <para>
-The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
+The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared, Stream Prepare)
 are available since protocol version 3.
 </para>
 
@@ -7661,6 +7661,70 @@ are available since protocol version 3.
 </listitem>
 </varlistentry>
 
+<varlistentry>
+
+<term>Stream Prepare</term>
+<listitem>
+<para>
+
+<variablelist>
+
+<varlistentry>
+<term>Byte1('p')</term>
+<listitem><para>
+                Identifies the message as a two-phase prepare message for a large in-progress (streamed) transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int8</term>
+<listitem><para>
+                Flags; currently unused (must be 0).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The LSN of the prepare.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int32</term>
+<listitem><para>
+                Xid of the transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>String</term>
+<listitem><para>
+                The user defined GID of the two-phase transaction.
+</para></listitem>
+</varlistentry>
+
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
 </variablelist>
 
 <para>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 1433905..702934e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -238,11 +238,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           subscriber as a whole.
          </para>
 
-         <para>
-          The <literal>streaming</literal> option cannot be used with the
-          <literal>two_phase</literal> option.
-         </para>
-
         </listitem>
        </varlistentry>
        <varlistentry>
@@ -269,11 +264,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           to know the actual two-phase state.
          </para>
 
-         <para>
-          The <literal>two_phase</literal> option cannot be used with the
-          <literal>streaming</literal> option.
-         </para>
-
         </listitem>
        </varlistentry>
       </variablelist></para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 239d263..5cf64b6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -333,25 +333,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("subscription with %s must also set %s",
 							"slot_name = NONE", "create_slot = false")));
 	}
-
-	/*
-	 * Do additional checking for the disallowed combination of two_phase and
-	 * streaming. While streaming and two_phase can theoretically be
-	 * supported, it needs more analysis to allow them together.
-	 */
-	if (opts->twophase &&
-		IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
-		IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
-	{
-		if (opts->streaming &&
-			IsSet(supported_opts, SUBOPT_STREAMING) &&
-			IsSet(opts->specified_opts, SUBOPT_STREAMING))
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-			/*- translator: both %s are strings of the form "option = value" */
-					 errmsg("%s and %s are mutually exclusive options",
-							"two_phase = true", "streaming = true")));
-	}
 }
 
 /*
@@ -924,12 +905,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
 				{
-					if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 errmsg("cannot set %s for two-phase enabled subscription",
-										"streaming = true")));
-
 					values[Anum_pg_subscription_substream - 1] =
 						BoolGetDatum(opts.streaming);
 					replaces[Anum_pg_subscription_substream - 1] = true;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a245252..00990ca 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -318,6 +318,66 @@ logicalrep_read_rollback_prepared(StringInfo in,
 }
 
 /*
+ * Write STREAM PREPARE to the output stream.
+ */
+void
+logicalrep_write_stream_prepare(StringInfo out,
+								ReorderBufferTXN *txn,
+								XLogRecPtr prepare_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_PREPARE);
+
+	/*
+	 * This should only ever happen for two-phase commit transactions, in
+	 * which case we expect to have a valid GID.
+	 */
+	Assert(txn->gid != NULL);
+
+	Assert(rbtxn_prepared(txn));
+	Assert(TransactionIdIsValid(txn->xid));
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->xact_time.prepare_time);
+	pq_sendint32(out, txn->xid);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read STREAM PREPARE from the input stream.
+ */
+TransactionId
+logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+	uint8		flags;
+
+	/* read flags */
+	flags = pq_getmsgbyte(in);
+
+	if (flags != 0)
+		elog(ERROR, "unrecognized flags %u in stream prepare message", flags);
+
+	/* read fields */
+	prepare_data->prepare_lsn = pq_getmsgint64(in);
+	prepare_data->end_lsn = pq_getmsgint64(in);
+	prepare_data->prepare_time = pq_getmsgint64(in);
+	prepare_data->xid = pq_getmsgint(in, 4);
+
+	/* read gid (copy it into a pre-allocated buffer) */
+	strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
+
+	return prepare_data->xid;
+}
+
+/*
  * Write ORIGIN to the output stream.
  */
 void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7f..ac505e1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 /* Compute GID for two_phase transactions */
 static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
 
+/* Common streaming function to apply all the spooled messages */
+static int apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 
 /*
  * Should this worker apply changes for given relation.
@@ -1041,6 +1043,90 @@ apply_handle_rollback_prepared(StringInfo s)
 }
 
 /*
+ * Handle STREAM PREPARE.
+ *
+ * Logic is in two parts:
+ * 1. Replay all the spooled operations
+ * 2. Mark the transaction as prepared
+ */
+static void
+apply_handle_stream_prepare(StringInfo s)
+{
+	int			nchanges = 0;
+	LogicalRepPreparedTxnData prepare_data;
+	TransactionId xid;
+	char		gid[GIDSIZE];
+
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("STREAM PREPARE message without STREAM STOP")));
+
+	/* Tablesync should never receive prepare. */
+	if (am_tablesync_worker())
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("tablesync worker received a STREAM PREPARE message")));
+
+	xid = logicalrep_read_stream_prepare(s, &prepare_data);
+	elog(DEBUG1, "received prepare for streamed transaction %u", xid);
+
+	/*
+	 * Compute unique GID for two_phase transactions. We don't use the GID
+	 * of the prepared transaction sent by the server, as that can lead to
+	 * deadlock when multiple subscriptions from the same node point to
+	 * publications on the same node. See comments atop worker.c.
+	 */
+	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
+						   gid, sizeof(gid));
+
+	/*
+	 * 1. Replay all the spooled operations - Similar code as for
+	 * apply_handle_stream_commit (i.e. non two-phase stream commit)
+	 */
+
+	nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
+
+	/*
+	 * 2. Mark the transaction as prepared. - Similar code as for
+	 * apply_handle_prepare (i.e. two-phase non-streamed prepare)
+	 */
+
+	/*
+	 * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+	 * called within the PrepareTransactionBlock below.
+	 */
+	BeginTransactionBlock();
+	CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+	/*
+	 * Update origin state so we can restart streaming from correct position
+	 * in case of crash.
+	 */
+	replorigin_session_origin_lsn = prepare_data.end_lsn;
+	replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+	PrepareTransactionBlock(gid);
+	CommitTransactionCommand();
+
+	pgstat_report_stat(false);
+
+	store_flush_position(prepare_data.end_lsn);
+
+	elog(DEBUG1, "apply_handle_stream_prepare: replayed %d (all) changes", nchanges);
+
+	in_remote_transaction = false;
+
+	/* unlink the files with serialized changes and subxact info */
+	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(prepare_data.end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
  * Handle ORIGIN message.
  *
  * TODO, support tracking of multiple origins
@@ -1256,30 +1342,20 @@ apply_handle_stream_abort(StringInfo s)
 }
 
 /*
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
+ * Returns how many changes were applied.
  */
-static void
-apply_handle_stream_commit(StringInfo s)
+static int
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
 {
-	TransactionId xid;
 	StringInfoData s2;
 	int			nchanges;
 	char		path[MAXPGPATH];
 	char	   *buffer = NULL;
-	LogicalRepCommitData commit_data;
 	StreamXidHash *ent;
 	MemoryContext oldcxt;
 	BufFile    *fd;
 
-	if (in_streamed_transaction)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
-
-	xid = logicalrep_read_stream_commit(s, &commit_data);
-
-	elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
 	/* Make sure we have an open transaction */
 	begin_replication_step();
 
@@ -1290,7 +1366,7 @@ apply_handle_stream_commit(StringInfo s)
 	 */
 	oldcxt = MemoryContextSwitchTo(TopTransactionContext);
 
-	/* open the spool file for the committed transaction */
+	/* Open the spool file for the committed/prepared transaction */
 	changes_filename(path, MyLogicalRepWorker->subid, xid);
 	elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
@@ -1311,7 +1387,7 @@ apply_handle_stream_commit(StringInfo s)
 
 	MemoryContextSwitchTo(oldcxt);
 
-	remote_final_lsn = commit_data.commit_lsn;
+	remote_final_lsn = lsn;
 
 	/*
 	 * Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1466,32 @@ apply_handle_stream_commit(StringInfo s)
 	elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
 		 nchanges, path);
 
+	return nchanges;
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+	TransactionId xid;
+	LogicalRepCommitData commit_data;
+	int			nchanges = 0;
+
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+	xid = logicalrep_read_stream_commit(s, &commit_data);
+
+	elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+	nchanges = apply_spooled_messages(xid, commit_data.commit_lsn);
+
+	elog(DEBUG1, "apply_handle_stream_commit: replayed %d (all) changes.", nchanges);
+
 	apply_handle_commit_internal(s, &commit_data);
 
 	/* unlink the files with serialized changes and subxact info */
@@ -2333,6 +2435,10 @@ apply_dispatch(StringInfo s)
 		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
 			apply_handle_rollback_prepared(s);
 			return;
+
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			apply_handle_stream_prepare(s);
+			return;
 	}
 
 	ereport(ERROR,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4314af..286119c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
 static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr commit_lsn);
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_message_cb = pgoutput_message;
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
-	cb->stream_prepare_cb = NULL;
+	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
 }
 
 static void
@@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data)
 		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
-
-		/*
-		 * Do additional checking for the disallowed combination of two_phase
-		 * and streaming. While streaming and two_phase can theoretically be
-		 * supported, it needs more analysis to allow them together.
-		 */
-		if (data->two_phase && data->streaming)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("%s and %s are mutually exclusive options",
-							"two_phase", "streaming")));
 	}
 }
 
@@ -1030,6 +1021,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 }
 
 /*
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+							ReorderBufferTXN *txn,
+							XLogRecPtr prepare_lsn)
+{
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx);
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
  * Initialize the relation schema sync cache for a decoding session.
  *
  * The hash table is destroyed at the end of a decoding session. While
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 63de90d..d193b41 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -67,7 +67,8 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_STREAM_START = 'S',
 	LOGICAL_REP_MSG_STREAM_END = 'E',
 	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
-	LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+	LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+	LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
 } LogicalRepMsgType;
 
 /*
@@ -124,6 +125,7 @@ typedef struct LogicalRepBeginData
 	TransactionId xid;
 } LogicalRepBeginData;
 
+/* Commit (and abort) information */
 typedef struct LogicalRepCommitData
 {
 	XLogRecPtr	commit_lsn;
@@ -243,4 +245,10 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
 										 TransactionId *subxid);
 
+extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
+											XLogRecPtr prepare_lsn);
+extern TransactionId logicalrep_read_stream_prepare(StringInfo in,
+													LogicalRepPreparedTxnData *prepare_data);
+
+
 #endif							/* LOGICAL_PROTO_H */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ad6b4e4..34ebca4 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -279,27 +279,29 @@ WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ..
 --fail - alter of two_phase option not supported.
 ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 ERROR:  unrecognized subscription parameter: "two_phase"
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
-ERROR:  cannot set streaming = true for two-phase enabled subscription
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
                                                                      List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
 -----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
 (1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
-ERROR:  two_phase = true and streaming = true are mutually exclusive options
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                            List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo 
-------+-------+---------+-------------+--------+-----------+------------------+--------------------+----------
-(0 rows)
+                                                                     List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+(1 row)
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index b732871..e304852 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -209,23 +209,25 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
+
 --fail - alter of two_phase option not supported.
 ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
 
---fail - cannot set streaming when two_phase enabled
+-- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
-ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
-
 \dRs+
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
--- fail - two_phase and streaming are mutually exclusive.
-CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (streaming = true, two_phase = true);
+-- two_phase and streaming are compatible.
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 
 \dRs+
 
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
new file mode 100644
index 0000000..c90e3f6
--- /dev/null
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -0,0 +1,453 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test logical replication of 2PC (two-phase commit) with streaming
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 27;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_publisher->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher (uses same DDL as 015_stream test)
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication (streaming = on)
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres', "
+	CREATE SUBSCRIPTION tap_sub
+	CONNECTION '$publisher_connstr application_name=$appname'
+	PUBLICATION tap_pub
+	WITH (streaming = on, two_phase = on)");
+
+# Wait for subscriber to finish initialization
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+	"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+  or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# Check initial data was copied to subscriber
+###############################
+my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber side after the commit.
+###############################
+
+# check that 2PC gets replicated to subscriber
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC
+# 3. Do rollback prepared.
+#
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres',  "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that 2PC ROLLBACK PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is rolled back.
+# 3. After servers are restarted the pending transaction is rolled back.
+#
+# Expect all inserted data is gone.
+# (Note: both publisher and subscriber crash/restart)
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# rollback post the restart
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are rolled back
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: both publisher and subscriber crash/restart)
+###############################
+
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then one server crashes before the 2PC transaction is committed.
+# 3. After the server is restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: only subscriber crashes)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# insert, update, delete enough data to cause streaming
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then one server crashes before the 2PC transaction is committed.
+# 3. After the server is restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# (Note: only publisher crashes)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# insert, update, delete enough data to cause streaming
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->stop('immediate');
+$node_publisher->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE
+# 4. Then do a ROLLBACK PREPARED.
+#
+# Expect the 2PC data rolls back leaving only 3 rows on the subscriber.
+# (the original 2 + inserted 1)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber,
+# but the extra INSERT outside of the 2PC still was replicated
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a COMMIT PREPARED.
+#
+# Expect 2PC data + the extra row are on the subscriber.
+# (the 3334 + inserted 1 = 3335)
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3335|3335|3335), 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Do DELETE after PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row DELETE is done for one of the records that was inserted by the 2PC transaction
+# 4. Then there is a COMMIT PREPARED.
+#
+# Expect all the 2PC data rows on the subscriber (since in fact delete at step 3 would do nothing
+# because that record was not yet committed at the time of the delete).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# DELETE one of the prepared 2PC records before they get committed (we are outside of the 2PC transaction)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a = 5");
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber. Nothing was deleted');
+
+# confirm the "deleted" row was in fact not deleted
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab WHERE a = 5");
+is($result, qq(1), 'The row we deleted before the commit till exists on subscriber.');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Check that a 2PC transaction works using an empty GID literal
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION '';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED '';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber');
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/024_twophase_cascade_stream.pl b/src/test/subscription/t/024_twophase_cascade_stream.pl
new file mode 100644
index 0000000..3a0be82
--- /dev/null
+++ b/src/test/subscription/t/024_twophase_cascade_stream.pl
@@ -0,0 +1,271 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test cascading logical replication of 2PC.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 31;
+
+###############################
+# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C
+###############################
+
+# Initialize nodes
+# node_A
+my $node_A = get_new_node('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_A->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB));
+$node_A->start;
+# node_B
+my $node_B = get_new_node('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_B->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB));
+$node_B->start;
+# node_C
+my $node_C = get_new_node('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf('postgresql.conf', qq(max_prepared_transactions = 10));
+$node_C->append_conf('postgresql.conf', qq(logical_decoding_work_mem = 64kB));
+$node_C->start;
+
+# Create some pre-existing content on node_A (uses same DDL as 015_stream.pl)
+$node_A->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_A->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Create the same tables on node_B and node_C
+# columns a and b are compatible with same table name on node_A
+$node_B->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+$node_C->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication (streaming = on)
+
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE test_tab");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_B
+	CONNECTION '$node_A_connstr application_name=$appname_B'
+	PUBLICATION tap_pub_A
+	WITH (streaming = on, two_phase = on)");
+
+# node_B (pub) -> node_C (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE test_tab");
+my $appname_C = 'tap_sub_C';
+$node_C->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_C
+	CONNECTION '$node_B_connstr application_name=$appname_C'
+	PUBLICATION tap_pub_B
+	WITH (streaming = on, two_phase = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# Also wait for two-phase to be enabled
+my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_B->poll_query_until('postgres', $twophase_query)
+	or die "Timed out while waiting for subscriber to enable twophase";
+$node_C->poll_query_until('postgres', $twophase_query)
+	or die "Timed out while waiting for subscriber to enable twophase";
+
+is(1,1, "Cascade setup is complete");
+
+my $result;
+
+###############################
+# Check initial data was copied to subscriber(s)
+###############################
+$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber C');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+# Expect all data is replicated on subscriber(s) after the commit.
+###############################
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+# Then 2PC PREPARE
+$node_A->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults');
+$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC
+# 3. Do rollback prepared.
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (delete will be replicated)
+$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+# Then 2PC PREPARE
+$node_A->safe_psql('postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC ROLLBACK
+$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction is aborted on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Row inserted by 2PC is not present. Only initial data remains on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'Row inserted by 2PC is not present. Only initial data remains on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+###############################
+# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
+# 0. There are 2 rows only in the table (from previous test)
+# 1. Insert one more row
+# 2. Record a SAVEPOINT
+# 3. Data is streamed using 2PC
+# 4. Do rollback to SAVEPOINT prior to the streamed inserts
+# 5. Then COMMIT PREPARED
+# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1)
+###############################
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql('postgres', "
+	BEGIN;
+	INSERT INTO test_tab VALUES (9999, 'foobar');
+	SAVEPOINT sp_inner;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	ROLLBACK TO SAVEPOINT sp_inner;
+	PREPARE TRANSACTION 'outer';
+	");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber(s)
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# All the streamed data (after the SAVEPOINT) should be rolled back.
+# (9999, 'foobar') should be committed.
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows rolled back are not present on subscriber B');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows rolled back are not present on subscriber C');
+
+###############################
+# check all the cleanup
+###############################
+
+# cleanup the node_B => node_C pub/sub
+$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node C');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node B');
+
+# cleanup the node_A => node_B pub/sub
+$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0), 'check subscription relation status was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber node B');
+$result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node A');
+
+# shutdown
+$node_C->stop('fast');
+$node_B->stop('fast');
+$node_A->stop('fast');
-- 
1.8.3.1

