From 8867bd8756d6b31b015ac39ae36d5bdf4146db0e Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Sat, 19 Dec 2020 00:01:41 +1100
Subject: [PATCH v4] WIP patch for Solution1.

This patch applies onto the v30 patch set [1] from other 2PC thread:
[1] https://www.postgresql.org/message-id/CAFPTHDYA8yE6tEmQ2USYS68kNt%2BkM%3DSwKgj%3Djy4AvFD5e9-UTQ%40mail.gmail.com

Although incomplete it does still pass all the make check, and src/test/subscription TAP tests.

====

Coded / WIP:

* tablesync slot is now permanent instead of temporary

* the tablesync slot cleanup (drop) code is added for DropSubscription and for finish_sync_worker functions

* tablesync worked now allowing multiple tx instead of single tx

* a new state (SUBREL_STATE_COPYDONE) is persisted after a successful copy_table in LogicalRepSyncTableStart.

* if a relaunched tablesync finds the state is SUBREL_STATE_COPYDONE then it will bypass the initial copy_table phase.

* tablesync now sets up replication origin tracking in LogicalRepSyncTableStart (similar as done for apply worker)

* tablesync replication origin tracking is cleaned up during DropSubscription and/or process_syncing_tables_for_apply

TODO / Known Issues:

* the current implementation of tablesync drop slot (e.g. from DropSubscription or finish_sync_worker) regenerates the tablesync slot name so it knows what slot to drop. The current code might be ok for normal use cases, but if there is a ALTER SUBSCRIPTION ... SET (slot_name = newname) it would fail to be able to find the tablesync slot.

* I think if there are crashed tablesync workers then they are not known to DropSubscription. So this might be a problem to cleanup slots and/or origin tracking belonging to those unknown workers.

* help / comments / cleanup

* There is temporary "!!>>" excessive logging of mine scattered around which I added to help my testing during development
---
 src/backend/commands/subscriptioncmds.c     | 124 +++++++++++++++++
 src/backend/replication/logical/tablesync.c | 207 +++++++++++++++++++++++-----
 src/backend/replication/logical/worker.c    |  21 +--
 src/include/catalog/pg_subscription_rel.h   |   1 +
 src/include/replication/slot.h              |   1 +
 5 files changed, 307 insertions(+), 47 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b0745d5..c4b02a6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -37,6 +37,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
 #include "storage/lmgr.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -47,6 +48,8 @@
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 
+void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
  *
@@ -1070,6 +1073,55 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
+		/*
+		 * Is this a tablesync worker? If yes, drop the tablesync's slot.
+		 */
+		if (OidIsValid(w->relid))
+		{
+			/*
+			 * FIXME 1 - This slotname check below is a workaround needed because the tablesync slot
+			 * name is derived from the subscription slot name, so if that is set slot_name = NONE
+			 * then we cannot do that calculation anymore to get the tablesyn slot name.
+			 *
+			 * FIXME 2 - Similarly, If subscription slot name changes from 'aaa' to 'bbb' then that
+			 * will also make it not possible to re-calculate the tablesync slots. Some redesign is
+			 * needed (eg store the tablesync slotname somewhere) to avoid this trouble...
+			 *
+			 * FIXME 3 - Crashed tablesync workers may also have remaining slots because I don't think
+			 * such workers are even iterated by this loop, and nobody else is removing them.
+			 */
+			if (slotname)
+			{
+				/* Calculate the name of the tablesync slot. */
+				char *syncslotname = ReplicationSlotNameForTablesync(slotname, w->subid, w->relid);
+
+				elog(LOG, "!!>> DropSubscription: now dropping the tablesync slot \"%s\".", syncslotname);
+				ReplicationSlotDropAtPubNode(
+								NULL,
+								conninfo, /* use conninfo to make a new connection. */
+								subname,
+								syncslotname);
+
+				pfree(syncslotname);
+			}
+			else
+			{
+				elog(LOG, "!!>> DropSubscription: no slotname for relid %u.", w->relid);
+			}
+
+			/* Remove the (tablesync's) origin tracking if exists. */
+			snprintf(originname, sizeof(originname), "pg_%u_%u", subid, w->relid);
+			originid = replorigin_by_name(originname, true);
+			if (originid != InvalidRepOriginId)
+			{
+				elog(LOG, "!!>> DropSubscription: dropping origin tracking for \"%s\"", originname);
+				replorigin_drop(originid, false);
+				elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname);
+			}
+
+		}
+
+		/* Stop the worker. */
 		logicalrep_worker_stop(w->subid, w->relid);
 	}
 	list_free(subworkers);
