From 3e69d48bf311e763fc1e9fd04afd09b9dea358a7 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 14 Jan 2021 15:07:23 +1100
Subject: [PATCH v15] Tablesync Solution1.

====

Features:

* The tablesync slot is now permanent instead of temporary.

* The tablesync slot name is no longer tied to the Subscription slot name.

* The tablesync slot cleanup (drop) code is added for DropSubscription, AlterSubscription_refresh and for process_syncing_tables_for_sync functions. Drop/AlterSubscription will issue WARNING instead of ERROR in case the slot drop fails.

* The tablesync worker now allows multiple transactions instead of a single transaction.

* A new state (SUBREL_STATE_FINISHEDCOPY) is persisted after a successful copy_table in tablesync's LogicalRepSyncTableStart.

* If a re-launched tablesync finds state SUBREL_STATE_FINISHEDCOPY then it will bypass the initial copy_table phase.

* Tablesync now sets up replication origin tracking in LogicalRepSyncTableStart (similar to what is done for the apply worker). The origin is advanced when first created.

* The tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply.

* The DropSubscription cleanup code was enhanced (v7+) to take care of any crashed tablesync workers.

* The AlterSubscription_refresh (v14+) is now more similar to DropSubscription w.r.t. stopping tablesync workers for any "removed" tables.

* Updates to PG docs.

TODO / Known Issues:

* The AlterSubscription tablesync cleanup code still has problems [1]
[1] = https://www.postgresql.org/message-id/CAA4eK1JuwZF7FHM%2BEPjWdVh%3DXaz-7Eo-G0TByMjWeUU32Xue3w%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |   1 +
 doc/src/sgml/logical-replication.sgml       |  17 +-
 doc/src/sgml/ref/drop_subscription.sgml     |   6 +-
 src/backend/commands/subscriptioncmds.c     | 454 ++++++++++++++++++++--------
 src/backend/replication/logical/tablesync.c | 259 +++++++++++++---
 src/backend/replication/logical/worker.c    |  18 +-
 src/include/catalog/pg_subscription_rel.h   |   2 +
 src/include/replication/slot.h              |   3 +
 8 files changed, 567 insertions(+), 193 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 3a22665..2e46a49 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7651,6 +7651,7 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        State code:
        <literal>i</literal> = initialize,
        <literal>d</literal> = data is being copied,
+       <literal>f</literal> = finished table copy,
        <literal>s</literal> = synchronized,
        <literal>r</literal> = ready (normal replication)
       </para></entry>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index a560ad6..20cdd57 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -248,7 +248,17 @@
 
    <para>
     As mentioned earlier, each (active) subscription receives changes from a
-    replication slot on the remote (publishing) side.  Normally, the remote
+    replication slot on the remote (publishing) side.
+   </para>
+   <para>
+    Additional table synchronization slots are normally transient, created
+    internally and dropped automatically when they are no longer needed.
+    These table synchronization slots have generated names:
+    <quote><literal>pg_%u_sync_%u</literal></quote> (parameters: Subscription
+    <parameter>oid</parameter>, Table <parameter>relid</parameter>).
+   </para>
+   <para>
+    Normally, the remote
     replication slot is created automatically when the subscription is created
     using <command>CREATE SUBSCRIPTION</command> and it is dropped
     automatically when the subscription is dropped using <command>DROP
@@ -294,8 +304,9 @@
        using <command>ALTER SUBSCRIPTION</command> before attempting to drop
        the subscription.  If the remote database instance no longer exists, no
        further action is then necessary.  If, however, the remote database
-       instance is just unreachable, the replication slot should then be
-       dropped manually; otherwise it would continue to reserve WAL and might
+       instance is just unreachable, the replication slot (and any
+       remaining table synchronization slots) should then be
+       dropped manually; otherwise it/they would continue to reserve WAL and might
        eventually cause the disk to fill up.  Such cases should be carefully
        investigated.
       </para>
diff --git a/doc/src/sgml/ref/drop_subscription.sgml b/doc/src/sgml/ref/drop_subscription.sgml
index adbdeaf..aee9615 100644
--- a/doc/src/sgml/ref/drop_subscription.sgml
+++ b/doc/src/sgml/ref/drop_subscription.sgml
@@ -79,7 +79,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
   <para>
    When dropping a subscription that is associated with a replication slot on
    the remote host (the normal state), <command>DROP SUBSCRIPTION</command>
