From 6fa2654b1f3c3a1ba963db4e385afcc8c9fa47d6 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Thu, 14 Jan 2021 15:55:23 +1100
Subject: [PATCH v15] Tablesync extra logging.

This patch only adds some extra logging which may be helpful for testing, but is not for committing.
---
 src/backend/commands/subscriptioncmds.c     | 29 ++++++++++++++++++----
 src/backend/replication/logical/tablesync.c | 37 +++++++++++++++++++++++++----
 2 files changed, 56 insertions(+), 10 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f94243b..b5f9d56 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -665,11 +665,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				XLogRecPtr	statelsn;
 
 				/* Immediately stop the worker. */
+				elog(LOG,
+					 "!!>> AlterSubscription_refresh: before logicalrep_worker_stop");
 				logicalrep_worker_stop_at_commit(subid, relid); /* prevent re-launching */
 				logicalrep_worker_stop(subid, relid); /* stop immediately */
+				elog(LOG,
+					 "!!>> AlterSubscription_refresh: after logicalrep_worker_stop");
 
 				/* Last known rel state. */
 				state = GetSubscriptionRelState(subid, relid, &statelsn);
+				elog(LOG,
+					 "!!>> AlterSubscription_refresh: relid %u had state %c",
+					 relid, state);
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
@@ -692,10 +699,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 					ReplicationSlotNameForTablesync(subid, relid, syncslotname);
 
-					elog(DEBUG1,
+					elog(LOG,
 						 "AlterSubscription_refresh: dropping the tablesync slot \"%s\".",
 						 syncslotname);
 					ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok);
+					elog(LOG,
+						 "!!>> AlterSubscription_refresh: dropped the tablesync slot \"%s\".",
+						 syncslotname);
 				}
 
 				/* Remove the tablesync's origin tracking if exists. */
@@ -703,13 +713,16 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				originid = replorigin_by_name(originname, true);
 				if (OidIsValid(originid))
 				{
-					elog(DEBUG1,
+					elog(LOG,
 						 "AlterSubscription_refresh: dropping origin tracking for \"%s\"",
 						 originname);
 					replorigin_drop(originid, false);
+					elog(LOG,
+						 "!!>> AlterSubscription_refresh: dropped origin tracking for \"%s\"",
+						 originname);
 				}
 
-				ereport(DEBUG1,
+				ereport(LOG,
 						(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
 								get_namespace_name(get_rel_namespace(relid)),
 								get_rel_name(relid),
@@ -1191,10 +1204,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 				}
 				else
 				{
-					elog(DEBUG1,
+					elog(LOG,
 						 "DropSubscription: dropping the tablesync slot \"%s\".",
 						 syncslotname);
 					ReplicationSlotDropAtPubNode(wrconn, syncslotname, missing_ok);
+					elog(LOG,
+						 "!!>> DropSubscription: dropped the tablesync slot \"%s\".",
+						 syncslotname);
 				}
 			}
 
@@ -1203,10 +1219,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 			originid = replorigin_by_name(originname, true);
 			if (originid != InvalidRepOriginId)
 			{
-				elog(DEBUG1,
+				elog(LOG,
 					 "DropSubscription: dropping origin tracking for \"%s\"",
 					 originname);
 				replorigin_drop(originid, false);
+				elog(LOG,
+					 "!!>> DropSubscription: dropped origin tracking for \"%s\"",
+					 originname);
 			}
 		}
 		list_free(rstates);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 33e11a1..80750ad 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -300,10 +300,13 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 										MyLogicalRepWorker->relid,
 										syncslotname);
 
-		elog(DEBUG1,
+		elog(LOG,
 			 "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".",
 			 syncslotname);
 		ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+		elog(LOG,
+			 "!!>> process_syncing_tables_for_sync: dropped the tablesync slot \"%s\".",
+			 syncslotname);
 
 		/*
 		 * Change state to SYNCDONE.
@@ -469,10 +472,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					originid = replorigin_by_name(originname, true);
 					if (OidIsValid(originid))
 					{
-						elog(DEBUG1,
+						elog(LOG,
 							 "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".",
 							 originname);
 						replorigin_drop(originid, false);
+						elog(LOG,
+							 "!!>> process_syncing_tables_for_apply: dropped tablesync origin tracking for \"%s\".",
+							 originname);
 					}
 				}
 
@@ -971,7 +977,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * The COPY phase was previously done, but tablesync then crashed/etc
 		 * before it was able to finish normally.
 		 */
-		elog(DEBUG1,
+		elog(LOG,
 			 "LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_FINISHEDCOPY.");
 		StartTransactionCommand();
 
@@ -979,8 +985,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * The origin tracking name must already exist (missing_ok=false).
 		 */
 		originid = replorigin_by_name(originname, false);
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".",
+			 originname);
 		replorigin_session_setup(originid);
 		replorigin_session_origin = originid;
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".",
+			 originname);
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		goto copy_table_done;
@@ -1029,6 +1041,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
+	elog(LOG,
+		 "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".",
+		 slotname);
 	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
@@ -1060,10 +1075,13 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * If something failed during copy table then cleanup the created
 		 * slot.
 		 */
-		elog(DEBUG1,
+		elog(LOG,
 			 "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".",
 			 slotname);
 		ReplicationSlotDropAtPubNode(wrconn, slotname, false);
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".",
+			 slotname);
 
 		pfree(slotname);
 		slotname = NULL;
@@ -1084,11 +1102,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * replication origin from vanishing while advancing.
 		 */
 		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".",
+			 originname);
 		originid = replorigin_create(originname);
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".",
+			 originname);
 		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
 						   true /* go backward */ , true /* WAL log */ );
 		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
+		elog(LOG,
+			 "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".",
+			 originname);
 		replorigin_session_setup(originid);
 		replorigin_session_origin = originid;
 	}
@@ -1111,7 +1138,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 copy_table_done:
 
-	elog(DEBUG1,
+	elog(LOG,
 		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
 		 originname,
 		 (uint32) (*origin_startpos >> 32),
-- 
1.8.3.1

