From 243ffbfc7622af7bbfc69ee5aa816198568c019c Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Wed, 23 Dec 2020 17:08:30 +1100
Subject: [PATCH v7] WIP patch for the Solution1.

This patch still applies onto the v30 patch set [1] from other 2PC thread:
[1] https://www.postgresql.org/message-id/CAFPTHDYA8yE6tEmQ2USYS68kNt%2BkM%3DSwKgj%3Djy4AvFD5e9-UTQ%40mail.gmail.com

(I understand this should be delivered as a separate patch independent of v30. I will convert it ASAP)

====

Coded / WIP:

* tablesync slot is now permanent instead of temporary. The tablesync slot name is no longer tied to the Subscription slot name.

* the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions

* tablesync worked now allowing multiple tx instead of single tx

* a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart.

* if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase.

* tablesync sets up replication origin tracking in LogicalRepSyncTableStart (similar as done for the apply worker). The origin is advanced when first created.

* tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply

* The v7 DropSubscription cleanup code is quite different now to how it was in v6. The subscription TAP tests have been executed 6x now without observing any race problems that were sometimes seen to happen in the v6 patch.

TODO / Known Issues:

* Help / comments / cleanup

* There is temporary "!!>>" excessive logging scattered around which I added to help my testing during development

* Address review comments
---
 src/backend/commands/subscriptioncmds.c     | 221 +++++++++++++++++++-------
 src/backend/replication/logical/origin.c    |   4 +-
 src/backend/replication/logical/tablesync.c | 231 ++++++++++++++++++++++++----
 src/backend/replication/logical/worker.c    |  21 +--
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/replication/slot.h              |   3 +
 6 files changed, 375 insertions(+), 106 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b0745d5..b98a7e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -960,7 +961,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *err = NULL;
 	RepOriginId originid;
 	WalReceiverConn *wrconn = NULL;