-   will connect to the remote host and try to drop the replication slot as
+   will connect to the remote host and try to drop the replication slot (and
+   any remaining table synchronization slots) as
    part of its operation.  This is necessary so that the resources allocated
    for the subscription on the remote host are released.  If this fails,
    either because the remote host is not reachable or because the remote
@@ -89,7 +90,8 @@ DROP SUBSCRIPTION [ IF EXISTS ] <replaceable class="parameter">name</replaceable
    executing <literal>ALTER SUBSCRIPTION ... SET (slot_name = NONE)</literal>.
    After that, <command>DROP SUBSCRIPTION</command> will no longer attempt any
    actions on a remote host.  Note that if the remote replication slot still
-   exists, it should then be dropped manually; otherwise it will continue to
+   exists, it (and any related table synchronization slots) should then be
+   dropped manually; otherwise it/they will continue to
    reserve WAL and might eventually cause the disk to fill up.  See
    also <xref linkend="logical-replication-subscription-slot"/>.
   </para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 490e935..f94243b 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -566,100 +567,165 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 	Oid		   *pubrel_local_oids;
 	ListCell   *lc;
 	int			off;
+	Relation	rel;
 
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
-	/* Try to connect to the publisher. */
-	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
-	if (!wrconn)
-		ereport(ERROR,
-				(errmsg("could not connect to the publisher: %s", err)));
-
-	/* Get the table list from publisher. */
-	pubrel_names = fetch_table_list(wrconn, sub->publications);
-
-	/* We are done with the remote side, close connection. */
-	walrcv_disconnect(wrconn);
+	PG_TRY();
+	{
+		/* Try to connect to the publisher. */
+		wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
+		if (!wrconn)
+			ereport(ERROR,
+					(errmsg("could not connect to the publisher: %s", err)));
 
-	/* Get local table list. */
-	subrel_states = GetSubscriptionRelations(sub->oid);
+		/* Get the table list from publisher. */
+		pubrel_names = fetch_table_list(wrconn, sub->publications);
 
-	/*
-	 * Build qsorted array of local table oids for faster lookup. This can
-	 * potentially contain all tables in the database so speed of lookup is
-	 * important.
-	 */
-	subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
-	off = 0;
-	foreach(lc, subrel_states)
-	{
-		SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+		/* Get local table list. */
+		subrel_states = GetSubscriptionRelations(sub->oid);
 
-		subrel_local_oids[off++] = relstate->relid;
-	}
-	qsort(subrel_local_oids, list_length(subrel_states),
-		  sizeof(Oid), oid_cmp);
+		/*
+		 * Build qsorted array of local table oids for faster lookup. This can
+		 * potentially contain all tables in the database so speed of lookup
+		 * is important.
+		 */
+		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		off = 0;
+		foreach(lc, subrel_states)
+		{
+			SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
 
-	/*
-	 * Walk over the remote tables and try to match them to locally known
-	 * tables. If the table is not known locally create a new state for it.
-	 *
-	 * Also builds array of local oids of remote tables for the next step.
-	 */
-	off = 0;
-	pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+			subrel_local_oids[off++] = relstate->relid;
+		}
+		qsort(subrel_local_oids, list_length(subrel_states),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Walk over the remote tables and try to match them to locally known
+		 * tables. If the table is not known locally create a new state for
+		 * it.
+		 *
+		 * Also builds array of local oids of remote tables for the next step.
+		 */
+		off = 0;
+		pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
+
+		foreach(lc, pubrel_names)
+		{
+			RangeVar   *rv = (RangeVar *) lfirst(lc);
+			Oid			relid;
 
-	foreach(lc, pubrel_names)
-	{
-		RangeVar   *rv = (RangeVar *) lfirst(lc);
-		Oid			relid;
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
 
-		relid = RangeVarGetRelid(rv, AccessShareLock, false);
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
 
-		/* Check for supported relkind. */
-		CheckSubscriptionRelkind(get_rel_relkind(relid),
-								 rv->schemaname, rv->relname);
+			pubrel_local_oids[off++] = relid;
 
-		pubrel_local_oids[off++] = relid;
+			if (!bsearch(&relid, subrel_local_oids,
+						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+			{
+				AddSubscriptionRelState(sub->oid, relid,
+										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										InvalidXLogRecPtr);
+				ereport(DEBUG1,
+						(errmsg("table \"%s.%s\" added to subscription \"%s\"",
+								rv->schemaname, rv->relname, sub->name)));
+			}
+		}
 
-		if (!bsearch(&relid, subrel_local_oids,
-					 list_length(subrel_states), sizeof(Oid), oid_cmp))
+		/*
+		 * Next remove state for tables we should not care about anymore using
+		 * the data we collected above
+		 */
+		qsort(pubrel_local_oids, list_length(pubrel_names),
+			  sizeof(Oid), oid_cmp);
+
+		/*
+		 * Lock pg_subscription with AccessExclusiveLock to ensure that the
+		 * launcher doesn't restart new worker for the ones we are about to
+		 * stop.
+		 */
+		rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
+
+		for (off = 0; off < list_length(subrel_states); off++)
 		{
-			AddSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr);
-			ereport(DEBUG1,
-					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
-							rv->schemaname, rv->relname, sub->name)));
-		}
-	}
+			Oid			relid = subrel_local_oids[off];
 
-	/*
-	 * Next remove state for tables we should not care about anymore using the
-	 * data we collected above
-	 */
-	qsort(pubrel_local_oids, list_length(pubrel_names),
-		  sizeof(Oid), oid_cmp);
+			if (!bsearch(&relid, pubrel_local_oids,
+						 list_length(pubrel_names), sizeof(Oid), oid_cmp))
+			{
+				Oid			subid = sub->oid;
+				char		originname[NAMEDATALEN];
+				RepOriginId originid;
+				char		state;
+				XLogRecPtr	statelsn;
+
+				/* Immediately stop the worker. */
+				logicalrep_worker_stop_at_commit(subid, relid); /* prevent re-launching */
+				logicalrep_worker_stop(subid, relid); /* stop immediately */
+
+				/* Last known rel state. */
+				state = GetSubscriptionRelState(subid, relid, &statelsn);
+
+				RemoveSubscriptionRel(sub->oid, relid);
+
+				/*
+				 * Drop the tablesync slot.
+				 *
+				 * For SYNCDONE/READY states the tablesync slot is known to
+				 * have already been dropped by the tablesync worker.
+				 *
+				 * For other states, there is no certainty. Maybe the slot
+				 * does not exist yet; Maybe the slot is already deleted but
+				 * SYNCDONE is not yet set. For this reason we allow
+				 * missing_ok = true for the drop.
+				 */
+				if (state != SUBREL_STATE_SYNCDONE && state != SUBREL_STATE_READY)
+				{
+					char		syncslotname[NAMEDATALEN] = {0};
+					bool		missing_ok = true;	/* no ERROR if slot is
+													 * missing. */
 
-	for (off = 0; off < list_length(subrel_states); off++)
-	{
-		Oid			relid = subrel_local_oids[off];
+					ReplicationSlotNameForTablesync(subid, relid, syncslotname);
 
-		if (!bsearch(&relid, pubrel_local_oids,
-					 list_length(pubrel_names), sizeof(Oid), oid_cmp))
-		{
-			RemoveSubscriptionRel(sub->oid, relid);
+					elog(DEBUG1,
+						 "AlterSubscription_refresh: dropping the tablesync slot \"%s\".",
+						 syncslotname);
+					ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok);
+				}
 
-			logicalrep_worker_stop_at_commit(sub->oid, relid);
+				/* Remove the tablesync's origin tracking if exists. */
+				snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+				originid = replorigin_by_name(originname, true);
+				if (OidIsValid(originid))
+				{
+					elog(DEBUG1,
+						 "AlterSubscription_refresh: dropping origin tracking for \"%s\"",
+						 originname);
+					replorigin_drop(originid, false);
+				}
 
-			ereport(DEBUG1,
-					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
-							get_namespace_name(get_rel_namespace(relid)),
-							get_rel_name(relid),
-							sub->name)));
+				ereport(DEBUG1,
+						(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
+								get_namespace_name(get_rel_namespace(relid)),
+								get_rel_name(relid),
+								sub->name)));
+			}
 		}
