From 7dd4e8786314c6f98f9363411e2ff693c6aaad02 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 8 Jan 2021 20:04:23 +1100
Subject: [PATCH v13] Tablesync extra logging.

This patch only adds some extra logging which may be helpful for testing, but is not for committing.
---
 src/backend/commands/subscriptioncmds.c     | 17 ++++++++++++-----
 src/backend/replication/logical/tablesync.c | 19 ++++++++++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 51f5e40..dec1ae5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -652,7 +652,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		{
 			Oid			subid = sub->oid;
 
+			elog(LOG, "!!>> AlterSubscription_refresh: before logicalrep_worker_stop_at_commit");
 			logicalrep_worker_stop_at_commit(subid, relid);
+			elog(LOG, "!!>> AlterSubscription_refresh: after logicalrep_worker_stop_at_commit");
 
 			/*
 			 * Cleanup any remaining tablesync resources.
@@ -665,6 +667,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 				/* Last known rel state. */
 				state = GetSubscriptionRelState(subid, relid, &statelsn);
+				elog(LOG, "!!>> AlterSubscription_refresh: relid %u had state %c", relid, state);
 
 				RemoveSubscriptionRel(sub->oid, relid);
 
@@ -703,8 +706,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 							ereport(ERROR,
 									(errmsg("could not connect to the publisher: %s", err)));
 
-						elog(DEBUG1, "AlterSubscription_refresh: dropping the tablesync slot \"%s\".", syncslotname);
+						elog(LOG, "AlterSubscription_refresh: dropping the tablesync slot \"%s\".", syncslotname);
 						ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+						elog(LOG, "!!>> AlterSubscription_refresh: dropped the tablesync slot \"%s\".", syncslotname);
 					}
 					PG_FINALLY();
 					{
@@ -721,12 +725,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 				originid = replorigin_by_name(originname, true);
 				if (OidIsValid(originid))
 				{
-					elog(DEBUG1, "AlterSubscription_refresh: dropping origin tracking for \"%s\"", originname);
+					elog(LOG, "AlterSubscription_refresh: dropping origin tracking for \"%s\"", originname);
 					replorigin_drop(originid, false);
+					elog(LOG, "!!>> AlterSubscription_refresh: dropped origin tracking for \"%s\"", originname);
 				}
 			}
 
-			ereport(DEBUG1,
+			ereport(LOG,
 					(errmsg("table \"%s.%s\" removed from subscription \"%s\"",
 							get_namespace_name(get_rel_namespace(relid)),
 							get_rel_name(relid),
@@ -1196,8 +1201,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 					}
 					else
 					{
-						elog(DEBUG1, "DropSubscription: dropping the tablesync slot \"%s\".", syncslotname);
+						elog(LOG, "DropSubscription: dropping the tablesync slot \"%s\".", syncslotname);
 						ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+						elog(LOG, "!!>> DropSubscription: dropped the tablesync slot \"%s\".", syncslotname);
 					}
 				}
 				PG_FINALLY();
@@ -1212,8 +1218,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 			originid = replorigin_by_name(originname, true);
 			if (originid != InvalidRepOriginId)
 			{
-				elog(DEBUG1, "DropSubscription: dropping origin tracking for \"%s\"", originname);
+				elog(LOG, "DropSubscription: dropping origin tracking for \"%s\"", originname);
 				replorigin_drop(originid, false);
+				elog(LOG, "!!>> DropSubscription: dropped origin tracking for \"%s\"", originname);
 			}
 		}
 		list_free(rstates);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d5d0840..4d9d3fa 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -299,8 +299,9 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 													   MyLogicalRepWorker->relid);
 		PG_TRY();
 		{
-			elog(DEBUG1, "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".", syncslotname);
+			elog(LOG, "process_syncing_tables_for_sync: dropping the tablesync slot \"%s\".", syncslotname);
 			ReplicationSlotDropAtPubNode(wrconn, syncslotname);
+			elog(LOG, "!!>> process_syncing_tables_for_sync: dropped the tablesync slot \"%s\".", syncslotname);
 		}
 		PG_FINALLY();
 		{
@@ -471,8 +472,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					originid = replorigin_by_name(originname, true);
 					if (OidIsValid(originid))
 					{
-						elog(DEBUG1, "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", originname);
+						elog(LOG, "process_syncing_tables_for_apply: dropping tablesync origin tracking for \"%s\".", originname);
 						replorigin_drop(originid, false);
+						elog(LOG, "!!>> process_syncing_tables_for_apply: dropped tablesync origin tracking for \"%s\".", originname);
 					}
 				}
 
@@ -971,15 +973,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * The COPY phase was previously done, but tablesync then crashed/etc
 		 * before it was able to finish normally.
 		 */
-		elog(DEBUG1, "LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_TCOPYDONE.");
+		elog(LOG, "LogicalRepSyncTableStart: tablesync relstate was SUBREL_STATE_TCOPYDONE.");
 		StartTransactionCommand();
 
 		/*
 		 * The origin tracking name must already exist (missing_ok=false).
 		 */
 		originid = replorigin_by_name(originname, false);
+		elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_setup \"%s\".", originname);
 		replorigin_session_setup(originid);
 		replorigin_session_origin = originid;
+		elog(LOG, "!!>> LogicalRepSyncTableStart: 2 replorigin_session_get_progress \"%s\".", originname);
 		*origin_startpos = replorigin_session_get_progress(false);
 
 		goto copy_table_done;
@@ -1028,6 +1032,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * for the catchup phase after COPY is done, so tell it to use the
 	 * snapshot to make the final data consistent.
 	 */
+	elog(LOG, "!!>> LogicalRepSyncTableStart: walrcv_create_slot for \"%s\".", slotname);
 	walrcv_create_slot(wrconn, slotname, false,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
@@ -1059,8 +1064,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * If something failed during copy table then cleanup the created
 		 * slot.
 		 */
-		elog(DEBUG1, "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname);
+		elog(LOG, "LogicalRepSyncTableStart: tablesync copy failed. Dropping the tablesync slot \"%s\".", slotname);
 		ReplicationSlotDropAtPubNode(wrconn, slotname);
+		elog(LOG, "!!>> LogicalRepSyncTableStart: tablesync copy failed. Dropped the tablesync slot \"%s\".", slotname);
 
 		pfree(slotname);
 		slotname = NULL;
@@ -1077,9 +1083,12 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		 * Origin tracking does not exist. Create it now, and advance to LSN
 		 * got from walrcv_create_slot.
 		 */
+		elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_create \"%s\".", originname);
 		originid = replorigin_create(originname);
+		elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_session_setup \"%s\".", originname);
 		replorigin_session_setup(originid);
 		replorigin_session_origin = originid;
+		elog(LOG, "!!>> LogicalRepSyncTableStart: 1 replorigin_advance \"%s\".", originname);
 		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
 						   true /* go backward */ , true /* WAL log */ );
 	}
@@ -1102,7 +1111,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 copy_table_done:
 
-	elog(DEBUG1, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+	elog(LOG, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
 		 originname,
 		 (uint32) (*origin_startpos >> 32),
 		 (uint32) *origin_startpos);
-- 
1.8.3.1