-	StringInfoData cmd;
 	Form_pg_subscription form;
 
 	/*
@@ -1048,76 +1048,187 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ReleaseSysCache(tup);
 
 	/*
-	 * Stop all the subscription workers immediately.
-	 *
-	 * This is necessary if we are dropping the replication slot, so that the
-	 * slot becomes accessible.
+	 * Try to acquire the connection necessary for dropping slots.
+	 * We do this here so that the same connection may be shared
+	 * for dropping the Subscription slot, as well as dropping any
+	 * tablesync slots.
 	 *
-	 * It is also necessary if the subscription is disabled and was disabled
-	 * in the same transaction.  Then the workers haven't seen the disabling
-	 * yet and will still be running, leading to hangs later when we want to
-	 * drop the replication origin.  If the subscription was disabled before
-	 * this transaction, then there shouldn't be any workers left, so this
-	 * won't make a difference.
-	 *
-	 * New workers won't be started because we hold an exclusive lock on the
-	 * subscription till the end of the transaction.
+	 * Note: If the slotname is NONE/NULL then connection errors are
+	 * suppressed. This is necessary so that the DROP SUBSCRIPTION
+	 * can still complete even when the connection to publisher is
+	 * broken.
 	 */
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-	subworkers = logicalrep_workers_find(subid, false);
-	LWLockRelease(LogicalRepWorkerLock);
-	foreach(lc, subworkers)
+	load_file("libpqwalreceiver", false);
+
+	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	if (wrconn == NULL && slotname != NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to publisher when attempting to "
+						"drop the replication slot \"%s\"", slotname),
+				 errdetail("The error was: %s", err),
+		/* translator: %s is an SQL ALTER command */
+				 errhint("Use %s to disassociate the subscription from the slot.",
+						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+
+	PG_TRY();
 	{
-		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		/*
+		 * Stop all the subscription workers immediately.
+		 *
+		 * This is necessary if we are dropping the replication slot, so that the
+		 * slot becomes accessible.
+		 *
+		 * It is also necessary if the subscription is disabled and was disabled
+		 * in the same transaction.  Then the workers haven't seen the disabling
+		 * yet and will still be running, leading to hangs later when we want to
+		 * drop the replication origin.  If the subscription was disabled before
+		 * this transaction, then there shouldn't be any workers left, so this
+		 * won't make a difference.
+		 *
+		 * New workers won't be started because we hold an exclusive lock on the
+		 * subscription till the end of the transaction.
+		 */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		subworkers = logicalrep_workers_find(subid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		foreach(lc, subworkers)
+		{
+			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
-	}
-	list_free(subworkers);
+			logicalrep_worker_stop(w->subid, w->relid);
+		}
+		list_free(subworkers);
 
-	/* Clean up dependencies */
-	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+		/*
+		 * Tablesync resource cleanup (slots and origins).
+		 *
+		 * Any READY-state relations should already have been dealt with clean-ups.
+		 */
+		{
+			List	   *rstates;
+			ListCell   *lc;
 
-	/* Remove any associated relation synchronization states. */
-	RemoveSubscriptionRel(subid, InvalidOid);
+			rstates = GetSubscriptionNotReadyRelations(subid);
+			foreach(lc, rstates)
+			{
+				SubscriptionRelState   *rstate = (SubscriptionRelState *) lfirst(lc);
+				Oid						relid = rstate->relid;
 
-	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid, false);
+				/* Only cleanup the tablesync worker resources */
+				if (!OidIsValid(relid))
+					continue;
 
-	/*
-	 * If there is no slot associated with the subscription, we can finish
-	 * here.
-	 */
-	if (!slotname)
+				/* Drop the tablesync slot. */
+				{
+					char *syncslotname = ReplicationSlotNameForTablesync(subid, relid);
+
+					/*
+					 * If the subscription slotname is NONE/NULL and the connection to publisher is
+					 * broken, then the DropSubscription should still be allowed to complete.
+					 * But without a connection it is not possible to drop any tablesync slots.
+					 */
+					if (!wrconn)
+					{
+						/* FIXME - OK to just log a warning? */
+						elog(WARNING, "!!>> DropSubscription: no connection. Cannot drop tablesync slot \"%s\".",
+									  syncslotname);
+					}
+					else
+					{
+						PG_TRY();
+						{
+							elog(LOG, "!!>> DropSubscription: dropping the tablesync slot \"%s\".", syncslotname);
+							ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+							elog(LOG, "!!>> DropSubscription: dropped the tablesync slot \"%s\".", syncslotname);
+						}
+						PG_CATCH();
+						{
+							/*
+							 * Typically tablesync will delete its own slot after it reaches
+							 * SYNCDONE state. Next the apply worker move the tablesync from
+							 * SYNCDONE to READY state.
+							 *
+							 * Rarely, the DropSubscription may be issued when a tablesync still
+							 * is in SYNCDONE but not yet in READY state. If this happens then
+							 * the drop slot could fail because it is already dropped.
+							 * In this case suppress and drop slot error.
+							 *
+							 * FIXME - Is there a better way than this?
+							 */
+							if (rstate->state != SUBREL_STATE_SYNCDONE)
+								PG_RE_THROW();
+						}
+						PG_END_TRY();
+					}
+					pfree(syncslotname);
+				}
+
+				/* Remove the tablesync's origin tracking if exists. */
+				{
+					snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+					originid = replorigin_by_name(originname, true);
+					if (originid != InvalidRepOriginId)
+					{
+						elog(LOG, "!!>> DropSubscription: dropping origin tracking for \"%s\"", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname);
+					}
+				}
+
+			}
+			list_free(rstates);
+		}
+
+		/* Clean up dependencies */
+		deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+
+		/* Remove any associated relation synchronization states. */
+		RemoveSubscriptionRel(subid, InvalidOid);
+
+		/* Remove the origin tracking if exists. */
+		snprintf(originname, sizeof(originname), "pg_%u", subid);
+		originid = replorigin_by_name(originname, true);
+		if (originid != InvalidRepOriginId)
+			replorigin_drop(originid, false);
+
+		/*
+		 * If there is a slot associated with the subscription, then drop the
+		 * replication slot at the publisher node using the replication
+		 * connection.
+		 */
+		if (slotname)
+			ReplicationSlotDropAtPubNode(wrconn, slotname);
+	}
+	PG_FINALLY();
 	{
+		if (wrconn)
+			walrcv_disconnect(wrconn);
+
 		table_close(rel, NoLock);
-		return;
 	}