+
+		table_close(rel, NoLock);
+
 	}
+	PG_FINALLY();
+	{
+		if (wrconn)
+			walrcv_disconnect(wrconn);
+	}
+	PG_END_TRY();
 }
 
 /*
@@ -928,8 +994,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *err = NULL;
 	RepOriginId originid;
 	WalReceiverConn *wrconn = NULL;
-	StringInfoData cmd;
 	Form_pg_subscription form;
+	List	   *rstates;
 
 	/*
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
@@ -1016,100 +1082,220 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ReleaseSysCache(tup);
 
 	/*
-	 * Stop all the subscription workers immediately.
-	 *
-	 * This is necessary if we are dropping the replication slot, so that the
-	 * slot becomes accessible.
-	 *
-	 * It is also necessary if the subscription is disabled and was disabled
-	 * in the same transaction.  Then the workers haven't seen the disabling
-	 * yet and will still be running, leading to hangs later when we want to
-	 * drop the replication origin.  If the subscription was disabled before
-	 * this transaction, then there shouldn't be any workers left, so this
-	 * won't make a difference.
+	 * Try to acquire the connection necessary for dropping slots. We do this
+	 * here so that the same connection may be shared for dropping the
+	 * Subscription slot, as well as dropping any tablesync slots.
 	 *
-	 * New workers won't be started because we hold an exclusive lock on the
-	 * subscription till the end of the transaction.
+	 * Note: If the slotname is NONE/NULL then connection errors are
+	 * suppressed. This is necessary so that the DROP SUBSCRIPTION can still
+	 * complete even when the connection to publisher is broken.
 	 */
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-	subworkers = logicalrep_workers_find(subid, false);
-	LWLockRelease(LogicalRepWorkerLock);
-	foreach(lc, subworkers)
+	load_file("libpqwalreceiver", false);
+
+	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	if (wrconn == NULL && slotname != NULL)
+		ereport(ERROR,
+				(errmsg("could not connect to publisher when attempting to "
+						"drop the replication slot \"%s\"", slotname),
+				 errdetail("The error was: %s", err),
+		/* translator: %s is an SQL ALTER command */
+				 errhint("Use %s to disassociate the subscription from the slot.",
+						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
+
+	PG_TRY();
 	{
-		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		/*
+		 * Stop all the subscription workers immediately.
+		 *
+		 * This is necessary if we are dropping the replication slot, so that
+		 * the slot becomes accessible.
+		 *
+		 * It is also necessary if the subscription is disabled and was
+		 * disabled in the same transaction.  Then the workers haven't seen
+		 * the disabling yet and will still be running, leading to hangs later
+		 * when we want to drop the replication origin.  If the subscription
+		 * was disabled before this transaction, then there shouldn't be any
+		 * workers left, so this won't make a difference.
+		 *
+		 * New workers won't be started because we hold an exclusive lock on
+		 * the subscription till the end of the transaction.
+		 */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		subworkers = logicalrep_workers_find(subid, false);
+		LWLockRelease(LogicalRepWorkerLock);
+		foreach(lc, subworkers)
+		{
+			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-		logicalrep_worker_stop(w->subid, w->relid);
-	}
-	list_free(subworkers);
+			logicalrep_worker_stop(w->subid, w->relid);
+		}
+		list_free(subworkers);
+
+		/*
+		 * Tablesync resource cleanup (slots and origins).
+		 *
+		 * Any READY-state relations would already have dealt with clean-ups.
+		 */
+		rstates = GetSubscriptionNotReadyRelations(subid);
+		foreach(lc, rstates)
+		{
+			SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
+			Oid			relid = rstate->relid;
 
-	/* Clean up dependencies */
-	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+			/* Only cleanup resources of tablesync workers */
+			if (!OidIsValid(relid))
+				continue;
 
-	/* Remove any associated relation synchronization states. */
-	RemoveSubscriptionRel(subid, InvalidOid);
+			/*
+			 * Drop the tablesync slot.
+			 *
+			 * For SYNCDONE/READY states the tablesync slot is known to have
+			 * already been dropped by the tablesync worker.
+			 *
+			 * For other states, there is no certainty. Maybe the slot does
+			 * not exist yet; Maybe the slot is already deleted but SYNCDONE
+			 * is not yet set. For this reason we allow missing_ok = true for
+			 * the drop.
+			 */
+			if (rstate->state != SUBREL_STATE_SYNCDONE)
+			{
+				char		syncslotname[NAMEDATALEN] = {0};
+				bool		missing_ok = true;	/* no ERROR if slot is
+												 * missing. */
 
-	/* Remove the origin tracking if exists. */
-	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid, false);
+				ReplicationSlotNameForTablesync(subid, relid, syncslotname);
 
-	/*
-	 * If there is no slot associated with the subscription, we can finish
-	 * here.
-	 */
-	if (!slotname)
+				if (!wrconn)
+				{
+					/*
+					 * It is only possible to reach here without ERROR for a
+					 * broken publisher connection if the subscription
+					 * slotname is already NONE/NULL.
+					 *
+					 * This means the user has disassociated the subscription
+					 * from the replication slot deliberately so that the DROP
+					 * SUBSCRIPTION can proceed to completion. See
+					 * https://www.postgresql.org/docs/current/sql-dropsubscription.html
+					 *
+					 * For this reason we only give a WARNING message that
+					 * the tablesync slots cannot be dropped, rather than
+					 * throw ERROR (which would prevent the DROP SUBSCRIPTION
+					 * from proceeding).
+					 *
+					 * In such a case the user must take steps to manually
+					 * cleanup these remaining tablesync slots.
+					 */
+					elog(WARNING,
+						 "no connection; cannot drop tablesync slot \"%s\".",
+						 syncslotname);
+				}
+				else
+				{
+					elog(DEBUG1,
+						 "DropSubscription: dropping the tablesync slot \"%s\".",
+						 syncslotname);
+					ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok);
+				}
+			}
+
+			/* Remove the tablesync's origin tracking if exists. */
+			snprintf(originname, sizeof(originname), "pg_%u_%u", subid, relid);
+			originid = replorigin_by_name(originname, true);
+			if (originid != InvalidRepOriginId)
+			{
+				elog(DEBUG1,
+					 "DropSubscription: dropping origin tracking for \"%s\"",
+					 originname);
+				replorigin_drop(originid, false);
+			}
+		}
+		list_free(rstates);
+
+		/* Clean up dependencies. */
+		deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
+
+		/* Remove any associated relation synchronization states. */
+		RemoveSubscriptionRel(subid, InvalidOid);
+
+		/* Remove the origin tracking if exists. */
+		snprintf(originname, sizeof(originname), "pg_%u", subid);
+		originid = replorigin_by_name(originname, true);
+		if (originid != InvalidRepOriginId)
+			replorigin_drop(originid, false);
+
+		/*
+		 * If there is a slot associated with the subscription, then drop the
+		 * replication slot at the publisher node using the replication
+		 * connection.
+		 */
+		if (slotname)
+			ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+	}
+	PG_FINALLY();
 	{
+		if (wrconn)
+			walrcv_disconnect(wrconn);
+
 		table_close(rel, NoLock);
-		return;
 	}