@@ -1144,6 +1196,78 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	table_close(rel, NoLock);
 }
 
+
+/*
+ * Drop the replication slot at the publisher node
+ * using the replication connection.
+ *
+ * If the connection is passed then just use that,
+ * otherwise connect/disconnect within this function.
+ */
+void
+ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname)
+{
+	StringInfoData cmd;
+
+	load_file("libpqwalreceiver", false);
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
+
+	/*
+ 	 * If the connection was passed then use it.
+ 	 * If the connection was not passed then make a new connection using the passed conninfo.
+ 	 */
+	if (wrconn_given != NULL)
+	{
+		Assert (conninfo == NULL);
+		wrconn = wrconn_given;
+	}
+	else
+	{
+		char	   *err = NULL;
+
+		Assert(conninfo != NULL);
+		wrconn = walrcv_connect(conninfo, true, subname, &err);
+
+		if (wrconn == NULL)
+			ereport(ERROR,
+					(errmsg("could not connect to publisher when attempting to "
+							"drop the replication slot \"%s\"", slotname),
+					 errdetail("The error was: %s", err)));
+	}
+
+	PG_TRY();
+	{
+		WalRcvExecResult *res;
+
+		res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("could not drop the replication slot \"%s\" on publisher",
+							slotname),
+					 errdetail("The error was: %s", res->err)));
+		else
+			ereport(LOG,
+					(errmsg("dropped replication slot \"%s\" on publisher",
+							slotname)));
+
+		walrcv_clear_result(res);
+	}
+	PG_CATCH();
+	{
+		/* NOP. Just gobble any ERROR. */
+	}
+	PG_END_TRY();
+
+	/* Disconnect the connection (unless using one passed) */
+	if (wrconn_given == NULL)
+		walrcv_disconnect(wrconn);
+
+	pfree(cmd.data);
+}
+
 /*
  * Internal workhorse for changing a subscription owner
  */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1904f34..780cf8d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,6 +102,8 @@
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
+#include "replication/slot.h"
+#include "replication/origin.h"
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -139,6 +141,28 @@ finish_sync_worker(void)
 					get_rel_name(MyLogicalRepWorker->relid))));
 	CommitTransactionCommand();
 
+	/*
+ 	 * Cleanup the tablesync slot.
+	 */
+	{
+		extern void ReplicationSlotDropAtPubNode(
+			WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+		/* Calculate the name of the tablesync slot */
+		char *syncslotname = ReplicationSlotNameForTablesync(
+						MySubscription->slotname,
+						MySubscription->oid,
+						MyLogicalRepWorker->relid);
+
+		elog(LOG, "!!>> finish_sync_worker: dropping the tablesync slot \"%s\".", syncslotname);
+		ReplicationSlotDropAtPubNode(
+						wrconn,
+						NULL, /* use the current connection. */
+						MySubscription->name, syncslotname);
+
+		pfree(syncslotname);
+	}
+
 	/* Find the main apply worker and signal it. */
 	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
 
@@ -270,8 +294,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
 static void
 process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 {
-	Assert(IsTransactionState());
-
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 
 	if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
@@ -284,6 +306,15 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+		/*
+ 		 * UpdateSubscriptionRelState must be called within a transaction.
+ 		 * That transaction will be ended within the finish_sync_worker().
+ 		 */
+ 		if (!IsTransactionState())
+		{
+			StartTransactionCommand();
+		}
+
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
@@ -416,6 +447,27 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
 										   rstate->lsn);
+				/*
+				 * Remove the tablesync origin tracking if exists.
+				 *
+				 * The cleanup is done here instead of in the finish_sync_worker function because
+				 * if the tablesync worker process attempted to call replorigin_drop then that will
+				 * hang because the replorigin_drop considers the owning tablesync PID as "busy".
+				 */
+				{
+					char        originname[NAMEDATALEN];
+					RepOriginId originid;
+
+					snprintf(originname, sizeof(originname), "pg_%u_%u", MyLogicalRepWorker->subid, rstate->relid);
+					originid = replorigin_by_name(originname, true);
+					elog(LOG, "!!>> apply worker: find tablesync origin tracking for \"%s\".", originname);
+					if (OidIsValid(originid))
+					{
+						elog(LOG, "!!>> apply worker: dropping tablesync origin tracking for \"%s\".", originname);
+						replorigin_drop(originid, false);
+						elog(LOG, "!!>> apply worker: dropped tablesync origin tracking for \"%s\".", originname);
+					}
+				}
 			}
 		}
 		else