+	PG_END_TRY();
+}
+
+
+/*
+ * Drop the replication slot at the publisher node
+ * using the replication connection.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname)
+{
+	StringInfoData	cmd;
+
+	Assert(wrconn);
 
-	/*
-	 * Otherwise drop the replication slot at the publisher node using the
-	 * replication connection.
-	 */
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
-	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err),
-		/* translator: %s is an SQL ALTER command */
-				 errhint("Use %s to disassociate the subscription from the slot.",
-						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
-
 	PG_TRY();
 	{
-		WalRcvExecResult *res;
+		WalRcvExecResult   *res;
 
 		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 
@@ -1135,13 +1246,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 	PG_FINALLY();
 	{
-		walrcv_disconnect(wrconn);
+		pfree(cmd.data);
 	}
 	PG_END_TRY();
-
-	pfree(cmd.data);
-
-	table_close(rel, NoLock);
 }
 
 /*
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 15ab8e7..6b79dc6 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -843,7 +843,7 @@ replorigin_redo(XLogReaderState *record)
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
  * setting up replorigin_session_origin_lsn and replorigin_session_origin
- * that ensures we won't loose knowledge about that after a crash if the
+ * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
  * local_commit needs to be a local LSN of the commit so that we can make sure
@@ -905,7 +905,7 @@ replorigin_advance(RepOriginId node,
 		LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
 
 		/* Make sure it's not used by somebody else */
-		if (replication_state->acquired_by != 0)
+		if (replication_state->acquired_by != 0 && replication_state->acquired_by != MyProcPid)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1904f34..0d7e4ce 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,6 +102,8 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -139,6 +141,33 @@ finish_sync_worker(void)
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/*
+	 * Cleanup the tablesync slot.
+	 */
+	{
+		/* Calculate the name of the tablesync slot */
+		char *syncslotname = ReplicationSlotNameForTablesync(
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
+
+		PG_TRY();
+		{
+			elog(LOG, "!!>> finish_sync_worker: dropping the tablesync slot \"%s\".", syncslotname);
+			ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+			elog(LOG, "!!>> finish_sync_worker: dropped the tablesync slot \"%s\".", syncslotname);
+		}
+		PG_CATCH();
+		{
+			/*
+			 * NOP. Suppress any drop slot error because otherwise
+			 * it would cause the tablesync to fail and re-launch.
+			 */
+		}
+		PG_END_TRY();
+
+		pfree(syncslotname);
+	}
+
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
@@ -270,8 +299,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
-
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -284,6 +311,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+		 * UpdateSubscriptionRelState must be called within a transaction.
+		 * That transaction will be ended within the finish_sync_worker().
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -407,12 +443,41 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
+
 				if (!started_tx)
 				{
 					StartTransactionCommand();
 					started_tx = true;
 				}
 
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The cleanup is done here instead of in the finish_sync_worker function because
+				 * if the tablesync worker process attempted to call replorigin_drop then that will
+				 * hang because the replorigin_drop considers the owning tablesync PID as "busy".
+				 *
+				 * Do this before updating the state, so that DropSubscription can know that all
+				 * READY workers have already had their origin tracking removed.
+				 */
+				{
+					char        originname[NAMEDATALEN];
+					RepOriginId originid;
+
+					snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+					originid = replorigin_by_name(originname, true);
+					elog(LOG, "!!>> apply worker: find tablesync origin tracking for \"%s\".", originname);
+					if (OidIsValid(originid))
+					{
+						elog(LOG, "!!>> apply worker: dropping tablesync origin tracking for \"%s\".", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> apply worker: dropped tablesync origin tracking for \"%s\".", originname);
+					}
+				}
+
+				/*
+				 * Update the state only after the origin cleanup.
+				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
@@ -808,6 +873,32 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+/*
+ * Determine the tablesync slot name.
+ *
+ * The returned slot name is palloc'ed in current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid)
+{
+	char *syncslotname;
+
+	/*
+	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
+	 * 1 characters.
+	 *
+	 * The name is calculated as pg_%u_sync_%u (3 + 10 + 6 + 10 + '\0').
+	 * (It's actually the NAMEDATALEN on the remote that matters, but this
+	 * scheme will also work reasonably if that is different.)
+	 */
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
+
+	syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -825,6 +916,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	bool		copied_ok;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -850,17 +942,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
 						MySubscription->oid,
 						MyLogicalRepWorker->relid);
 
@@ -875,7 +958,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed/etc
+		 * before it was able to finish normally.
+		 */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_COPYDONE.");
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -891,9 +985,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -919,29 +1010,105 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot.  This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
+	elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname);
+	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	copied_ok = false;
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
 
-	table_close(rel, NoLock);
+		copied_ok = true;
+	}
+	PG_FINALLY();
+	{
+		/* If something failed during copy table then cleanup the created slot. */
+		if (!copied_ok)
+		{
+			elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname);
+			ReplicationSlotDropAtPubNode(wrconn, slotname);
+			elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", slotname);
+
+			pfree(slotname);
+			slotname = NULL;
+		}
+	}
+	PG_END_TRY();
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+	CommitTransactionCommand();
+
+	/* Update the persisted state to indicate the COPY phase is done; make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_COPYDONE,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
+
+copy_table_done:
+
+	/* Setup replication origin tracking. */
+	{
+		char		originname[NAMEDATALEN];
+		RepOriginId originid;
+
+		StartTransactionCommand();
+		snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid))
+		{
+			/*
+			 * Origin tracking does not exist. Create it now, and advance to LSN got from walrcv_create_slot.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname);
+			originid = replorigin_create(originname);
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname);
+			replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+							   true /* go backward */ , true /* WAL log */ );
+		}
+		else
+		{
+			/*
+			 * Origin tracktrack already exists.
+			 */
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname);
+			replorigin_session_setup(originid);
+			replorigin_session_origin = originid;
+			elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname);
+			*origin_startpos = replorigin_session_get_progress(false);
+		}
+		elog(LOG, "!!>> LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+				   originname,
+				   (uint32) (*origin_startpos >> 32),
+				   (uint32) *origin_startpos);
+		CommitTransactionCommand();
+	}
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9271f87..a60e9fd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -771,8 +771,7 @@ apply_handle_prepare_txn(LogicalRepPrepareData *prepare_data)
 
 	Assert(prepare_data->prepare_lsn == remote_final_lsn);
 
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * BeginTransactionBlock is necessary to balance the
@@ -1079,12 +1078,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -1161,9 +1156,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -1190,8 +1183,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1350,8 +1342,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index acc2926..e9f2b3f 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_COPYDONE	'C' /* tablesync copy phase is completed */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab69..5f19089 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