+	PG_END_TRY();
+}
+
+
+/*
+ * Drop the replication slot at the publisher node using the replication connection.
+ *
+ * missing_ok - if true then only issue WARNING message if the slot cannot be deleted.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
+{
+	StringInfoData cmd;
+
+	Assert(wrconn);
 
-	/*
-	 * Otherwise drop the replication slot at the publisher node using the
-	 * replication connection.
-	 */
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
-	if (wrconn == NULL)
-		ereport(ERROR,
-				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
-				 errdetail("The error was: %s", err),
-		/* translator: %s is an SQL ALTER command */
-				 errhint("Use %s to disassociate the subscription from the slot.",
-						 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
-
 	PG_TRY();
 	{
 		WalRcvExecResult *res;
 
 		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 
-		if (res->status != WALRCV_OK_COMMAND)
-			ereport(ERROR,
+		if (res->status == WALRCV_OK_COMMAND)
+		{
+			/* NOTICE. Success. */
+			ereport(NOTICE,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							slotname)));
+		}
+		else if (res->status == WALRCV_ERROR && missing_ok)
+		{
+			/* WARNING. Error, but missing_ok = true. */
+			ereport(WARNING,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
 							slotname),
 					 errdetail("The error was: %s", res->err)));
+		}
 		else
-			ereport(NOTICE,
-					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+		{
+			/* ERROR. */
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							slotname),
+					 errdetail("The error was: %s", res->err)));
+		}
 
 		walrcv_clear_result(res);
 	}
 	PG_FINALLY();
 	{
-		walrcv_disconnect(wrconn);
+		pfree(cmd.data);
 	}
 	PG_END_TRY();