@@ -808,6 +860,35 @@ copy_table(Relation rel)
 	logicalrep_rel_close(relmapentry, NoLock);
 }
 
+
+/*
+ * Determine the tablesync slot name.
+ *
+ * The returned slot name is palloc'ed in current memory context.
+ */
+char *
+ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid)
+{
+	char *syncslotname;
+
+	/*
+	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
+	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
+	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
+	 * NAMEDATALEN on the remote that matters, but this scheme will also work
+	 * reasonably if that is different.)
+	 */
+	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");   /* for sanity */
+
+	syncslotname = psprintf("%.*s_%u_sync_%u",
+						NAMEDATALEN - 28,
+						subslotname,
+						suboid,
+						relid);
+
+	return syncslotname;
+}
+
 /*
  * Start syncing the table in the sync worker.
  *
@@ -825,6 +906,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	XLogRecPtr	relstate_lsn;
 	Relation	rel;
 	WalRcvExecResult *res;
+	bool		copied_ok;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -850,16 +932,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 			finish_sync_worker();	/* doesn't return */
 	}
 
-	/*
-	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
-	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
-	 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0').  (It's actually the
-	 * NAMEDATALEN on the remote that matters, but this scheme will also work
-	 * reasonably if that is different.)
-	 */
-	StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small");	/* for sanity */
-	slotname = psprintf("%.*s_%u_sync_%u",
-						NAMEDATALEN - 28,
+	/* Calculate the name of the tablesync slot. */
+	slotname = ReplicationSlotNameForTablesync(
 						MySubscription->slotname,
 						MySubscription->oid,
 						MyLogicalRepWorker->relid);
@@ -875,7 +949,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 				(errmsg("could not connect to the publisher: %s", err)));
 
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
-		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE);
+
+	if (MyLogicalRepWorker->relstate == SUBREL_STATE_COPYDONE)
+	{
+		/*
+		 * The COPY phase was previously done, but tablesync then crashed/etc
+		 * before it was able to finish normally.
+		 */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_COPYDONE.");
+		goto copy_table_done;
+	}
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
@@ -891,9 +976,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 	pgstat_report_stat(false);
 
-	/*
-	 * We want to do the table data sync in a single transaction.
-	 */
 	StartTransactionCommand();
 
 	/*
@@ -919,29 +1001,90 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	walrcv_clear_result(res);
 
 	/*
-	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * Create a new permanent logical decoding slot.  This slot will be used
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
-	walrcv_create_slot(wrconn, slotname, true,
-					   CRS_USE_SNAPSHOT, origin_startpos);
+	elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname);
+	walrcv_create_slot(wrconn, slotname, false,
+					   CRS_USE_SNAPSHOT, NULL);
 
-	/* Now do the initial data copy */
-	PushActiveSnapshot(GetTransactionSnapshot());
-	copy_table(rel);
-	PopActiveSnapshot();
+	/*
+	 * Be sure to remove the newly created tablesync slot if the COPY fails.
+	 */
+	copied_ok = false;
+	PG_TRY();
+	{
+		/* Now do the initial data copy */
+		PushActiveSnapshot(GetTransactionSnapshot());
+		copy_table(rel);
+		PopActiveSnapshot();
 
-	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-	if (res->status != WALRCV_OK_COMMAND)
-		ereport(ERROR,
-				(errmsg("table copy could not finish transaction on publisher"),
-				 errdetail("The error was: %s", res->err)));
-	walrcv_clear_result(res);
+		res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errmsg("table copy could not finish transaction on publisher"),
+					 errdetail("The error was: %s", res->err)));
+		walrcv_clear_result(res);
+
+		table_close(rel, NoLock);
 
-	table_close(rel, NoLock);
+		/* Make the copy visible. */
+		CommandCounterIncrement();
 
