From 6f6cc3efafb8e959b27083429e32ec5165230527 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 30 Jan 2021 10:21:28 +0530
Subject: [PATCH v23] Tablesync Solution1.

==== Features:

* The tablesync slot is now permanent instead of temporary.

* The tablesync worker now allows multiple transactions instead of a single transaction.

* A new state (SUBREL_STATE_FINISHEDCOPY) is persisted after a successful copy_table in tablesync's LogicalRepSyncTableStart.

* If a re-launched tablesync finds the state SUBREL_STATE_FINISHEDCOPY, it bypasses the initial copy_table phase.

* The tablesync worker now sets up replication origin tracking in LogicalRepSyncTableStart (similar to what is done for the apply worker). The origin is advanced when first created.

* Cleanup of tablesync resources:
- The tablesync slot is dropped by the process_syncing_tables_for_sync function.
- The tablesync replication origin tracking is dropped by process_syncing_tables_for_apply.
- DropSubscription/AlterSubscription_refresh also drop tablesync slots/origins
  (the generated names are sketched at the end of this feature list).

* Updates to PG docs.

* New TAP test case.
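
For reference, below is a minimal standalone C sketch (not part of the patch;
the OID values are hypothetical) of the generated name scheme used for the
tablesync slots and origins, mirroring ReplicationSlotNameForTablesync() and
the origin naming in tablesync.c. These are the names a user would pass to
pg_drop_replication_slot() on the publisher when cleaning up manually:

    #include <stdio.h>

    int
    main(void)
    {
        unsigned int suboid = 16391;   /* hypothetical subscription OID */
        unsigned int relid = 16384;    /* hypothetical table OID */
        char         slotname[64];
        char         originname[64];

        /* Tablesync slot name, as built by ReplicationSlotNameForTablesync(). */
        snprintf(slotname, sizeof(slotname), "pg_%u_sync_%u", suboid, relid);

        /* Tablesync origin tracking name, as built in tablesync.c. */
        snprintf(originname, sizeof(originname), "pg_%u_%u", suboid, relid);

        printf("slot:   %s\n", slotname);   /* pg_16391_sync_16384 */
        printf("origin: %s\n", originname); /* pg_16391_16384 */
        return 0;
    }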

Known Issues:

* None.
---
 doc/src/sgml/catalogs.sgml                  |   1 +
 doc/src/sgml/logical-replication.sgml       |  17 +-
 doc/src/sgml/ref/drop_subscription.sgml     |   6 +-
 src/backend/access/transam/xact.c           |  11 -
 src/backend/catalog/pg_subscription.c       |   5 +
 src/backend/commands/subscriptioncmds.c     | 382 +++++++++++++++-----
 src/backend/replication/logical/launcher.c  | 147 --------
 src/backend/replication/logical/tablesync.c | 294 ++++++++++++---
 src/backend/replication/logical/worker.c    |  18 +-
 src/backend/tcop/utility.c                  |   3 +-
 src/include/catalog/pg_subscription_rel.h   |   2 +
 src/include/commands/subscriptioncmds.h     |   2 +-
 src/include/replication/logicallauncher.h   |   2 -
 src/include/replication/slot.h              |   3 +
 src/include/replication/worker_internal.h   |   3 +-
 src/test/subscription/t/004_sync.pl         |  69 +++-
 16 files changed, 641 insertions(+), 324 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 865e826fb0..920a39dfa9 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7665,6 +7665,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        State code:
        <literal>i</literal> = initialize,
        <literal>d</literal> = data is being copied,
+       <literal>f</literal> = finished table copy,
        <literal>s</literal> = synchronized,
        <literal>r</literal> = ready (normal replication)
       </para></entry>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a560ad69b4..20cdd5715d 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -248,7 +248,17 @@
 
    <para>
     As mentioned earlier, each (active) subscription receives changes from a
-    replication slot on the remote (publishing) side.  Normally, the remote
+    replication slot on the remote (publishing) side.
+   </para>
+   <para>
+    Additional table synchronization slots are normally transient, created
+    internally and dropped automatically when they are no longer needed.
+    These table synchronization slots have generated names:
+    <quote><literal>pg_%u_sync_%u</literal></quote> (parameters: Subscription
+    <parameter>oid</parameter>, Table <parameter>relid</parameter>).
+   </para>
+   <para>
+    Normally, the remote
     replication slot is created automatically when the subscription is created
     using <command>CREATE SUBSCRIPTION</command> and it is dropped
     automatically when the subscription is dropped using <command>DROP
@@ -294,8 +304,9 @@
        using <command>ALTER SUBSCRIPTION</command> before attempting to drop
        the subscription.  If the remote database instance no longer exists, no
        further action is then necessary.  If, however, the remote database
-       instance is just unreachable, the replication slot should then be
-       dropped manually; otherwise it would continue to reserve WAL and might
+       instance is just unreachable, the replication slot (and any still
+       remaining table synchronization slots) should then be dropped
+       manually; otherwise they would continue to reserve WAL and might
        eventually cause the disk to fill up.  Such cases should be carefully
        investigated.
       </para>
diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml
index adbdeafb4e..aee9615546 100644
--- a/doc/src/sgml/ref/drop_subscription.sgml
+++ b/doc/src/sgml/ref/drop_subscription.sgml
@@ -79,7 +79,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
   <para>
    When dropping a subscription that is associated with a replication slot on
    the remote host (the normal state), <command>DROP SUBSCRIPTION</command>
-   will connect to the remote host and try to drop the replication slot as
+   will connect to the remote host and try to drop the replication slot (and
+   any remaining table synchronization slots) as
    part of its operation.  This is necessary so that the resources allocated
    for the subscription on the remote host are released.  If this fails,
    either because the remote host is not reachable or because the remote
@@ -89,7 +90,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
    executing <literal>ALTER SUBSCRIPTION ... SET (slot_name = NONE)</literal>.
    After that, <command>DROP SUBSCRIPTION</command> will no longer attempt any
    actions on a remote host.  Note that if the remote replication slot still
-   exists, it should then be dropped manually; otherwise it will continue to
+   exists, it (and any related table synchronization slots) should then be
+   dropped manually; otherwise they will continue to
    reserve WAL and might eventually cause the disk to fill up.  See
    also <xref linkend="logical-replication-subscription-slot"/>.
   </para>
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a2068e3fd4..3c8b4eb362 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2432,15 +2432,6 @@ PrepareTransaction(void)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
-	/*
-	 * Don't allow PREPARE but for transaction that has/might kill logical
-	 * replication workers.
-	 */
-	if (XactManipulatesLogicalReplicationWorkers())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
-
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
@@ -4899,7 +4890,6 @@ CommitSubTransaction(void)
 	AtEOSubXact_HashTables(true, s->nestingLevel);
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
-	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -5059,7 +5049,6 @@ AbortSubTransaction(void)
 		AtEOSubXact_HashTables(false, s->nestingLevel);
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
-		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
 	/*
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 44cb285b68..303791d580 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -337,6 +337,14 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	char		substate;
 	bool		isnull;
 	Datum		d;
+	Relation	rel;
+
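+	/*
+	 * Lock pg_subscription_rel so that the rel state cannot be changed
+	 * concurrently while it is being read here; AlterSubscription_refresh
+	 * takes AccessExclusiveLock on this catalog when removing rel entries.
+	 */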
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
 
 	/* Try finding the mapping. */
 	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
@@ -363,6 +366,8 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
 	/* Cleanup */
 	ReleaseSysCache(tup);
 
+	table_close(rel, AccessShareLock);
+
 	return substate;
 }
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f7855b8..b15964e462 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
 #include "nodes/makefuncs.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
@@ -566,107 +567,175 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	Oid		   *pubrel_local_oids;
 	ListCell   *lc;
 	int			off;
+	Relation	rel = NULL;
+	bool		sub_rel_locked = false;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
-	/* Try to connect to the publisher. */
-	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
-	if (!wrconn)
-		ereport(ERROR,
-				(errmsg("could not connect to the publisher: %s", err)));
+	PG_TRY();
+	{
+		/* Try to connect to the publisher. */
+		wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+		if (!wrconn)
+			ereport(ERROR,
+					(errmsg("could not connect to the publisher: %s", err)));
 
-	/* Get the table list from publisher. */
-	pubrel_names = fetch_table_list(wrconn, sub->publications);
+		/* Get the table list from publisher. */
+		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
-	/* We are done with the remote side, close connection. */
-	walrcv_disconnect(wrconn);
+		/* Get local table list. */
+		subrel_states = GetSubscriptionRelations(sub->oid);
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
+		/*
+		 * Build qsorted array of local table oids for faster lookup. This can
+		 * potentially contain all tables in the database so speed of lookup
+		 * is important.
+		 */
+		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		off = 0;
+		foreach(lc, subrel_states)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
 
-	/*
-	 * Build qsorted array of local table oids for faster lookup. This can
-	 * potentially contain all tables in the database so speed of lookup is
-	 * important.
-	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
-	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+			subrel_local_oids[off++] = relstate->relid;
+		}
+		qsort(subrel_local_oids, list_length(subrel_states),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Walk over the remote tables and try to match them to locally known
+		 * tables. If the table is not known locally create a new state for
+		 * it.
+		 *
+		 * Also builds array of local oids of remote tables for the next step.
+		 */
+		off = 0;
+		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+		foreach(lc, pubrel_names)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
 
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
-	/*
-	 * Walk over the remote tables and try to match them to locally known
-	 * tables. If the table is not known locally create a new state for it.
-	 *
-	 * Also builds array of local oids of remote tables for the next step.
-	 */
-	off = 0;
-	pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
-
-	foreach(lc, pubrel_names)
-	{
-		RangeVar   *rv = (RangeVar *) lfirst(lc);
-		Oid			relid;
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
 
-		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+			pubrel_local_oids[off++] = relid;
 
-		/* Check for supported relkind. */
-		CheckSubscriptionRelkind(get_rel_relkind(relid),
-								 rv->schemaname, rv->relname);
+			if (!bsearch(&relid, subrel_local_oids,
+						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			{
+				AddSubscriptionRelState(sub->oid, relid,
+										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg("table \"%s.%s\" added to subscription \"%s\"",
+								rv->schemaname, rv->relname, sub->name)));
+			}
+		}
 
-		pubrel_local_oids[off++] = relid;
+		/*
+		 * Next remove state for tables we should not care about anymore using
+		 * the data we collected above
+		 */
+		qsort(pubrel_local_oids, list_length(pubrel_names),
+			  sizeof(Oid), oid_cmp);
 
-		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+		for (off = 0; off < list_length(subrel_states); off++)
 		{
-			AddSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr);
-			ereport(DEBUG1,
-					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
-							rv->schemaname, rv->relname, sub->name)));
-		}
-	}
+			Oid			relid = subrel_local_oids[off];
 
-	/*
-	 * Next remove state for tables we should not care about anymore using the
-	 * data we collected above
-	 */
-	qsort(pubrel_local_oids, list_length(pubrel_names),
-		  sizeof(Oid), oid_cmp);
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				char		state;
+				XLogRecPtr	statelsn;
+
+				/*
+				 * Lock pg_subscription_rel with AccessExclusiveLock to prevent any race
+				 * conditions with the apply worker re-launching workers at the same time
+				 * this code is trying to remove those tables.
+				 *
+				 * Even if a new worker for this particular rel is restarted, it
+				 * won't be able to make any progress, as we hold an exclusive
+				 * lock on pg_subscription_rel until the transaction ends. It
+				 * will simply exit, as there is no corresponding rel entry.
+				 *
+				 * This locking also ensures that the state of rels won't change
+				 * until we are done with this refresh operation.
+				 */
+				if (!sub_rel_locked)
+				{
+					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+					sub_rel_locked = true;
+				}
 
-	for (off = 0; off < list_length(subrel_states); off++)
-	{
-		Oid			relid = subrel_local_oids[off];
+				/* Last known rel state. */
+				state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
 
-		if (!bsearch(&relid, pubrel_local_oids,
-					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
-		{
-			RemoveSubscriptionRel(sub->oid, relid);
+				RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+				logicalrep_worker_stop(sub->oid, relid);
 
-			ereport(DEBUG1,
-					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
-							get_namespace_name(get_rel_namespace(relid)),
-							get_rel_name(relid),
-							sub->name)));
+				/*
+				 * For READY state, we would have already dropped the tablesync
+				 * slot and origin.
+				 */
+				if (state != SUBREL_STATE_READY)
+				{
+					/*
+					 * Drop the tablesync slot.
+					 *
+					 * For SYNCDONE state we know the tablesync slot has already
+					 * been dropped by the tablesync worker.
+					 *
+					 * For other states, there is no certainty: maybe the slot
+					 * does not exist yet; maybe the slot was already deleted
+					 * but SYNCDONE is not yet set. For this reason, we allow
+					 * missing_ok = true for the drop.
+					 */
+					if (state != SUBREL_STATE_SYNCDONE)
+					{
+						char		syncslotname[NAMEDATALEN] = { 0 };
+
+						ReplicationSlotNameForTablesync(sub->oid, relid, syncslotname);
+						ReplicationSlotDropAtPubNode(wrconn, syncslotname, true /* missing_ok */);
+					}
+
+					/*
+					 * Drop the tablesync's origin tracking if it exists.
+					 */
+					tablesync_replorigin_drop(sub->oid, relid, false /* nowait */);
+				}
+
+				ereport(DEBUG1,
+						(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+								get_namespace_name(get_rel_namespace(relid)),
+								get_rel_name(relid),
+								sub->name)));
+			}
 		}
 	}
+	PG_FINALLY();
+	{
+		if (wrconn)
+			walrcv_disconnect(wrconn);
+	}
+	PG_END_TRY();
+
+	if (sub_rel_locked)
+		table_close(rel, NoLock);
 }
 
 /*
  * Alter the existing subscription.
  */
 ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt)
+AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 {
 	Relation	rel;
 	ObjectAddress myself;
@@ -848,6 +917,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
 								 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
 
+					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
 					/* Make sure refresh sees the new list of publications. */
 					sub->publications = stmt->publication;
 
@@ -877,6 +948,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 										   NULL, NULL,	/* no "binary" */
 										   NULL, NULL); /* no "streaming" */
 
+				PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
+
 				AlterSubscription_refresh(sub, copy_data);
 
 				break;
@@ -928,8 +1001,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *err = NULL;
 	RepOriginId originid;
 	WalReceiverConn *wrconn = NULL;
-	StringInfoData cmd;
 	Form_pg_subscription form;
+	List	   *rstates;
 
 	/*
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1042,6 +1115,31 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	/*
+	 * Tablesync resource cleanup (slots and origins).
+	 *
+	 * Any READY-state relations would already have been cleaned up.
+	 *
+	 * Note that the state can't change because we have already stopped both
+	 * the apply and tablesync workers and they can't restart because of
+	 * exclusive lock on the subscription.
+	 */
+	rstates = GetSubscriptionNotReadyRelations(subid);
+	foreach(lc, rstates)
+	{
+		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+		Oid			relid = rstate->relid;
+
+		/* Only clean up resources of tablesync workers */
+		if (!OidIsValid(relid))
+			continue;
+
+		/*
+		 * Drop the tablesync's origin tracking if it exists.
+		 */
+		tablesync_replorigin_drop(subid, relid, false /* nowait */);
+	}
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
@@ -1054,34 +1152,111 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	if (originid != InvalidRepOriginId)
 		replorigin_drop(originid, false);
 
 	/*
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!slotname && rstates == NIL)
 	{
 		table_close(rel, NoLock);
 		return;
 	}
 
 	/*
-	 * Otherwise drop the replication slot at the publisher node using the
-	 * replication connection.
+	 * Try to acquire the connection necessary for dropping slots.
+	 *
+	 * Note: If the slotname is NONE/NULL then we allow the command to finish
+	 * and users need to manually clean up the apply and tablesync worker
+	 * slots later.
 	 */
 	load_file("libpqwalreceiver", false);
 
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
-
 	wrconn = walrcv_connect(conninfo, true, subname, &err);
 	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err),
-		/* translator: %s is an SQL ALTER command */
-				 errhint("Use %s to disassociate the subscription from the slot.",
-						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+	{
+		if (!slotname)
+		{
+			/* be tidy */
+			list_free(rstates);
+			return;
+		}
+		else
+		{
+			ereport(ERROR,
+					(errmsg("could not connect to publisher when attempting to "
+							"drop the replication slot \"%s\"", slotname),
+					errdetail("The error was: %s", err),
+					/* translator: %s is an SQL ALTER command */
+					errhint("Use %s to disassociate the subscription from the slot.",
+							"ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+		}
+	}
+
+	PG_TRY();
+	{
+		foreach(lc, rstates)
+		{
+			SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+			Oid			relid = rstate->relid;
+
+			/* Only clean up resources of tablesync workers */
+			if (!OidIsValid(relid))
+				continue;
+
+			/*
+			 * Drop the tablesync slot.
+			 *
+			 * For SYNCDONE/READY states, the tablesync slot is known to have
+			 * already been dropped by the tablesync worker.
+			 *
+			 * For other states, there is no certainty: maybe the slot does
+			 * not exist yet; maybe the slot was already deleted but SYNCDONE
+			 * is not yet set. For this reason, we allow missing_ok = true for
+			 * the drop.
+			 */
+			if (rstate->state != SUBREL_STATE_SYNCDONE)
+			{
+				char		syncslotname[NAMEDATALEN] = {0};
+
+				ReplicationSlotNameForTablesync(subid, relid, syncslotname);
+				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true /* missing_ok */ );
+			}
+		}
+
+		list_free(rstates);
+
+		/*
+		 * If there is a slot associated with the subscription, then drop the
+		 * replication slot at the publisher.
+		 */
+		if (slotname)
+			ReplicationSlotDropAtPubNode(wrconn, slotname, false /* missing_ok */ );
+	}
+	PG_FINALLY();
+	{
+		walrcv_disconnect(wrconn);
+	}
+	PG_END_TRY();
+
+	table_close(rel, NoLock);
+}
+
+/*
+ * Drop the replication slot at the publisher node using the replication connection.
+ *
+ * missing_ok - if true, then only issue a WARNING message if the slot cannot be deleted.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+	StringInfoData cmd;
+
+	Assert(wrconn);
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
 
 	PG_TRY();
 	{
@@ -1089,27 +1267,37 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 
-		if (res->status != WALRCV_OK_COMMAND)
-			ereport(ERROR,
+		if (res->status == WALRCV_OK_COMMAND)
+		{
+			/* NOTICE. Success. */
+			ereport(NOTICE,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							slotname)));
+		}
+		else if (res->status == WALRCV_ERROR && missing_ok)
+		{
+			/* WARNING. Error, but missing_ok = true. */
+			ereport(WARNING,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
 							slotname),
 					 errdetail("The error was: %s", res->err)));
+		}
 		else
-			ereport(NOTICE,
-					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+		{
+			/* ERROR. */
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							slotname),
+					 errdetail("The error was: %s", res->err)));
+		}
 
 		walrcv_clear_result(res);
 	}
 	PG_FINALLY();
 	{
-		walrcv_disconnect(wrconn);
+		pfree(cmd.data);
 	}
 	PG_END_TRY();
-
-	pfree(cmd.data);
-
-	table_close(rel, NoLock);
 }
 
 /*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 186514cd9e..58082dde18 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,20 +73,6 @@ typedef struct LogicalRepWorkerId
 	Oid			relid;
 } LogicalRepWorkerId;
 
-typedef struct StopWorkersData
-{
-	int			nestDepth;		/* Sub-transaction nest level */
-	List	   *workers;		/* List of LogicalRepWorkerId */
-	struct StopWorkersData *parent; /* This need not be an immediate
-									 * subtransaction parent */
-} StopWorkersData;
-
-/*
- * Stack of StopWorkersData elements. Each stack element contains the workers
- * to be stopped for that subtransaction.
- */
-static StopWorkersData *on_commit_stop_workers = NULL;
-
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -546,51 +532,6 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
-/*
- * Request worker for specified sub/rel to be stopped on commit.
- */
-void
-logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
-{
-	int			nestDepth = GetCurrentTransactionNestLevel();
-	LogicalRepWorkerId *wid;
-	MemoryContext oldctx;
-
-	/* Make sure we store the info in context that survives until commit. */
-	oldctx = MemoryContextSwitchTo(TopTransactionContext);
-
-	/* Check that previous transactions were properly cleaned up. */
-	Assert(on_commit_stop_workers == NULL ||
-		   nestDepth >= on_commit_stop_workers->nestDepth);
-
-	/*
-	 * Push a new stack element if we don't already have one for the current
-	 * nestDepth.
-	 */
-	if (on_commit_stop_workers == NULL ||
-		nestDepth > on_commit_stop_workers->nestDepth)
-	{
-		StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
-
-		newdata->nestDepth = nestDepth;
-		newdata->workers = NIL;
-		newdata->parent = on_commit_stop_workers;
-		on_commit_stop_workers = newdata;
-	}
-
-	/*
-	 * Finally add a new worker into the worker list of the current
-	 * subtransaction.
-	 */
-	wid = palloc(sizeof(LogicalRepWorkerId));
-	wid->subid = subid;
-	wid->relid = relid;
-	on_commit_stop_workers->workers =
-		lappend(on_commit_stop_workers->workers, wid);
-
-	MemoryContextSwitchTo(oldctx);
-}
-
 /*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
@@ -819,109 +760,21 @@ ApplyLauncherShmemInit(void)
 	}
 }
 
-/*
- * Check whether current transaction has manipulated logical replication
- * workers.
- */
-bool
-XactManipulatesLogicalReplicationWorkers(void)
-{
-	return (on_commit_stop_workers != NULL);
-}
-
 /*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-
-	Assert(on_commit_stop_workers == NULL ||
-		   (on_commit_stop_workers->nestDepth == 1 &&
-			on_commit_stop_workers->parent == NULL));
-
 	if (isCommit)
 	{
-		ListCell   *lc;
-
-		if (on_commit_stop_workers != NULL)
-		{
-			List	   *workers = on_commit_stop_workers->workers;
-
-			foreach(lc, workers)
-			{
-				LogicalRepWorkerId *wid = lfirst(lc);
-
-				logicalrep_worker_stop(wid->subid, wid->relid);
-			}
-		}
-
 		if (on_commit_launcher_wakeup)
 			ApplyLauncherWakeup();
 	}
 
-	/*
-	 * No need to pfree on_commit_stop_workers.  It was allocated in
-	 * transaction memory context, which is going to be cleaned soon.
-	 */
-	on_commit_stop_workers = NULL;
 	on_commit_launcher_wakeup = false;
 }
 
-/*
- * On commit, merge the current on_commit_stop_workers list into the
- * immediate parent, if present.
- * On rollback, discard the current on_commit_stop_workers list.
- * Pop out the stack.
- */
-void
-AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
-{
-	StopWorkersData *parent;
-
-	/* Exit immediately if there's no work to do at this level. */
-	if (on_commit_stop_workers == NULL ||
-		on_commit_stop_workers->nestDepth < nestDepth)
-		return;
-
-	Assert(on_commit_stop_workers->nestDepth == nestDepth);
-
-	parent = on_commit_stop_workers->parent;
-
-	if (isCommit)
-	{
-		/*
-		 * If the upper stack element is not an immediate parent
-		 * subtransaction, just decrement the notional nesting depth without
-		 * doing any real work.  Else, we need to merge the current workers
-		 * list into the parent.
-		 */
-		if (!parent || parent->nestDepth < nestDepth - 1)
-		{
-			on_commit_stop_workers->nestDepth--;
-			return;
-		}
-
-		parent->workers =
-			list_concat(parent->workers, on_commit_stop_workers->workers);
-	}
-	else
-	{
-		/*
-		 * Abandon everything that was done at this nesting level.  Explicitly
-		 * free memory to avoid a transaction-lifespan leak.
-		 */
-		list_free_deep(on_commit_stop_workers->workers);
-	}
-
-	/*
-	 * We have taken care of the current subtransaction workers list for both
-	 * abort or commit. So we are ready to pop the stack.
-	 */
-	pfree(on_commit_stop_workers);
-	on_commit_stop_workers = parent;
-}
-
 /*
  * Request wakeup of the launcher on commit of the transaction.
  *
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196fd7..165086ad66 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -31,8 +31,11 @@
  *		 table state to INIT.
  *	   - Tablesync worker starts; changes table state from INIT to DATASYNC while
  *		 copying.
- *	   - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- *		 waits for state change.
+ *	   - Tablesync worker does the initial table copy; there is a FINISHEDCOPY
+ *		 (sync-worker-specific) state to indicate that the copy phase has
+ *		 completed. Because this state is persisted (not just in memory), a
+ *		 relaunched worker will not re-attempt the copy.
+ *	   - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
  *	   - Apply worker periodically checks for tables in SYNCWAIT state.  When
  *		 any appear, it sets the table state to CATCHUP and starts loop-waiting
  *		 until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +51,8 @@
  *		 point it sets state to READY and stops tracking.  Again, there might
  *		 be zero changes in between.
  *
- *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- *	  CATCHUP -> SYNCDONE -> READY.
+ *	  So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
+ *	  -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state.  Some transient state during data
@@ -59,6 +62,7 @@
  *	  Example flows look like this:
  *	   - Apply is in front:
  *		  sync:8
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:10
  *			-> set in memory CATCHUP
@@ -74,6 +78,7 @@
  *
  *	   - Sync is in front:
  *		  sync:10
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:8
  *			-> set in memory CATCHUP
@@ -98,11 +103,16 @@
 #include "miscadmin.h"
 #include "parser/parse_relation.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -112,6 +122,42 @@ static bool table_states_valid = false;
 
 StringInfo	copybuf = NULL;
 
+/*
+ * Common code to drop the origin of a tablesync worker.
+ *
+ * There is a potential race condition if two processes attempt to call
+ * replorigin_drop for the same originid at the same time. The loser of
+ * that race would give an ERROR saying that it failed to find the
+ * expected originid.
+ *
+ * The TRY/CATCH below suppresses such errors, allowing the tablesync cleanup
+ * code to proceed.
+ */
+void
+tablesync_replorigin_drop(Oid subid, Oid relid, bool nowait)
+{
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
+
+	snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+	originid = replorigin_by_name(originname, true);
+	if (OidIsValid(originid))
+	{
+		PG_TRY();
+		{
+			replorigin_drop(originid, nowait);
+		}
+		PG_CATCH();
+		{
+			ereport(WARNING,
+					errmsg("could not drop replication origin with OID %d, named \"%s\"",
+						   originid,
+						   originname));
+		}
+		PG_END_TRY();
+	}
+}
+
 /*
  * Exit routine for synchronization worker.
  */
@@ -270,30 +316,55 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
+	bool		sync_done = false;
+	Oid			subid = MySubscription->oid;
+	Oid			relid = MyLogicalRepWorker->relid;
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	sync_done = MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
+		current_lsn >= MyLogicalRepWorker->relstate_lsn;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
-		current_lsn >= MyLogicalRepWorker->relstate_lsn)
+	if (sync_done)
 	{
 		TimeLineID	tli;
+		char		syncslotname[NAMEDATALEN] = {0};
+
+		/* End WAL streaming so wrconn can be reused to drop the slot. */
+		walrcv_endstreaming(wrconn, &tli);
+
+		/*
+		 * Clean up the tablesync slot.
+		 */
+		ReplicationSlotNameForTablesync(subid, relid, syncslotname);
 
+		ReplicationSlotDropAtPubNode(wrconn, syncslotname, false /* missing_ok */ );
+
+		/*
+		 * Change state to SYNCDONE.
+		 */
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 		MyLogicalRepWorker->relstate_lsn = current_lsn;
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+		 * UpdateSubscriptionRelState must be called within a transaction.
+		 * That transaction will be ended by finish_sync_worker().
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
-		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
 	}
-	else
-		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 }
 
 /*
@@ -412,6 +483,21 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					started_tx = true;
 				}
 
+				/*
+				 * Remove the tablesync origin tracking if it exists.
+				 *
+				 * The normal-case origin drop is done here instead of in
+				 * process_syncing_tables_for_sync, because if the tablesync
+				 * worker attempted to drop its own origin, that would prevent
+				 * the origin from being advanced properly when its final
+				 * transaction commits.
+				 */
+				tablesync_replorigin_drop(MyLogicalRepWorker->subid,
+										  rstate->relid, false /* nowait */ );
+
+				/*
+				 * Update the state to READY only after the origin cleanup.
+				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
@@ -807,6 +893,40 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+/*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
+ * on slot name length.
+ *
+ * The returned slot name is either:
+ * - stored in the supplied buffer (syncslotname), or
+ * - palloc'ed in current memory context (if syncslotname = NULL).
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char syncslotname[NAMEDATALEN])
+{
+	/*
+	 * Note: Now that we are using PERMANENT tablesync slots, this code no
+	 * longer uses the Subscription slot name as the first part of the
+	 * tablesync slot name. That part is omitted because we are now
+	 * responsible for cleaning up the permanent tablesync slots, and it
+	 * could become impossible to recalculate which name to clean up if the
+	 * Subscription slot name had changed.
+	 */
+
+	if (syncslotname)
+	{
+		sprintf(syncslotname, "pg_%u_sync_%u", suboid, relid);
+	}
+	else
+	{
+		syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+	}
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -824,6 +944,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -849,19 +971,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
-						MySubscription->oid,
-						MyLogicalRepWorker->relid);
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(MySubscription->oid,
+											   MyLogicalRepWorker->relid,
+											   NULL /* use palloc */ );
 
 	/*
 	 * Here we use the slot name instead of the subscription name as the
@@ -874,7 +987,33 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+	/* Assign the origin tracking record name. */
+	snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed
+		 * before it was able to finish normally.
+		 */
+		StartTransactionCommand();
+
+		/*
+		 * The origin tracking name must already exist. It was created first
+		 * The origin tracking name must already exist. It was created the
+		 * first time this tablesync was launched.
+		originid = replorigin_by_name(originname, false /* missing_ok */ );
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+		*origin_startpos = replorigin_session_get_progress(false);
+
+		CommitTransactionCommand();
+
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -890,9 +1029,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -918,29 +1054,97 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
-	 * for the catchup phase after COPY is done, so tell it to use the
-	 * snapshot to make the final data consistent.
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
-					   CRS_USE_SNAPSHOT, origin_startpos);
+	PG_TRY();
+	{
+		/*
+		 * Create a new permanent logical decoding slot. This slot will be
+		 * used for the catchup phase after COPY is done, so tell it to use
+		 * the snapshot to make the final data consistent.
+		 */
+		walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+						   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
+
+		/* Setup replication origin tracking. */
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid))
+		{
+			/*
+			 * Origin tracking does not exist, so create it now.
+			 *
+			 * Then advance to the LSN obtained from walrcv_create_slot. This
+			 * is WAL-logged for the purpose of recovery. Locks are taken to
+			 * prevent the replication origin from vanishing while advancing.
+			 */
+			originid = replorigin_create(originname);
+
+			LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+			replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+							   true /* go backward */ , true /* WAL log */ );
+			UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+		}
+		else
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_OBJECT),
+					 errmsg("replication origin \"%s\" already exists",
+							originname)));
+		}
+
+		/*
+		 * Update the persisted state to indicate the COPY phase is done; make
+		 * it visible to others.
+		 */
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   SUBREL_STATE_FINISHEDCOPY,
+								   MyLogicalRepWorker->relstate_lsn);
+
+		CommitTransactionCommand();
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If something failed during the table copy, then clean up the
+		 * created slot.
+		 */
+		ReplicationSlotDropAtPubNode(wrconn, slotname, false /* missing_ok */ );
+
+		pfree(slotname);
+		slotname = NULL;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
 
-	table_close(rel, NoLock);
+copy_table_done:
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+	elog(DEBUG1,
+		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+		 originname,
+		 (uint32) (*origin_startpos >> 32),
+		 (uint32) *origin_startpos);
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index eb7db89cef..cfc924cd89 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData *commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 1d81071c35..05bb698cf4 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -1786,7 +1786,8 @@ ProcessUtilitySlow(ParseState *pstate,
 				break;
 
 			case T_AlterSubscriptionStmt:
-				address = AlterSubscription((AlterSubscriptionStmt *) parsetree);
+				address = AlterSubscription((AlterSubscriptionStmt *) parsetree,
+											isTopLevel);
 				break;
 
 			case T_DropSubscriptionStmt:
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 06663b9f16..9027c42976 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_FINISHEDCOPY 'f'	/* tablesync copy phase is completed
+										 * (sublsn NULL) */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index a81865079d..3b926f35d7 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -20,7 +20,7 @@
 
 extern ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
 										bool isTopLevel);
-extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt);
+extern ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel);
 extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
 
 extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 421ec1580d..301e494f7b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,9 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
-extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
-extern void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth);
 
 extern bool IsLogicalLauncher(void);
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53f636c56f..5f52335f15 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index d046022e49..4a5c49da7d 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -77,13 +77,14 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int	logicalrep_sync_worker_count(Oid subid);
 
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
+extern void tablesync_replorigin_drop(Oid subid, Oid relid, bool nowait);
+
 void		process_syncing_tables(XLogRecPtr current_lsn);
 void		invalidate_syncing_table_states(Datum arg, int cacheid,
 											uint32 hashvalue);
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index e111ab9181..963a7ee4dc 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -3,7 +3,9 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -149,6 +151,71 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(20),
 	'changes for table added after subscription initialized replicated');
 
+##
+## slot integrity
+##
+## Manually create a slot with the same name that tablesync will want.
+## Expect a tablesync ERROR when the clash is detected.
+## Then remove the slot so tablesync can proceed.
+## Expect tablesync to then finish normally.
+##
+
+# drop the subscription
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+# empty the table tab_rep_next
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep_next;");
+
+# drop the table tab_rep from publisher and subscriber
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep;");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep;");
+
+# recreate the subscription, but leave it disabled so that we can look up its OID
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub
+	with (enabled = false)"
+);
+
+# to construct the name of the tablesync slot we need the subscription OID
+# and the table OID.
+my $subid = $node_subscriber->safe_psql('postgres',
+	"SELECT oid FROM pg_subscription WHERE subname = 'tap_sub';");
+is(looks_like_number($subid), qq(1), 'get the subscription OID');
+
+my $relid = $node_subscriber->safe_psql('postgres',
+	"SELECT 'tab_rep_next'::regclass::oid");
+is(looks_like_number($relid), qq(1), 'get the table OID');
+
+# the name of the tablesync slot is 'pg_<suboid>_sync_<relid>'.
+my $slotname = 'pg_' . $subid . '_' . 'sync_' . $relid;
+
+# temporarily create a slot having the same name as the tablesync slot.
+$node_publisher->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('$slotname', 'pgoutput', false);");
+
+# enable the subscription
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub ENABLE"
+);
+
+# it will be stuck in DATASYNC, as creating the tablesync slot fails because that slot already exists.
+$node_subscriber->poll_query_until('postgres', $started_query)
+  or die "Timed out while waiting for subscriber to start sync";
+
+# now drop the offending slot; the tablesync should recover.
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('$slotname');");
+
+# wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM tab_rep_next");
+is($result, qq(20),
+	'data for table added after subscription initialized are now synced');
+
+# Cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 
 $node_subscriber->stop('fast');
-- 
2.28.0.windows.1