-
-	pfree(cmd.data);
-
-	table_close(rel, NoLock);
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 863d196..33e11a1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -31,8 +31,10 @@
  *		 table state to INIT.
  *	   - Tablesync worker starts; changes table state from INIT to DATASYNC while
  *		 copying.
- *	   - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
- *		 waits for state change.
+ *	   - Tablesync worker does initial table copy; there is a FINISHEDCOPY state to
+ *		 indicate when the copy phase has completed, so if the worker crashes
+ *		 before reaching SYNCDONE the copy will not be re-attempted.
+ *	   - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
  *	   - Apply worker periodically checks for tables in SYNCWAIT state.  When
  *		 any appear, it sets the table state to CATCHUP and starts loop-waiting
  *		 until either the table state is set to SYNCDONE or the sync worker
@@ -48,8 +50,8 @@
  *		 point it sets state to READY and stops tracking.  Again, there might
  *		 be zero changes in between.
  *
- *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
- *	  CATCHUP -> SYNCDONE -> READY.
+ *	  So the state progression is always: INIT -> DATASYNC ->
+ *	  (sync worker FINISHEDCOPY) -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state.  Some transient state during data
@@ -59,6 +61,7 @@
  *	  Example flows look like this:
  *	   - Apply is in front:
  *		  sync:8
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:10
  *			-> set in memory CATCHUP
