From 757e519b98805ace371e5c3f972979d2b85ffd0c Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Thu, 2 Jun 2022 17:39:37 +0300
Subject: [PATCH 2/2] Reuse Logical Replication Background worker

This commit allows a tablesync worker to move on to another table that needs
synchronization once it is done with its current table during the tablesync
phase of logical replication.

Before this commit, tablesync workers were capable of syncing only one
relation. A new worker, replication slot and origin were needed for each
relation in the subscription.

Now, tablesync workers are no longer limited to a single relation; they can
move on to another relation and reuse the existing replication slot and
origin.
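
The new catalog fields added by this commit can be inspected as usual; for
example (illustrative queries, with a hypothetical subscription named 'mysub'):

    SELECT sublastusedid FROM pg_subscription WHERE subname = 'mysub';
    SELECT srrelid, srsubstate, srrelslotname, srreloriginname
    FROM pg_subscription_rel;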

This reduces the overhead of launching and terminating a new background worker
for each relation. By reusing tablesync workers, the replication slots and
origins created for tablesync can be reused as well. Removing the burden of
creating and dropping replication slots and origins improves tablesync speed
significantly, especially for empty or small tables.
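
For illustration, using the naming scheme in this patch (the subscription OID
and system identifier below are made-up example values): a reused tablesync
worker of subscription 16394 that was assigned slot ID 1 keeps using the
replication slot

    pg_16394_sync_1_7300000000000000001

and the replication origin

    pg_16394_1

for every table it synchronizes, whereas previously each relation required its
own slot of the form pg_16394_sync_<relid>_<sysid> and origin pg_16394_<relid>.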

Discussion: http://postgr.es/m/CAGPVpCTq=rUDd4JUdaRc1XUWf4BrH2gdSNf3rtOMUGj9rPpfzQ@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                    |  30 ++
 src/backend/catalog/pg_subscription.c         | 284 +++++++++++-
 src/backend/commands/subscriptioncmds.c       | 226 ++++++----
 .../replication/logical/applyparallelworker.c |   3 +-
 src/backend/replication/logical/launcher.c    |   9 +-
 src/backend/replication/logical/tablesync.c   | 421 +++++++++++++-----
 src/backend/replication/logical/worker.c      | 388 ++++++++++------
 src/include/catalog/pg_subscription.h         |   6 +
 src/include/catalog/pg_subscription_rel.h     |  15 +-
 src/include/replication/slot.h                |   3 +-
 src/include/replication/worker_internal.h     |  29 +-
 src/test/regress/expected/misc_sanity.out     |  30 +-
 12 files changed, 1094 insertions(+), 350 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1e4048054..eb8d4e1005 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8002,6 +8002,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        origin.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>sublastusedid</structfield> <type>int8</type>
+      </para>
+      <para>
+       The last used ID for tablesync workers. This ID is used to
+       create replication slots. The last used ID needs to be stored
+       so that logical replication can safely proceed after any interruption.
+       If sublastusedid is 0, then no table has been synced yet.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -8086,6 +8098,24 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        otherwise null
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>srrelslotname</structfield> <type>text</type>
+      </para>
+      <para>
+       Replication slot name that is used for synchronization of the relation
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>srreloriginname</structfield> <type>text</type>
+      </para>
+      <para>
+       Origin name that is used for tracking synchronization of the relation
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a56ae311c3..f8dcad16ee 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -114,6 +114,14 @@ GetSubscription(Oid subid, bool missing_ok)
 	Assert(!isnull);
 	sub->origin = TextDatumGetCString(datum);
 
+	/* Get last used id */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_sublastusedid,
+							&isnull);
+	Assert(!isnull);
+	sub->lastusedid = DatumGetInt64(datum);
+
 	ReleaseSysCache(tup);
 
 	return sub;
@@ -205,6 +213,44 @@ DisableSubscription(Oid subid)
 	table_close(rel, NoLock);
 }
 
+/*
+ * Update the last used replication slot ID for the given subscription.
+ */
+void
+UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+
+	/* Look up the subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_sublastusedid - 1] = true;
+	values[Anum_pg_subscription_sublastusedid - 1] = Int64GetDatum(lastusedid);
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, NoLock);
+}
+
 /*
  * Convert text array to list of strings.
  *
@@ -234,7 +280,7 @@ textarray_to_stringlist(ArrayType *textarray)
  */
 void
 AddSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn)
+						XLogRecPtr sublsn, char *relslotname, char *reloriginname)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -263,6 +309,14 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
 	else
 		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (relslotname)
+		values[Anum_pg_subscription_rel_srrelslotname - 1] = CStringGetTextDatum(relslotname);
+	else
+		nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+	if (reloriginname)
+		values[Anum_pg_subscription_rel_srreloriginname - 1] = CStringGetTextDatum(reloriginname);
+	else
+		nulls[Anum_pg_subscription_rel_srreloriginname - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -275,6 +329,58 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Internal function to modify columns for relation state update
+ */
+static void
+UpdateSubscriptionRelState_internal(Datum *values,
+									bool *nulls,
+									bool *replaces,
+									char state,
+									XLogRecPtr sublsn)
+{
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+}
+
+/*
+ * Internal function to modify columns for replication slot update
+ */
+static void
+UpdateSubscriptionRelReplicationSlot_internal(Datum *values,
+											bool *nulls,
+											bool *replaces,
+											char *relslotname)
+{
+	replaces[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+	if (relslotname)
+		values[Anum_pg_subscription_rel_srrelslotname - 1] = CStringGetTextDatum(relslotname);
+	else
+		nulls[Anum_pg_subscription_rel_srrelslotname - 1] = true;
+}
+
+/*
+ * Internal function to modify columns for replication origin update
+ */
+static void
+UpdateSubscriptionRelOrigin_internal(Datum *values,
+									bool *nulls,
+									bool *replaces,
+									char *reloriginname)
+{
+	replaces[Anum_pg_subscription_rel_srreloriginname - 1] = true;
+	if (reloriginname)
+		values[Anum_pg_subscription_rel_srreloriginname - 1] = CStringGetTextDatum(reloriginname);
+	else
+		nulls[Anum_pg_subscription_rel_srreloriginname - 1] = true;
+}
+
 /*
  * Update the state of a subscription table.
  */
@@ -305,14 +411,48 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
-	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn);
 
-	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-	if (sublsn != InvalidXLogRecPtr)
-		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-	else
-		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	table_close(rel, NoLock);
+}
+
+/*
+ * Update the replication slot name of a subscription table.
+ */
+void
+UpdateSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *relslotname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	UpdateSubscriptionRelReplicationSlot_internal(values, nulls, replaces, relslotname);
 
 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 							replaces);
@@ -324,6 +464,134 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * Update replication slot name, origin name and state of
+ * a subscription table in one transaction.
+ */
+void
+UpdateSubscriptionRel(Oid subid,
+					  Oid relid,
+					  char state,
+					  XLogRecPtr sublsn,
+					  char *relslotname,
+					  char *reloriginname)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	UpdateSubscriptionRelState_internal(values, nulls, replaces, state, sublsn);
+	UpdateSubscriptionRelReplicationSlot_internal(values, nulls, replaces, relslotname);
+	UpdateSubscriptionRelOrigin_internal(values, nulls, replaces, reloriginname);
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	/* Cleanup. */
+	table_close(rel, NoLock);
+}
+
+/*
+ * Get the origin name of a subscription table.
+ *
+ * On return, reloriginname contains the replication origin name if one
+ * exists for the relation.
+ */
+void
+GetSubscriptionRelOrigin(Oid subid, Oid relid, char *reloriginname, bool *isnull)
+{
+	HeapTuple	tup;
+	Relation	rel;
+	Datum 		d;
+	char		*originname;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	/* Try finding the mapping. */
+	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		*isnull = true;
+		table_close(rel, AccessShareLock);
+		return;
+	}
+
+	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+						Anum_pg_subscription_rel_srreloriginname, isnull);
+	if (!*isnull)
+	{
+		originname = TextDatumGetCString(d);
+		memcpy(reloriginname, originname, NAMEDATALEN);
+	}
+
+	/* Cleanup */
+	ReleaseSysCache(tup);
+
+	table_close(rel, AccessShareLock);
+}
+
+/*
+ * Get the replication slot name of a subscription table.
+ *
+ * On return, slotname contains the replication slot name if one is recorded
+ * for the relation.
+ */
+void
+GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname)
+{
+	HeapTuple	tup;
+	Relation	rel;
+	Datum 		d;
+	char		*relrepslot;
+	bool		isnull;
+
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	/* Try finding the mapping. */
+	tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+	{
+		table_close(rel, AccessShareLock);
+		return;
+	}
+
+	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
+						Anum_pg_subscription_rel_srrelslotname, &isnull);
+	if (!isnull)
+	{
+		relrepslot = TextDatumGetCString(d);
+		memcpy(slotname, relrepslot, NAMEDATALEN);
+	}
+
+	/* Cleanup */
+	ReleaseSysCache(tup);
+
+	table_close(rel, AccessShareLock);
+}
+
 /*
  * Get state of subscription table.
  *
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..1c4033fc79 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -649,6 +649,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		publicationListToArray(publications);
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
+	values[Anum_pg_subscription_sublastusedid - 1] = Int64GetDatum(0);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -709,7 +710,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 										 rv->schemaname, rv->relname);
 
 				AddSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr);
+										InvalidXLogRecPtr, NULL, NULL);
 			}
 
 			/*
@@ -799,6 +800,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	} SubRemoveRels;
 	SubRemoveRels *sub_remove_rels;
 	WalReceiverConn *wrconn;
+	List	   *sub_remove_slots = NIL;
+	LogicalRepWorker *worker;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
@@ -876,7 +879,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			{
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-										InvalidXLogRecPtr);
+										InvalidXLogRecPtr, NULL, NULL);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
 										 rv->schemaname, rv->relname, sub->name)));
@@ -900,6 +903,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			{
 				char		state;
 				XLogRecPtr	statelsn;
+				char		slotname[NAMEDATALEN] = {0};
 
 				/*
 				 * Lock pg_subscription_rel with AccessExclusiveLock to
@@ -926,7 +930,29 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
-				logicalrep_worker_stop(sub->oid, relid);
+				/*
+				 * Find the logical replication sync worker, if one exists,
+				 * and remember its slot name so that the associated
+				 * replication slot can be dropped later.
+				 */
+				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+				worker = logicalrep_worker_find(sub->oid, relid, false);
+				if (worker)
+					sub_remove_slots = lappend(sub_remove_slots,
+											   pstrdup(worker->slot_name));
+				else
+				{
+					/*
+					 * Sync of this relation might have failed in an earlier
+					 * attempt, but the replication slot might still exist.
+					 */
+					GetSubscriptionRelReplicationSlot(sub->oid, relid, slotname);
+					if (strlen(slotname) > 0)
+						sub_remove_slots = lappend(sub_remove_slots,
+												   pstrdup(slotname));
+				}
+				LWLockRelease(LogicalRepWorkerLock);
+
+				/*
+				 * Stop the worker only after releasing LogicalRepWorkerLock,
+				 * since logicalrep_worker_stop() takes that lock itself.
+				 */
+				if (worker)
+					logicalrep_worker_stop(sub->oid, relid);
 
 				/*
 				 * For READY state, we would have already dropped the
@@ -960,31 +986,24 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		}
 
 		/*
-		 * Drop the tablesync slots associated with removed tables. This has
-		 * to be at the end because otherwise if there is an error while doing
-		 * the database operations we won't be able to rollback dropped slots.
+		 * Drop the replication slots associated with tablesync workers for
+		 * removed tables. This has to be at the end because otherwise if
+		 * there is an error while doing the database operations we won't be
+		 * able to rollback dropped slots.
 		 */
-		for (off = 0; off < remove_rel_len; off++)
+		foreach(lc, sub_remove_slots)
 		{
-			if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
-				sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+			char		syncslotname[NAMEDATALEN] = {0};
 
-				/*
-				 * For READY/SYNCDONE states we know the tablesync slot has
-				 * already been dropped by the tablesync worker.
-				 *
-				 * For other states, there is no certainty, maybe the slot
-				 * does not exist yet. Also, if we fail after removing some of
-				 * the slots, next time, it will again try to drop already
-				 * dropped slots and fail. For these reasons, we allow
-				 * missing_ok = true for the drop.
-				 */
-				ReplicationSlotNameForTablesync(sub->oid, sub_remove_rels[off].relid,
-												syncslotname, sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+			strlcpy(syncslotname, lfirst(lc), sizeof(syncslotname));
+
+			/*
+			 * There is no certainty, maybe the slot does not exist yet. Also,
+			 * if we fail after removing some of the slots, next time, it will
+			 * again try to drop already dropped slots and fail. For these
+			 * reasons, we allow missing_ok = true for the drop.
+			 */
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 	}
 	PG_FINALLY();
@@ -1384,6 +1403,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *subname;
 	char	   *conninfo;
 	char	   *slotname;
+	int64		lastusedid;
 	List	   *subworkers;
 	ListCell   *lc;
 	char		originname[NAMEDATALEN];
@@ -1455,6 +1475,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	else
 		slotname = NULL;
 
+	/* Get the last used identifier by the subscription */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
+							Anum_pg_subscription_sublastusedid, &isnull);
+	if (!isnull)
+		lastusedid = DatumGetInt64(datum);
+	else
+		lastusedid = 0;
+
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
 	 * slot stays dropped even if the transaction rolls back.  So we cannot
@@ -1504,6 +1532,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	list_free(subworkers);
 
+	rstates = GetSubscriptionRelations(subid, true);
+
 	/*
 	 * Remove the no-longer-useful entry in the launcher's table of apply
 	 * worker start times.
@@ -1515,36 +1545,26 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ApplyLauncherForgetWorkerStartTime(subid);
 
 	/*
-	 * Cleanup of tablesync replication origins.
-	 *
-	 * Any READY-state relations would already have dealt with clean-ups.
+	 * Cleanup of tablesync replication origins associated with the
+	 * subscription, if any exist. Try to drop the origins by constructing
+	 * every origin name that could have been created for this subscription.
 	 *
 	 * Note that the state can't change because we have already stopped both
 	 * the apply and tablesync workers and they can't restart because of
 	 * exclusive lock on the subscription.
+	 *
+	 * XXX: This could be handled better than looping over all possible IDs.
 	 */
-	rstates = GetSubscriptionRelations(subid, true);
-	foreach(lc, rstates)
+	for (int64 i = 1; i <= lastusedid; i++)
 	{
-		SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-		Oid			relid = rstate->relid;
-
-		/* Only cleanup resources of tablesync workers */
-		if (!OidIsValid(relid))
-			continue;
+		char		originname_to_drop[NAMEDATALEN] = {0};
 
-		/*
-		 * Drop the tablesync's origin tracking if exists.
-		 *
-		 * It is possible that the origin is not yet created for tablesync
-		 * worker so passing missing_ok = true. This can happen for the states
-		 * before SUBREL_STATE_FINISHEDCOPY.
-		 */
-		ReplicationOriginNameForLogicalRep(subid, relid, originname,
-										   sizeof(originname));
-		replorigin_drop_by_name(originname, true, false);
+		snprintf(originname_to_drop, sizeof(originname_to_drop), "pg_%u_%lld", subid, (long long) i);
+		/* missing_ok = true, since the origin might already be dropped. */
+		replorigin_drop_by_name(originname_to_drop, true, false);
 	}
 
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
@@ -1596,39 +1616,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	PG_TRY();
 	{
-		foreach(lc, rstates)
-		{
-			SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
-			Oid			relid = rstate->relid;
+		List	   *slots = NULL;
 
-			/* Only cleanup resources of tablesync workers */
-			if (!OidIsValid(relid))
-				continue;
 
-			/*
-			 * Drop the tablesync slots associated with removed tables.
-			 *
-			 * For SYNCDONE/READY states, the tablesync slot is known to have
-			 * already been dropped by the tablesync worker.
-			 *
-			 * For other states, there is no certainty, maybe the slot does
-			 * not exist yet. Also, if we fail after removing some of the
-			 * slots, next time, it will again try to drop already dropped
-			 * slots and fail. For these reasons, we allow missing_ok = true
-			 * for the drop.
-			 */
-			if (rstate->state != SUBREL_STATE_SYNCDONE)
-			{
-				char		syncslotname[NAMEDATALEN] = {0};
+		slots = GetReplicationSlotNamesBySubId(wrconn, subid, true);
+		foreach(lc, slots)
+		{
+			char	   *syncslotname = (char *) lfirst(lc);
 
-				ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-												sizeof(syncslotname));
-				ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
-			}
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
 		}
 
-		list_free(rstates);
-
 		/*
 		 * If there is a slot associated with the subscription, then drop the
 		 * replication slot at the publisher.
@@ -1651,6 +1649,71 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+/*
+ * GetReplicationSlotNamesBySubId
+ *
+ * Get the replication slot names associated with the subscription.
+ */
+List *
+GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok)
+{
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[1] = {NAMEOID};
+	List	   *tablelist = NIL;
+
+	Assert(wrconn);
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "SELECT slot_name"
+					 " FROM pg_replication_slots"
+					 " WHERE slot_name LIKE 'pg_%u_sync_%%';",
+					 subid);
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+
+		res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errmsg("could not receive list of replication slots from the publisher: %s",
+						   res->err));
+
+		/* Process the returned slot names. */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			char	   *repslotname;
+			char	   *slotattr;
+			bool		isnull;
+
+			slotattr = NameStr(*DatumGetName(slot_getattr(slot, 1, &isnull)));
+			Assert(!isnull);
+
+			repslotname = pstrdup(slotattr);
+			tablelist = lappend(tablelist, repslotname);
+
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+	PG_FINALLY();
+	{
+		pfree(cmd.data);
+	}
+	PG_END_TRY();
+	\
+		return tablelist;
+}
+
 /*
  * Drop the replication slot at the publisher node using the replication
  * connection.
@@ -2005,6 +2068,7 @@ static void
 ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 {
 	ListCell   *lc;
+	LogicalRepWorker *worker;
 
 	foreach(lc, rstates)
 	{
@@ -2015,18 +2079,20 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 		if (!OidIsValid(relid))
 			continue;
 
+		/* Check if there is a sync worker for the relation */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		worker = logicalrep_worker_find(subid, relid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+
 		/*
 		 * Caller needs to ensure that relstate doesn't change underneath us.
 		 * See DropSubscription where we get the relstates.
 		 */
-		if (rstate->state != SUBREL_STATE_SYNCDONE)
+		if (worker &&
+			rstate->state != SUBREL_STATE_SYNCDONE)
 		{
-			char		syncslotname[NAMEDATALEN] = {0};
-
-			ReplicationSlotNameForTablesync(subid, relid, syncslotname,
-											sizeof(syncslotname));
 			elog(WARNING, "could not drop tablesync replication slot \"%s\"",
-				 syncslotname);
+				 worker->slot_name);
 		}
 	}
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 3579e704fe..1cab625dfb 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -440,7 +440,8 @@ pa_launch_parallel_worker(void)
 										MySubscription->name,
 										MyLogicalRepWorker->userid,
 										InvalidOid,
-										dsm_segment_handle(winfo->dsm_seg));
+										dsm_segment_handle(winfo->dsm_seg),
+										0);
 
 	if (launched)
 	{
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 970d170e73..77738e94a3 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -304,7 +304,7 @@ logicalrep_workers_find(Oid subid, bool only_running)
  */
 bool
 logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid, dsm_handle subworker_dsm)
+						 Oid relid, dsm_handle subworker_dsm, int64 slotid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
@@ -429,7 +429,11 @@ retry:
 	/* Prepare the worker slot. */
 	worker->launch_time = now;
 	worker->in_use = true;
+	worker->is_first_run = true;
 	worker->generation++;
+	worker->created_slot = false;
+	worker->rep_slot_id = slotid;
+	worker->slot_name = (char *) palloc(NAMEDATALEN);
 	worker->proc = NULL;
 	worker->dbid = dbid;
 	worker->userid = userid;
@@ -437,6 +441,7 @@ retry:
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
 	worker->relstate_lsn = InvalidXLogRecPtr;
+	worker->move_to_next_rel = false;
 	worker->stream_fileset = NULL;
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
@@ -1155,7 +1160,7 @@ ApplyLauncherMain(Datum main_arg)
 				ApplyLauncherSetWorkerStartTime(sub->oid, now);
 				logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
 										 sub->owner, InvalidOid,
-										 DSM_HANDLE_INVALID);
+										 DSM_HANDLE_INVALID, 0);
 			}
 			else
 			{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 07eea504ba..f39c037d79 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -126,12 +126,8 @@ static bool FetchTableStates(bool *started_tx);
 
 static StringInfo copybuf = NULL;
 
-/*
- * Exit routine for synchronization worker.
- */
 static void
-pg_attribute_noreturn()
-finish_sync_worker(void)
+clean_sync_worker(void)
 {
 	/*
 	 * Commit any outstanding transaction. This is the usual case, unless
@@ -143,18 +139,28 @@ finish_sync_worker(void)
 		pgstat_report_stat(true);
 	}
 
-	/* And flush all writes. */
-	XLogFlush(GetXLogWriteRecPtr());
-
-	StartTransactionCommand();
-	ereport(LOG,
-			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
-					MySubscription->name,
-					get_rel_name(MyLogicalRepWorker->relid))));
-	CommitTransactionCommand();
+	/*
+	 * Disconnect from the publisher. Otherwise reused sync workers would keep
+	 * their connections open and eventually exceed max_wal_senders.
+	 */
+	walrcv_disconnect(LogRepWorkerWalRcvConn);
+	LogRepWorkerWalRcvConn = NULL;
 
 	/* Find the leader apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+}
+
+/*
+ * Exit routine for synchronization worker.
+ */
+static void
+pg_attribute_noreturn()
+finish_sync_worker(void)
+{
+	clean_sync_worker();
+
+	/* And flush all writes. */
+	XLogFlush(GetXLogWriteRecPtr());
 
 	/* Stop gracefully */
 	proc_exit(0);
@@ -284,6 +290,10 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
+	List	   *rstates;
+	SubscriptionRelState *rstate;
+	ListCell   *lc;
+
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -292,6 +302,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		TimeLineID	tli;
 		char		syncslotname[NAMEDATALEN] = {0};
 		char		originname[NAMEDATALEN] = {0};
+		bool		is_streaming_ended = false;
 
 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 		MyLogicalRepWorker->relstate_lsn = current_lsn;
@@ -308,40 +319,29 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
+		CommitTransactionCommand();
 
 		/*
-		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
-		 * the slot.
-		 */
-		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
-
-		/*
-		 * Cleanup the tablesync slot.
+		 * Cleanup the tablesync slot. If the slot name used by this worker
+		 * differs from the worker's default slot name, the current table's
+		 * synchronization was started by another worker with a different
+		 * replication slot, and this worker has been reusing that slot from
+		 * the previous attempt. That replication slot is not needed anymore.
 		 *
 		 * This has to be done after updating the state because otherwise if
 		 * there is an error while doing the database operations we won't be
 		 * able to rollback dropped slot.
 		 */
 		ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
+										MyLogicalRepWorker->rep_slot_id,
 										syncslotname,
 										sizeof(syncslotname));
 
 		/*
-		 * It is important to give an error if we are unable to drop the slot,
-		 * otherwise, it won't be dropped till the corresponding subscription
-		 * is dropped. So passing missing_ok = false.
-		 */
-		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
-
-		CommitTransactionCommand();
-		pgstat_report_stat(false);
-
-		/*
-		 * Start a new transaction to clean up the tablesync origin tracking.
-		 * This transaction will be ended within the finish_sync_worker().
-		 * Now, even, if we fail to remove this here, the apply worker will
-		 * ensure to clean it up afterward.
+		 * It is safe to drop the replication tracking origin after this
+		 * point. Even if we fail to remove it here, the apply worker will
+		 * clean it up afterward.
 		 *
 		 * We need to do this after the table state is set to SYNCDONE.
 		 * Otherwise, if an error occurs while performing the database
@@ -350,34 +350,125 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * have been cleared before restart. So, the restarted worker will use
 		 * invalid replication progress state resulting in replay of
 		 * transactions that have already been applied.
+		 *
+		 * First, reset the origin session to release ownership of the origin.
+		 * This is needed to allow the origin to be dropped or reused later.
+		 */
+		replorigin_session_reset();
+		replorigin_session_origin = InvalidRepOriginId;
+		replorigin_session_origin_lsn = InvalidXLogRecPtr;
+		replorigin_session_origin_timestamp = 0;
+
+		StartTransactionCommand();
+		if (MyLogicalRepWorker->slot_name && strcmp(syncslotname, MyLogicalRepWorker->slot_name) != 0)
+		{
+			/*
+			 * End streaming so that LogRepWorkerWalRcvConn can be used to
+			 * drop the slot.
+			 */
+			walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+			is_streaming_ended = true;
+			ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, MyLogicalRepWorker->slot_name, false);
+
+			ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   originname,
+											   sizeof(originname));
+
+			/* Drop replication origin */
+			replorigin_drop_by_name(originname, true, false);
+		}
+
+		/*
+		 * It is safe to remove the persisted replication slot and origin
+		 * names now that the relation is in SYNCDONE state; they will not be
+		 * needed anymore.
 		 */
+		UpdateSubscriptionRel(MyLogicalRepWorker->subid,
+							  MyLogicalRepWorker->relid,
+							  MyLogicalRepWorker->relstate,
+							  MyLogicalRepWorker->relstate_lsn,
+							  NULL,
+							  NULL);
+
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid))));
+
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+
 		StartTransactionCommand();
 
+		/*
+		 * This should return the default origin name for the worker. Even if
+		 * the worker used a different origin for this table, that origin has
+		 * already been dropped and removed from the catalog by this point.
+		 */
 		ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 										   MyLogicalRepWorker->relid,
 										   originname,
 										   sizeof(originname));
 
 		/*
-		 * Resetting the origin session removes the ownership of the slot.
-		 * This is needed to allow the origin to be dropped.
+		 * Check whether there is any table whose relation state is still
+		 * INIT. If such a table is found, the worker is not finished; it is
+		 * reused for that table instead.
 		 */
-		replorigin_session_reset();
-		replorigin_session_origin = InvalidRepOriginId;
-		replorigin_session_origin_lsn = InvalidXLogRecPtr;
-		replorigin_session_origin_timestamp = 0;
+		rstates = GetSubscriptionRelations(MySubscription->oid, true);
 
-		/*
-		 * Drop the tablesync's origin tracking if exists.
-		 *
-		 * There is a chance that the user is concurrently performing refresh
-		 * for the subscription where we remove the table state and its origin
-		 * or the apply worker would have removed this origin. So passing
-		 * missing_ok = true.
-		 */
-		replorigin_drop_by_name(originname, true, false);
+		foreach(lc, rstates)
+		{
+			rstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
+			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
+
+			/*
+			 * Pick the table for the next run unless another worker has
+			 * already picked it.
+			 */
+			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+			if (rstate->state != SUBREL_STATE_SYNCDONE &&
+				!logicalrep_worker_find(MySubscription->oid, rstate->relid, false))
+			{
+				/* Update worker state for the next table */
+				MyLogicalRepWorker->is_first_run = false;
+				MyLogicalRepWorker->relid = rstate->relid;
+				MyLogicalRepWorker->relstate = rstate->state;
+				MyLogicalRepWorker->relstate_lsn = rstate->lsn;
+				MyLogicalRepWorker->move_to_next_rel = true;
+				LWLockRelease(LogicalRepWorkerLock);
+				break;
+			}
+			LWLockRelease(LogicalRepWorkerLock);
+		}
+
+		/* Cleanup before next run or ending the worker. */
+		if (!MyLogicalRepWorker->move_to_next_rel)
+		{
+			/*
+			 * It is important to give an error if we are unable to drop the
+			 * slot, otherwise, it won't be dropped till the corresponding
+			 * subscription is dropped. So passing missing_ok = false.
+			 */
+			if (MyLogicalRepWorker->created_slot)
+			{
+				/* End streaming if it has not already been ended. */
+				if (!is_streaming_ended)
+					walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+				ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+			}
 
-		finish_sync_worker();
+			/* Drop replication origin before exiting. */
+			replorigin_drop_by_name(originname, true, false);
+
+			finish_sync_worker();
+		}
+		else
+		{
+			clean_sync_worker();
+		}
 	}
 	else
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
@@ -464,6 +555,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			if (current_lsn >= rstate->lsn)
 			{
 				char		originname[NAMEDATALEN];
+				bool		is_origin_null = true;
 
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
@@ -484,18 +576,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
 				 */
-				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
-												   rstate->relid,
-												   originname,
-												   sizeof(originname));
-				replorigin_drop_by_name(originname, true, false);
+				GetSubscriptionRelOrigin(MyLogicalRepWorker->subid,
+										 rstate->relid, originname,
+										 &is_origin_null);
+
+				if (!is_origin_null)
+				{
+					replorigin_drop_by_name(originname, true, false);
+				}
 
 				/*
 				 * Update the state to READY only after the origin cleanup.
 				 */
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   rstate->relid, rstate->state,
-										   rstate->lsn);
+				UpdateSubscriptionRel(MyLogicalRepWorker->subid,
+									  rstate->relid,
+									  rstate->state,
+									  rstate->lsn,
+									  NULL,
+									  NULL);
+
+				CommitTransactionCommand();
+				started_tx = false;
 			}
 		}
 		else
@@ -584,12 +685,22 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
+						if (IsTransactionState())
+							CommitTransactionCommand();
+						StartTransactionCommand();
+						started_tx = true;
+
+						MySubscription->lastusedid++;
+						UpdateSubscriptionLastSlotId(MyLogicalRepWorker->subid,
+													 MySubscription->lastusedid);
+
 						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
 												 MySubscription->oid,
 												 MySubscription->name,
 												 MyLogicalRepWorker->userid,
 												 rstate->relid,
-												 DSM_HANDLE_INVALID);
+												 DSM_HANDLE_INVALID,
+												 MySubscription->lastusedid);
 						hentry->last_start_time = now;
 					}
 				}
@@ -1198,8 +1309,8 @@ copy_table(Relation rel)
  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
  * on slot name length. We append system_identifier to avoid slot_name
  * collision with subscriptions in other clusters. With the current scheme
- * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
- * length of slot_name will be 50.
+ * pg_%u_sync_%lld_UINT64_FORMAT (3 + 10 + 6 + 20 + 20 + '\0'), the maximum
+ * length of slot_name will be 60.
  *
  * The returned slot name is stored in the supplied buffer (syncslotname) with
  * the given size.
@@ -1210,11 +1321,11 @@ copy_table(Relation rel)
  * had changed.
  */
 void
-ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
+ReplicationSlotNameForTablesync(Oid suboid, int64 slotid,
 								char *syncslotname, Size szslot)
 {
-	snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
-			 relid, GetSystemIdentifier());
+	snprintf(syncslotname, szslot, "pg_%u_sync_%lld_" UINT64_FORMAT, suboid,
+			(long long) slotid, GetSystemIdentifier());
 }
 
 /*
@@ -1237,6 +1348,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
+	char	   *prev_slotname;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1265,7 +1377,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	/* Calculate the name of the tablesync slot. */
 	slotname = (char *) palloc(NAMEDATALEN);
 	ReplicationSlotNameForTablesync(MySubscription->oid,
-									MyLogicalRepWorker->relid,
+									MyLogicalRepWorker->rep_slot_id,
 									slotname,
 									NAMEDATALEN);
 
@@ -1285,12 +1397,26 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
 
+	/*
+	 * See if tablesync of the current relation has already been started with
+	 * another replication slot.
+	 *
+	 * Read the previous slot name from the catalog, if one exists.
+	 */
+	prev_slotname = (char *) palloc0(NAMEDATALEN);
+	StartTransactionCommand();
+	GetSubscriptionRelReplicationSlot(MyLogicalRepWorker->subid,
+									  MyLogicalRepWorker->relid,
+									  prev_slotname);
+
 	/* Assign the origin tracking record name. */
 	ReplicationOriginNameForLogicalRep(MySubscription->oid,
 									   MyLogicalRepWorker->relid,
 									   originname,
 									   sizeof(originname));
 
+	CommitTransactionCommand();
+
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
 	{
 		/*
@@ -1304,10 +1430,48 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * breakdown then it wouldn't have succeeded so trying it next time
 		 * seems like a better bet.
 		 */
-		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
+		if (strlen(prev_slotname) > 0)
+		{
+			ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, prev_slotname, true);
+
+			StartTransactionCommand();
+			/* The replication origin might still exist. Try to drop it. */
+			replorigin_drop_by_name(originname, true, false);
+
+			/*
+			 * Remove the replication slot and origin name from the relation's
+			 * catalog record.
+			 */
+			UpdateSubscriptionRel(MyLogicalRepWorker->subid,
+								  MyLogicalRepWorker->relid,
+								  MyLogicalRepWorker->relstate,
+								  MyLogicalRepWorker->relstate_lsn,
+								  NULL,
+								  NULL);
+			CommitTransactionCommand();
+		}
 	}
 	else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
 	{
+		/*
+		 * At this point, the table currently being synchronized should have
+		 * its replication slot name recorded in the catalog. Its tablesync
+		 * was started by another sync worker with a different replication
+		 * slot, and this worker must continue using that same slot.
+		 */
+		if (strlen(prev_slotname) == 0)
+		{
+			elog(ERROR, "replication slot could not be found for relation %u",
+				 MyLogicalRepWorker->relid);
+		}
+
+		/*
+		 * Proceed with the previously created replication slot to sync this
+		 * table.
+		 */
+		slotname = prev_slotname;
+
 		/*
 		 * The COPY phase was previously done, but tablesync then crashed
 		 * before it was able to finish normally.
@@ -1328,6 +1492,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		goto copy_table_done;
 	}
 
+	/* Prepare for the table copy operation. */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
 	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
@@ -1335,10 +1500,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	/* Update the state and make it visible to others. */
 	StartTransactionCommand();
-	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
-							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+	UpdateSubscriptionRel(MyLogicalRepWorker->subid,
+						  MyLogicalRepWorker->relid,
+						  MyLogicalRepWorker->relstate,
+						  MyLogicalRepWorker->relstate_lsn,
+						  slotname,
+						  originname);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1377,6 +1544,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						GetUserNameFromId(GetUserId(), true),
 						RelationGetRelationName(rel))));
 
 	/*
 	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
 	 * ensures that both the replication slot we create (see below) and the
@@ -1392,48 +1560,91 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						res->err)));
 	walrcv_clear_result(res);
 
+	originid = replorigin_by_name(originname, true);
+
 	/*
 	 * Create a new permanent logical decoding slot. This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
+	 *
+	 * The replication slot is created only if this worker has not already
+	 * created one; otherwise the existing slot is reused below.
 	 */
-	walrcv_create_slot(LogRepWorkerWalRcvConn,
-					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
-
-	/*
-	 * Setup replication origin tracking. The purpose of doing this before the
-	 * copy is to avoid doing the copy again due to any error in setting up
-	 * origin tracking.
-	 */
-	originid = replorigin_by_name(originname, true);
-	if (!OidIsValid(originid))
+	if (!MyLogicalRepWorker->created_slot)
 	{
+		walrcv_create_slot(LogRepWorkerWalRcvConn,
+						   slotname, false /* permanent */ , false /* two_phase */ ,
+						   CRS_USE_SNAPSHOT, origin_startpos);
+
 		/*
-		 * Origin tracking does not exist, so create it now.
-		 *
-		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
-		 * logged for the purpose of recovery. Locks are to prevent the
-		 * replication origin from vanishing while advancing.
+		 * Remember that we created the slot so that we will not try to create
+		 * it again.
 		 */
-		originid = replorigin_create(originname);
-
-		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
-						   true /* go backward */ , true /* WAL log */ );
-		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->created_slot = true;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
+		/*
+		 * Setup replication origin tracking. The purpose of doing this before
+		 * the copy is to avoid doing the copy again due to any error in
+		 * setting up origin tracking.
+		 */
+		if (!OidIsValid(originid))
+		{
+			/*
+			 * Origin tracking does not exist, so create it now.
+			 */
+			originid = replorigin_create(originname);
+		}
+		else
+		{
+			/*
+			 * At this point, there shouldn't be any existing replication
+			 * origin with the same name.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_DUPLICATE_OBJECT),
+					 errmsg("replication origin \"%s\" already exists",
+							originname)));
+		}
 	}
 	else
 	{
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("replication origin \"%s\" already exists",
-						originname)));
+		/*
+		 * Do not create a new replication slot; reuse the existing one
+		 * instead. Take a new snapshot from the replication slot to ensure
+		 * that the tablesync and apply processes are consistent with each
+		 * other.
+		 */
+		WalRcvStreamOptions options;
+		int			server_version;
+
+		server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+		options.proto.logical.proto_version =
+			server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+			server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+			LOGICALREP_PROTO_VERSION_NUM;
+		options.proto.logical.publication_names = MySubscription->publications;
+
+		HOLD_INTERRUPTS();
+		walrcv_slot_snapshot(LogRepWorkerWalRcvConn, slotname, &options, origin_startpos);
+		RESUME_INTERRUPTS();
 	}
 
+	/*
+	 * Advance the origin to the LSN obtained from walrcv_create_slot or
+	 * walrcv_slot_snapshot. This is WAL-logged for the purpose of recovery.
+	 * Locks are to prevent the replication origin from vanishing while
+	 * advancing.
+	 *
+	 * Then set up replication origin tracking for this session.
+	 */
+	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+	replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+					   true /* go backward */ , true /* WAL log */ );
+	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
+
 	/* Now do the initial data copy */
 	PushActiveSnapshot(GetTransactionSnapshot());
 	copy_table(rel);
@@ -1456,10 +1667,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * Update the persisted state to indicate the COPY phase is done; make it
 	 * visible to others.
 	 */
-	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-							   MyLogicalRepWorker->relid,
-							   SUBREL_STATE_FINISHEDCOPY,
-							   MyLogicalRepWorker->relstate_lsn);
+	UpdateSubscriptionRel(MyLogicalRepWorker->subid,
+						  MyLogicalRepWorker->relid,
+						  SUBREL_STATE_FINISHEDCOPY,
+						  MyLogicalRepWorker->relstate_lsn,
+						  slotname,
+						  originname);
 
 	CommitTransactionCommand();
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..c12924e4da 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -388,6 +388,7 @@ static void stream_open_file(Oid subid, TransactionId xid,
 static void stream_write_change(char action, StringInfo s);
 static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
 static void stream_close_file(void);
+static void stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
@@ -456,13 +457,22 @@ ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 {
 	if (OidIsValid(relid))
 	{
-		/* Replication origin name for tablesync workers. */
-		snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
+		bool		is_null = true;
+
+		/*
+		 * Replication origin name for tablesync workers. First, look it up in
+		 * the catalog; if no origin name is recorded there, fall back to the
+		 * worker's default name.
+		 */
+		GetSubscriptionRelOrigin(suboid, relid,
+								 originname, &is_null);
+		if (is_null)
+			snprintf(originname, szoriginname, "pg_%u_%lld", suboid, (long long) MyLogicalRepWorker->rep_slot_id);
 	}
 	else
 	{
 		/* Replication origin name for non-tablesync workers. */
 		snprintf(originname, szoriginname, "pg_%u", suboid);
 	}
 }
 
@@ -3576,6 +3586,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 					MemoryContextReset(ApplyMessageContext);
 				}
 
+				/*
+				 * apply_dispatch() may have gone into apply_handle_commit(),
+				 * which can move to the next table while running
+				 * process_syncing_tables_for_sync(). Before tablesync workers
+				 * could be reused, that call would exit the worker instead of
+				 * moving to the next table. Now that tablesync workers can be
+				 * reused, we need to clean up the memory contexts here before
+				 * moving on to sync another table.
+				 */
+				if (MyLogicalRepWorker->move_to_next_rel)
+				{
+					MemoryContextResetAndDeleteChildren(ApplyMessageContext);
+					MemoryContextSwitchTo(TopMemoryContext);
+					return;
+				}
+
 				len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 			}
 		}
@@ -3595,6 +3622,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 			/* Process any table synchronization changes. */
 			process_syncing_tables(last_received);
+			if (MyLogicalRepWorker->move_to_next_rel)
+			{
+				endofstream = true;
+			}
 		}
 
 		/* Cleanup the memory. */
@@ -3697,8 +3728,16 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	error_context_stack = errcallback.previous;
 	apply_error_context_stack = error_context_stack;
 
-	/* All done */
-	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	/*
+	 * If the worker is moving to the next relation, it is a sync worker. Sync
+	 * workers end streaming in process_syncing_tables_for_sync(); calling
+	 * endstreaming twice causes "no COPY in progress" errors.
+	 */
+	if (!MyLogicalRepWorker->move_to_next_rel)
+	{
+		/* All done */
+		walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
+	}
 }
 
 /*
@@ -4282,6 +4321,56 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 	stream_stop_internal(xid);
 }
 
+/*
+ * stream_build_options
+ *		Build logical replication streaming options.
+ *
+ * This function sets streaming options including the replication slot name
+ * and the origin start position. Workers need these options for logical
+ * replication.
+ */
+static void
+stream_build_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
+{
+	int			server_version;
+
+	options->logical = true;
+	options->startpoint = *origin_startpos;
+	options->slotname = slotname;
+
+	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+	options->proto.logical.proto_version =
+		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+		LOGICALREP_PROTO_VERSION_NUM;
+
+	options->proto.logical.publication_names = MySubscription->publications;
+	options->proto.logical.binary = MySubscription->binary;
+	options->proto.logical.twophase = false;
+	options->proto.logical.origin = pstrdup(MySubscription->origin);
+
+	/*
+	 * Assign the appropriate option value for streaming option according to
+	 * the 'streaming' mode and the publisher's ability to support that mode.
+	 */
+	if (server_version >= 160000 &&
+		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+	{
+		options->proto.logical.streaming_str = "parallel";
+		MyLogicalRepWorker->parallel_apply = true;
+	}
+	else if (server_version >= 140000 &&
+			 MySubscription->stream != LOGICALREP_STREAM_OFF)
+	{
+		options->proto.logical.streaming_str = "on";
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+	else
+	{
+		options->proto.logical.streaming_str = NULL;
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+}
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -4356,6 +4445,9 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 
 	/* allocate slot name in long-lived context */
 	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+
+	/* Keep the replication slot name used for this sync. */
+	MyLogicalRepWorker->slot_name = *myslotname;
 	pfree(syncslotname);
 }
 
@@ -4393,6 +4485,135 @@ start_apply(XLogRecPtr origin_startpos)
 	PG_END_TRY();
 }
 
+/*
+ * Run the tablesync worker.
+ *
+ * Start table synchronization; after a successful sync, build the streaming
+ * options and start streaming.
+ */
+static void
+run_tablesync_worker(WalRcvStreamOptions *options,
+					 char *slotname,
+					 char *originname,
+					 int originname_size,
+					 XLogRecPtr *origin_startpos)
+{
+	/* Set this to false for safety, in case we're already reusing the worker */
+	MyLogicalRepWorker->move_to_next_rel = false;
+
+	start_table_sync(origin_startpos, &slotname);
+
+	/*
+	 * Allocate the origin name in long-lived context for error context
+	 * message.
+	 */
+	StartTransactionCommand();
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
+									   MyLogicalRepWorker->relid,
+									   originname,
+									   originname_size);
+	CommitTransactionCommand();
+
+	set_apply_error_context_origin(originname);
+
+	stream_build_options(options, slotname, origin_startpos);
+
+	/* Start normal logical streaming replication. */
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+}
+
+/*
+ * Run the apply worker.
+ *
+ * Set up the replication origin and the streaming options, then start
+ * streaming.
+ */
+static void
+run_apply_worker(WalRcvStreamOptions *options,
+				 char *slotname,
+				 char *originname,
+				 int originname_size,
+				 XLogRecPtr *origin_startpos)
+{
+	/* This is the leader apply worker */
+	RepOriginId originid;
+	TimeLineID	startpointTLI;
+	char	   *err;
+
+	slotname = MySubscription->slotname;
+
+	/*
+	 * This shouldn't happen if the subscription is enabled, but guard
+	 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
+	 * crash if slot is NULL.)
+	 */
+	if (!slotname)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("subscription has no replication slot set")));
+
+	/* Setup replication origin tracking. */
+	StartTransactionCommand();
+	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+									   originname, originname_size);
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
+	*origin_startpos = replorigin_session_get_progress(false);
+	CommitTransactionCommand();
+
+	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+											MySubscription->name, &err);
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not connect to the publisher: %s", err)));
+
+	/*
+	 * We don't really use the output identify_system for anything but it
+	 * does some initializations on the upstream so let's still call it.
+	 */
+	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+	set_apply_error_context_origin(originname);
+
+	stream_build_options(options, slotname, origin_startpos);
+
+	/*
+	 * Even when the two_phase mode is requested by the user, it remains as
+	 * the tri-state PENDING until all tablesyncs have reached READY state.
+	 * Only then, can it become ENABLED.
+	 *
+	 * Note: If the subscription has no tables then leave the state as
+	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+	 * work.
+	 */
+	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
+	{
+		/* Start streaming with two_phase enabled */
+		options->proto.logical.twophase = true;
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+
+		StartTransactionCommand();
+		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+		CommitTransactionCommand();
+	}
+	else
+	{
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, options);
+	}
+
+	ereport(DEBUG1,
+			(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+							 MySubscription->name,
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+							 "?")));
+}
+
 /*
  * Common initialization for leader apply worker and parallel apply worker.
  *
@@ -4485,7 +4706,6 @@ ApplyWorkerMain(Datum main_arg)
 	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
 	char	   *myslotname = NULL;
 	WalRcvStreamOptions options;
-	int			server_version;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -4513,156 +4733,48 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
-	{
-		start_table_sync(&origin_startpos, &myslotname);
-
-		ReplicationOriginNameForLogicalRep(MySubscription->oid,
-										   MyLogicalRepWorker->relid,
-										   originname,
-										   sizeof(originname));
-		set_apply_error_context_origin(originname);
-	}
-	else
-	{
-		/* This is the leader apply worker */
-		RepOriginId originid;
-		TimeLineID	startpointTLI;
-		char	   *err;
-
-		myslotname = MySubscription->slotname;
-
-		/*
-		 * This shouldn't happen if the subscription is enabled, but guard
-		 * against DDL bugs or manual catalog changes.  (libpqwalreceiver will
-		 * crash if slot is NULL.)
-		 */
-		if (!myslotname)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("subscription has no replication slot set")));
-
-		/* Setup replication origin tracking. */
-		StartTransactionCommand();
-		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-										   originname, sizeof(originname));
-		originid = replorigin_by_name(originname, true);
-		if (!OidIsValid(originid))
-			originid = replorigin_create(originname);
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
-		origin_startpos = replorigin_session_get_progress(false);
-		CommitTransactionCommand();
-
-		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
-												MySubscription->name, &err);
-		if (LogRepWorkerWalRcvConn == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not connect to the publisher: %s", err)));
-
-		/*
-		 * We don't really use the output identify_system for anything but it
-		 * does some initializations on the upstream so let's still call it.
-		 */
-		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
-		set_apply_error_context_origin(originname);
-	}
-
 	/*
 	 * Setup callback for syscache so that we know when something changes in
-	 * the subscription relation state.
+	 * the subscription relation state.  Register it outside the loop below
+	 * so that reusing the worker does not exceed MAX_SYSCACHE_CALLBACKS.
 	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
 
-	/* Build logical replication streaming options. */
-	options.logical = true;
-	options.startpoint = origin_startpos;
-	options.slotname = myslotname;
-
-	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
-	options.proto.logical.proto_version =
-		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
-		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
-		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
-		LOGICALREP_PROTO_VERSION_NUM;
-
-	options.proto.logical.publication_names = MySubscription->publications;
-	options.proto.logical.binary = MySubscription->binary;
-
 	/*
-	 * Assign the appropriate option value for streaming option according to
-	 * the 'streaming' mode and the publisher's ability to support that mode.
+	 * The main worker loop.  A tablesync worker iterates here for as long as
+	 * it is reused for another relation; an apply worker runs it only once.
 	 */
-	if (server_version >= 160000 &&
-		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
-	{
-		options.proto.logical.streaming_str = "parallel";
-		MyLogicalRepWorker->parallel_apply = true;
-	}
-	else if (server_version >= 140000 &&
-			 MySubscription->stream != LOGICALREP_STREAM_OFF)
-	{
-		options.proto.logical.streaming_str = "on";
-		MyLogicalRepWorker->parallel_apply = false;
-	}
-	else
-	{
-		options.proto.logical.streaming_str = NULL;
-		MyLogicalRepWorker->parallel_apply = false;
-	}
-
-	options.proto.logical.twophase = false;
-	options.proto.logical.origin = pstrdup(MySubscription->origin);
-
-	if (!am_tablesync_worker())
+	while (MyLogicalRepWorker->is_first_run ||
+		   MyLogicalRepWorker->move_to_next_rel)
 	{
-		/*
-		 * Even when the two_phase mode is requested by the user, it remains
-		 * as the tri-state PENDING until all tablesyncs have reached READY
-		 * state. Only then, can it become ENABLED.
-		 *
-		 * Note: If the subscription has no tables then leave the state as
-		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-		 * work.
-		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-			AllTablesyncsReady())
+		if (am_tablesync_worker())
 		{
-			/* Start streaming with two_phase enabled */
-			options.proto.logical.twophase = true;
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-
-			StartTransactionCommand();
-			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
-			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
-			CommitTransactionCommand();
+			/*
+			 * This is a tablesync worker.  Sync the current table before
+			 * entering the apply loop.
+			 */
+			run_tablesync_worker(&options, myslotname, originname,
+								 sizeof(originname), &origin_startpos);
 		}
 		else
 		{
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+			/* This is the leader apply worker */
+			run_apply_worker(&options, myslotname, originname,
+							 sizeof(originname), &origin_startpos);
 		}
 
-		ereport(DEBUG1,
-				(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
-						MySubscription->name,
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
-						MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
-						"?")));
-	}
-	else
-	{
-		/* Start normal logical streaming replication. */
-		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-	}
-
-	/* Run the main loop. */
-	start_apply(origin_startpos);
+		/* Run the main loop. */
+		start_apply(origin_startpos);
 
+		if (MyLogicalRepWorker->move_to_next_rel)
+		{
+			StartTransactionCommand();
+			ereport(LOG,
+					(errmsg("logical replication table synchronization worker for subscription \"%s\" has moved to sync table \"%s\"",
+							MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+			CommitTransactionCommand();
+		}
+	}
 	proc_exit(0);
 }
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index b0f2a1705d..a0ee12e259 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -103,6 +103,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* The last used ID to create a replication slot for tablesync */
+	int64		sublastusedid BKI_DEFAULT(0);
 #endif
 } FormData_pg_subscription;
 
@@ -137,6 +140,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	int64		lastusedid;		/* Last used unique ID to create replication
+								 * slots in tablesync */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
@@ -157,6 +162,7 @@ typedef struct Subscription
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
+extern void UpdateSubscriptionLastSlotId(Oid subid, int64 lastusedid);
 
 extern int	CountDBSubscriptions(Oid dbid);
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 60a2bcca23..a35d04cccd 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -44,6 +44,12 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 											 * used for synchronization
 											 * coordination, or NULL if not
 											 * valid */
+	text		srrelslotname BKI_FORCE_NULL;	/* name of the replication
+												 * slot for relation in
+												 * subscription */
+	text		srreloriginname BKI_FORCE_NULL; /* origin name for relation in
+												 * subscription */
+
 #endif
 } FormData_pg_subscription_rel;
 
@@ -81,10 +87,17 @@ typedef struct SubscriptionRelState
 } SubscriptionRelState;
 
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
-									XLogRecPtr sublsn);
+									XLogRecPtr sublsn, char *relslotname, char *reloriginname);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 									   XLogRecPtr sublsn);
+extern void UpdateSubscriptionRel(Oid subid, Oid relid, char state,
+								  XLogRecPtr sublsn, char *relslotname, char *reloriginname);
+extern void UpdateSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *relslotname);
+
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
+extern void GetSubscriptionRelReplicationSlot(Oid subid, Oid relid, char *slotname);
+extern void GetSubscriptionRelOrigin(Oid subid, Oid relid, char *reloriginname, bool *isnull);
+
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern bool HasSubscriptionRelations(Oid subid);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdf..3547daaaec 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -219,8 +219,9 @@ extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
-extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
+extern void ReplicationSlotNameForTablesync(Oid suboid, int64 slotid, char *syncslotname, Size szslot);
 extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
+extern List *GetReplicationSlotNamesBySubId(WalReceiverConn *wrconn, Oid subid, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
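
ReplicationSlotNameForTablesync() now takes an int64 slotid (a per-subscription
counter) in place of the relation OID, so a sync slot's name is no longer tied
to a single table and the slot can be handed from one relation to the next.
For illustration only, a standalone sketch of that kind of counter-based
naming follows; the helper name and format string are hypothetical, the
patch's real ones live in the tablesync/slot code:

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t Oid;			/* stand-in for PostgreSQL's Oid */

/*
 * Hypothetical shape of a counter-based sync slot name: built from the
 * subscription OID plus the per-subscription slot id, not the relation OID.
 */
static void
slot_name_for_tablesync(Oid suboid, int64_t slotid, char *buf, size_t szslot)
{
	snprintf(buf, szslot, "pg_%u_sync_%" PRId64, suboid, slotid);
}

int
main(void)
{
	char		name[64];

	slot_name_for_tablesync(16394, 3, name, sizeof(name));
	printf("%s\n", name);		/* prints: pg_16394_sync_3 */
	return 0;
}

Compiled and run, this prints pg_16394_sync_3; the point is only that the name
depends on the counter, so it stays valid while the worker moves between tables.
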
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index dc87a4edd1..5f4b7b1f7c 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -35,6 +35,26 @@ typedef struct LogicalRepWorker
 	/* Indicates if this slot is used or free. */
 	bool		in_use;
 
+	/*
+	 * Indicates whether this is the worker's first run or it is being reused.
+	 */
+	bool		is_first_run;
+
+	/*
+	 * Indicates whether the sync worker created its own replication slot or
+	 * reuses an existing one created by another worker.
+	 */
+	bool		created_slot;
+
+	/*
+	 * Unique identifier for the replication slot to be created by tablesync
+	 * workers, if needed.
+	 */
+	int64		rep_slot_id;
+
+	/* Replication slot name used by the worker. */
+	char	   *slot_name;
+
 	/* Increased every time the slot is taken by new worker. */
 	uint16		generation;
 
@@ -56,6 +76,12 @@ typedef struct LogicalRepWorker
 	XLogRecPtr	relstate_lsn;
 	slock_t		relmutex;
 
+	/*
+	 * Indicates whether this sync worker will be reused for another relation.
+	 */
+	bool		move_to_next_rel;
+
 	/*
 	 * Used to create the changes and subxact files for the streaming
 	 * transactions.  Upon the arrival of the first streaming transaction or
@@ -231,7 +257,8 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
 extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
-									 dsm_handle subworker_dsm);
+									 dsm_handle subworker_dsm,
+									 int64 slotid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
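
The is_first_run and move_to_next_rel flags above are what drive the new
worker-reuse loop in ApplyWorkerMain(): the worker enters the loop on its first
run and stays in it only while it keeps being flagged to move to another
relation.  The following is a self-contained mock of that loop's shape
(placeholder types, data, and function names; not the patch's code), included
only to make the intended life cycle easier to follow:

#include <stdbool.h>
#include <stdio.h>

/* Mock of the few LogicalRepWorker fields that the reuse loop consults. */
typedef struct MockWorker
{
	bool		is_first_run;		/* first iteration of the worker? */
	bool		move_to_next_rel;	/* reuse the worker for another table? */
	int			relid;				/* relation currently assigned */
} MockWorker;

static int	pending_rels[] = {1001, 1002, 1003};
static int	next_rel = 0;

/*
 * Stand-in for one sync-plus-catchup cycle: if more relations are waiting,
 * take the next one and ask to be reused, otherwise let the loop end.
 */
static void
sync_one_relation(MockWorker *w)
{
	printf("syncing relation %d\n", w->relid);

	if (next_rel < 3)
	{
		w->relid = pending_rels[next_rel++];
		w->move_to_next_rel = true;
	}
	else
		w->move_to_next_rel = false;
}

int
main(void)
{
	MockWorker	w = {true, false, 1000};

	/* Same shape as the loop in ApplyWorkerMain(): run once, then while reused. */
	while (w.is_first_run || w.move_to_next_rel)
	{
		w.is_first_run = false;	/* the real worker clears this elsewhere */
		sync_one_relation(&w);
	}

	return 0;
}
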
diff --git a/src/test/regress/expected/misc_sanity.out b/src/test/regress/expected/misc_sanity.out
index a57fd142a9..3d34a21421 100644
--- a/src/test/regress/expected/misc_sanity.out
+++ b/src/test/regress/expected/misc_sanity.out
@@ -47,20 +47,22 @@ WHERE c.oid < 16384 AND
       relkind = 'r' AND
       attstorage != 'p'
 ORDER BY 1, 2;
-         relname         |    attname    |   atttypid   
--------------------------+---------------+--------------
- pg_attribute            | attacl        | aclitem[]
- pg_attribute            | attfdwoptions | text[]
- pg_attribute            | attmissingval | anyarray
- pg_attribute            | attoptions    | text[]
- pg_class                | relacl        | aclitem[]
- pg_class                | reloptions    | text[]
- pg_class                | relpartbound  | pg_node_tree
- pg_index                | indexprs      | pg_node_tree
- pg_index                | indpred       | pg_node_tree
- pg_largeobject          | data          | bytea
- pg_largeobject_metadata | lomacl        | aclitem[]
-(11 rows)
+         relname         |     attname     |   atttypid   
+-------------------------+-----------------+--------------
+ pg_attribute            | attacl          | aclitem[]
+ pg_attribute            | attfdwoptions   | text[]
+ pg_attribute            | attmissingval   | anyarray
+ pg_attribute            | attoptions      | text[]
+ pg_class                | relacl          | aclitem[]
+ pg_class                | reloptions      | text[]
+ pg_class                | relpartbound    | pg_node_tree
+ pg_index                | indexprs        | pg_node_tree
+ pg_index                | indpred         | pg_node_tree
+ pg_largeobject          | data            | bytea
+ pg_largeobject_metadata | lomacl          | aclitem[]
+ pg_subscription_rel     | srreloriginname | text
+ pg_subscription_rel     | srrelslotname   | text
+(13 rows)
 
 -- system catalogs without primary keys
 --
-- 
2.25.1