-	/* Make the copy visible. */
-	CommandCounterIncrement();
+		copied_ok = true;
+	}
+	PG_FINALLY();
+	{
+		/* If something failed during copy table then cleanup the created slot. */
+		if (!copied_ok)
+		{
+			extern void ReplicationSlotDropAtPubNode(
+				WalReceiverConn *wrconn_given, char *conninfo, char *subname, char *slotname);
+
+			elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Drop the tablesync slot \"%s\".", slotname);
+			ReplicationSlotDropAtPubNode(
+							wrconn,
+							NULL, /* use the current connection. */
+							MySubscription->name,
+							slotname);
+
+			pfree(slotname);
+		}
+	}
+	PG_END_TRY();
+
+	CommitTransactionCommand();
+
+	/* Update the persisted state to indicate the COPY phase is done; make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   SUBREL_STATE_COPYDONE,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
+
+copy_table_done:
+
+	/* Setup replication origin tracking. */
+	{
+		char		originname[NAMEDATALEN];
+		RepOriginId originid;
+
+		StartTransactionCommand();
+		snprintf(originname, sizeof(originname), "pg_%u_%u", MySubscription->oid, MyLogicalRepWorker->relid);
+		originid = replorigin_by_name(originname, true);
+		if (!OidIsValid(originid)) 
+		{
+			elog(LOG, "!!>> LogicalRepSyncTableStart: create replication origin tracking \"%s\".", originname);
+			originid = replorigin_create(originname);
+		}
+		elog(LOG, "!!>> LogicalRepSyncTableStart: setup replication origin tracking \"%s\".", originname);
+		replorigin_session_setup(originid);
+		replorigin_session_origin = originid;
+		*origin_startpos = replorigin_session_get_progress(false);
+		CommitTransactionCommand();
+	}
 
 	/*
 	 * We are done with the initial data synchronization, update the state.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9271f87..a60e9fd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -771,8 +771,7 @@ apply_handle_prepare_txn(LogicalRepPrepareData *prepare_data)
 
 	Assert(prepare_data->prepare_lsn == remote_final_lsn);
 
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * BeginTransactionBlock is necessary to balance the
@@ -1079,12 +1078,8 @@ apply_handle_stream_stop(StringInfo s)
 	/* We must be in a valid transaction state */
 	Assert(IsTransactionState());
 
-	/* The synchronization worker runs in single transaction. */
-	if (!am_tablesync_worker())
-	{
-		/* Commit the per-stream transaction */
-		CommitTransactionCommand();
-	}
+	/* Commit the per-stream transaction */
+	CommitTransactionCommand();
 
 	in_streamed_transaction = false;
 
@@ -1161,9 +1156,7 @@ apply_handle_stream_abort(StringInfo s)
 			/* Cleanup the subxact info */
 			cleanup_subxact_info();
 
-			/* The synchronization worker runs in single transaction */
-			if (!am_tablesync_worker())
-				CommitTransactionCommand();
+			CommitTransactionCommand();
 			return;
 		}
 
@@ -1190,8 +1183,7 @@ apply_handle_stream_abort(StringInfo s)
 		/* write the updated subxact list */
 		subxact_info_write(MyLogicalRepWorker->subid, xid);
 
-		if (!am_tablesync_worker())
-			CommitTransactionCommand();
+		CommitTransactionCommand();
 	}
 }
 
@@ -1350,8 +1342,7 @@ apply_handle_stream_commit(StringInfo s)
 static void
 apply_handle_commit_internal(StringInfo s, LogicalRepCommitData* commit_data)
 {
-	/* The synchronization worker runs in single transaction. */
-	if (IsTransactionState() && !am_tablesync_worker())
+	if (IsTransactionState())
 	{
 		/*
 		 * Update origin state so we can restart streaming from correct
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index acc2926..e9f2b3f 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -61,6 +61,7 @@ DECLARE_UNIQUE_INDEX(pg_subscription_rel_srrelid_srsubid_index, 6117, on pg_subs
 #define SUBREL_STATE_INIT		'i' /* initializing (sublsn NULL) */
 #define SUBREL_STATE_DATASYNC	'd' /* data is being synchronized (sublsn
 									 * NULL) */
+#define SUBREL_STATE_COPYDONE	'C' /* tablesync copy phase is completed */
 #define SUBREL_STATE_SYNCDONE	's' /* synchronization finished in front of
 									 * apply (sublsn set) */
 #define SUBREL_STATE_READY		'r' /* ready (sublsn set) */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 63bab69..366a737 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -211,6 +211,7 @@ extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+extern char *ReplicationSlotNameForTablesync(char *subslotname, Oid suboid, Oid relid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
1.8.3.1