@@ -74,6 +77,7 @@
  *
  *	   - Sync is in front:
  *		  sync:10
+ *			-> set in catalog FINISHEDCOPY
  *			-> set in memory SYNCWAIT
  *		  apply:8
  *			-> set in memory CATCHUP
@@ -102,7 +106,10 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -270,30 +277,59 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
+	bool		sync_done = false;
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	sync_done = MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
+		current_lsn >= MyLogicalRepWorker->relstate_lsn;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
-		current_lsn >= MyLogicalRepWorker->relstate_lsn)
+	if (sync_done)
 	{
 		TimeLineID	tli;
+		char		syncslotname[NAMEDATALEN] = {0};
+
+		/* End wal streaming so wrconn can be re-used to drop the slot. */
+		walrcv_endstreaming(wrconn, &tli);
+
+		/*
+		 * Cleanup the tablesync slot.
+		 */
+		ReplicationSlotNameForTablesync(
+										MySubscription->oid,
+										MyLogicalRepWorker->relid,
+										syncslotname);
 
+		elog(DEBUG1,
+			 "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".",
+			 syncslotname);
+		ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+
+		/*
+		 * Change state to SYNCDONE.
+		 */
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 		MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 		MyLogicalRepWorker->relstate_lsn = current_lsn;
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+		 * UpdateSubscriptionRelState must be called within a transaction.
+		 * That transaction will be ended within the finish_sync_worker().
+		 */
+		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
 								   MyLogicalRepWorker->relstate_lsn);
 
-		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
 	}
-	else
-		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 }
 
 /*
@@ -412,6 +448,37 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					started_tx = true;
 				}
 
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The cleanup is done here instead of in the
+				 * finish_sync_worker function because if the tablesync worker
+				 * process attempted to call replorigin_drop then that will
+				 * hang because replorigin_drop logic considers the owning
+				 * tablesync PID as "busy".
+				 *
+				 * Do this before updating the state, so that DropSubscription
+				 * can know that all READY workers have already had their
+				 * origin tracking removed.
+				 */
+				{
+					char		originname[NAMEDATALEN];
+					RepOriginId originid;
+
+					snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+					originid = replorigin_by_name(originname, true);
+					if (OidIsValid(originid))
+					{
+						elog(DEBUG1,
+							 "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".",
+							 originname);
+						replorigin_drop(originid, false);
+					}
+				}
+
+				/*
+				 * Update the state only after the origin cleanup.
+				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
@@ -808,6 +875,30 @@ copy_table(Relation rel)
 }
 
 /*
+ * Determine the tablesync slot name.
+ *
+ * The name must not exceed NAMEDATALEN - 1 characters because of remote node
+ * constraints on slot name length.
+ *
+ * The slot name is either written into the supplied buffer or, if the buffer
+ * is NULL, palloc'ed in the current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname)
+{
+	/*
+	 * A caller-supplied buffer must hold NAMEDATALEN bytes (as the callers in
+	 * this patch do); bound the write instead of using an unchecked sprintf.
+	 */
+	if (syncslotname)
+		snprintf(syncslotname, NAMEDATALEN, "pg_%u_sync_%u", suboid, relid);
+	else
+		syncslotname = psprintf("pg_%u_sync_%u", suboid, relid);
+
+	return syncslotname;
+}
+
+/*
  * Start syncing the table in the sync worker.
  *
  * If nothing needs to be done to sync the table, we exit the worker without
@@ -824,6 +915,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -849,19 +942,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
-						MySubscription->slotname,
-						MySubscription->oid,
-						MyLogicalRepWorker->relid);
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
+											   MySubscription->oid,
+											   MyLogicalRepWorker->relid,
+											   NULL);	/* use palloc */
 
 	/*
 	 * Here we use the slot name instead of the subscription name as the
@@ -874,7 +959,32 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
+
+	/* Assign the origin tracking record name. */
+	snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed/etc
+		 * before it was able to finish normally.
+		 */
+		elog(DEBUG1,
+			 "LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_FINISHEDCOPY.");
+		StartTransactionCommand();
+
+		/*
+		 * The origin tracking name must already exist (missing_ok=false).
+		 */
+		originid = replorigin_by_name(originname, false);
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+		*origin_startpos = replorigin_session_get_progress(false);
+
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -890,9 +1000,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -918,29 +1025,99 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot. This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
+	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
+
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
+
+		/* Make the copy visible. */
+		CommandCounterIncrement();
+	}
+	PG_CATCH();
+	{
+		/*
+		 * If something failed during copy table then cleanup the created
+		 * slot.
+		 */
+		elog(DEBUG1,
+			 "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".",
+			 slotname);
+		ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+
+		pfree(slotname);
+		slotname = NULL;
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* Setup replication origin tracking. */
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+	{
+		/*
+		 * Origin tracking does not exist, so create it now.
+		 *
+		 * Then advance to the LSN obtained from walrcv_create_slot. This is WAL
+		 * logged for the purpose of recovery. Locks are to prevent the
+		 * replication origin from vanishing while advancing.
+		 */
+		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+		originid = replorigin_create(originname);
+		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+						   true /* go backward */ , true /* WAL log */ );
+		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+	}
+	else
+	{
 		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("replication origin \"%s\" already exists",
+						originname)));
+	}
+
+	/*
+	 * Update the persisted state to indicate the COPY phase is done; make it
+	 * visible to others.
+	 */
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_FINISHEDCOPY,
+							   MyLogicalRepWorker->relstate_lsn);
 
-	table_close(rel, NoLock);
+copy_table_done:
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+	elog(DEBUG1,
+		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+		 originname,
+		 (uint32) (*origin_startpos >> 32),
+		 (uint32) *origin_startpos);
+
+	CommitTransactionCommand();
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1b1d70e..4bd4030 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -807,12 +807,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -889,9 +885,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -918,8 +912,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1062,8 +1055,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 06663b9..9027c42 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,8 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_FINISHEDCOPY 'f'	/* tablesync copy phase is completed
+										 * (sublsn NULL) */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 53f636c..5f52335 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -15,6 +15,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
+#include "replication/walreceiver.h"
 
 /*
  * Behaviour of replication slots, upon release or crash.
@@ -211,6 +212,8 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname);
+extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

