Hi,

I have done some review of subscription handling (well, self-review), and
here is the result (it's a slightly improved version of the patches from
another thread [1]).

I split it into several patches:

0001 - Makes the subscription worker exit nicely when the subscription is
missing (i.e., it was removed while the worker was starting) and also makes
the "disabled" message look the same as the message emitted when the disabled
state is detected while the worker is running. This is mostly just nicer
behavior for the user (i.e., no unexpected errors in the log when you have
just disabled the subscription).

0002 - This is a bugfix: the sync worker should exit when it is waiting for
apply and apply dies; otherwise there is a possibility of the table not being
correctly synchronized.

0003 - Splits the schizophrenic SetSubscriptionRelState function into
two functions which don't try to do a broken upsert and instead report
proper errors.

0004 - Solves the issue which Masahiko Sawada reported [2] about ALTER
SUBSCRIPTION's handling of workers not being transactional - we now kill
workers transactionally (and we do the same, when possible, in
DROP SUBSCRIPTION).

0005 - Bugfix to allow syscache access to subscriptions without being
connected to a database - this should work since subscription is a pinned
catalog; it wasn't caught because nothing in core is using it (yet).

0006 - Makes the locking of subscriptions more granular (this depends on
0005). This removes the ugly AccessExclusiveLock on the pg_subscription
catalog from DROP SUBSCRIPTION and also solves some potential race
conditions between the launcher, ALTER SUBSCRIPTION and
process_syncing_tables_for_apply(). It also simplifies memory handling in
the launcher as well as in the logicalrep_worker_stop() function. This
basically makes subscriptions behave like every other object in the
database in terms of locking.

Only 0002, 0004 and 0005 are actual bug fixes, but I'd still like to
get all of it into PG10, especially because the locking currently behaves
very differently from everything else, and that does not seem like a good idea.

[1]
https://www.postgresql.org/message-id/flat/3e06c16c-f441-c606-985c-6d6239097...@2ndquadrant.com
[2]
https://www.postgresql.org/message-id/flat/cad21aobd4t2rwtiwd8yfxd+q+m9swx50yjqt5ibj2mzebvn...@mail.gmail.com

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 4c1ef64493ee930dfde3aa787c454a5d68ac3efd Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 6 Jul 2017 23:42:56 +0200
Subject: [PATCH 1/6] Improve messaging during logical replication worker
 startup

---
 src/backend/replication/logical/worker.c | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0d48dfa..2e4099c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1527,24 +1527,31 @@ ApplyWorkerMain(Datum main_arg)
 										 ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	if (!MySubscription)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription %u will "
+						"stop because the subscription was removed",
+						MyLogicalRepWorker->subid)));
+		proc_exit(0);
+	}
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
 
-	/* Setup synchronous commit according to the user's wishes */
-	SetConfigOption("synchronous_commit", MySubscription->synccommit,
-					PGC_BACKEND, PGC_S_OVERRIDE);
-
 	if (!MySubscription->enabled)
 	{
 		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will not "
-						"start because the subscription was disabled during startup",
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
 						MySubscription->name)));
-
 		proc_exit(0);
 	}
 
+	/* Setup synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit", MySubscription->synccommit,
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
 	/* Keep us informed about subscription changes. */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
-- 
2.7.4

From b5d4d9a130658859bcf6e21ca3bed131dbdddb57 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Fri, 7 Jul 2017 00:04:43 +0200
Subject: [PATCH 2/6] Exit in sync worker if relation was removed during
 startup

---
 src/backend/replication/logical/tablesync.c | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 32abf5b..9fbdd8c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -824,6 +824,20 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 									   MyLogicalRepWorker->relid,
 									   &relstate_lsn, true);
+	/*
+	 * The relation is not locked during startup of the sync worker, so it's
+	 * possible that it has been removed in the meantime.  Exit gracefully in
+	 * that case as it's a perfectly normal scenario.
+	 */
+	if (relstate == SUBREL_STATE_UNKNOWN)
+	{
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", "
+						"table \"%s\" will stop because the table is no longer subscribed",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid))));
+		proc_exit(0);
+	}
 	CommitTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-- 
2.7.4

From c676c0693cb4e30f0315eebe7424ae43e88c9cb2 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 6 Jul 2017 23:53:18 +0200
Subject: [PATCH 3/6] Split the SetSubscriptionRelState function into two

Removes the ugly code split and broken upsert logic, and improves error reporting.
---
 src/backend/catalog/pg_subscription.c       | 133 ++++++++++++++++------------
 src/backend/commands/subscriptioncmds.c     |  10 +--
 src/backend/replication/logical/tablesync.c |  34 ++++---
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 4 files changed, 99 insertions(+), 84 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index fb53d71..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -227,24 +227,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
@@ -256,57 +247,81 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -381,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -404,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		CatalogTupleDelete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6dc3f6e..8d144ab 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+			AddSubscriptionRelState(sub->oid, relid,
+						  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 9fbdd8c..2f6c7b4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					StartTransactionCommand();
 					started_tx = true;
 				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -884,11 +884,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
-										MyLogicalRepWorker->relstate,
-										MyLogicalRepWorker->relstate_lsn,
-										true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   MyLogicalRepWorker->relstate,
+										   MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -973,11 +972,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
 					 */
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											MyLogicalRepWorker->relid,
-											SUBREL_STATE_SYNCDONE,
-											*origin_startpos,
-											true);
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   SUBREL_STATE_SYNCDONE,
+											   *origin_startpos);
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
-- 
2.7.4

From 59debb27c5aefedb8add2925e983815a5e0b9bd6 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 6 Jul 2017 23:57:05 +0200
Subject: [PATCH 4/6] Only kill sync workers at commit time in SUBSCRIPTION DDL

---
 src/backend/access/transam/xact.c          |  9 +++++
 src/backend/commands/subscriptioncmds.c    | 26 +++++++++++--
 src/backend/replication/logical/launcher.c | 60 +++++++++++++++++++++++++++++-
 src/include/replication/logicallauncher.h  |  1 +
 src/include/replication/worker_internal.h  |  1 +
 5 files changed, 91 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b0aa69f..322502d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot PREPARE a transaction that has exported snapshots")));
 
+	/*
+	 * Similar to above, don't allow PREPARE for transactions that kill
+	 * logical replication workers.
+	 */
+	if (XactManipulatesLogicalReplicationWorkers())
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
+
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 8d144ab..f25a79f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
 			namespace = get_namespace_name(get_rel_namespace(relid));
 			ereport(NOTICE,
@@ -909,15 +909,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * If we are dropping slot, stop all the subscription workers immediately
+	 * so that the slot is accessible, otherwise just schedule the stop at the
+	 * end of the transaction.
+	 *
+	 * New workers won't be started because we hold exclusive lock on the
+	 * subscription till the end of transaction.
+	 */
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (sub->slotname)
+			logicalrep_worker_stop(w->subid, w->relid);
+		else
+			logicalrep_worker_stop_at_commit(w->subid, w->relid);
+	}
+	list_free(subworkers);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index d165d51..caf4844 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+	Oid	subid;
+	Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -514,6 +522,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
+ * Request worker for specified sub/rel to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+	LogicalRepWorkerId *wid;
+	MemoryContext		oldctx;
+
+	/* Make sure we store the info in context which survives until commit. */
+	oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+	wid = palloc(sizeof(LogicalRepWorkerId));
+	wid->subid = subid;
+	wid->relid = relid;
+
+	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
  * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
@@ -754,14 +783,41 @@ ApplyLauncherShmemInit(void)
 }
 
 /*
+ * XactManipulatesLogicalReplicationWorkers
+ *		Check whether current transaction has manipulated logical replication
+ *		workers.
+ */
+bool
+XactManipulatesLogicalReplicationWorkers(void)
+{
+	return (on_commit_stop_workers != NIL);
+}
+
+/*
  * Wakeup the launcher on commit if requested.
  */
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-	if (isCommit && on_commit_launcher_wakeup)
-		ApplyLauncherWakeup();
+	ListCell *lc;
 
+	if (isCommit)
+	{
+		foreach (lc, on_commit_stop_workers)
+		{
+			LogicalRepWorkerId *wid = lfirst(lc);
+			logicalrep_worker_stop(wid->subid, wid->relid);
+		}
+
+		if (on_commit_launcher_wakeup)
+			ApplyLauncherWakeup();
+	}
+
+	/*
+	 * No need to pfree on_commit_stop_workers, it's been allocated in
+	 * transaction memory context which is going to be cleaned soon.
+	 */
+	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index aac7d32..78016c4 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
 extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern bool XactManipulatesLogicalReplicationWorkers(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..402b166 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -74,6 +74,7 @@ extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 						 Oid userid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-- 
2.7.4

From 6d32797e05ccacf627241e60b78661878c814806 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Fri, 7 Jul 2017 00:02:21 +0200
Subject: [PATCH 5/6] Allow syscache access to subscriptions in database-less
 processes

---
 src/backend/utils/cache/catcache.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
 		case AUTHNAME:
 		case AUTHOID:
 		case AUTHMEMMEMROLE:
+		case SUBSCRIPTIONOID:
+		case SUBSCRIPTIONNAME:
 
 			/*
-			 * Protect authentication lookups occurring before relcache has
-			 * collected entries for shared indexes.
+			 * Protect authentication and subscription lookups occurring
+			 * before relcache has collected entries for shared indexes.
 			 */
 			if (!criticalSharedRelcachesBuilt)
 				return false;
-- 
2.7.4

From c05006a7cb1be1c1949bf0413e5ca363177ece02 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Fri, 7 Jul 2017 16:27:17 +0200
Subject: [PATCH 6/6] Improve locking for subscriptions and subscribed
 relations

Remove the exclusive lock on the catalog that DROP SUBSCRIPTION was
using and use more granular locking of individual subscriptions instead.

This should make overall behavior of the subscriptions and their workers
more sane in terms of concurrency and race conditions.
---
 src/backend/commands/subscriptioncmds.c     |  59 +++----
 src/backend/replication/logical/launcher.c  | 253 +++++++++++++---------------
 src/backend/replication/logical/tablesync.c |  63 ++++---
 src/include/replication/worker_internal.h   |   5 +-
 4 files changed, 172 insertions(+), 208 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f25a79f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -636,12 +636,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
 	/* Lock the subscription so nobody else can do anything with it. */
@@ -814,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
+	List	   *subworkers;
+	ListCell   *lc;
 	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
 	RepOriginId originid;
+	char	   *err = NULL;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
@@ -829,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
 	 * launcher doesn't restart new worker during dropping the subscription
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -861,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	sub = GetSubscription(subid, false);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -897,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -946,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -959,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -979,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index caf4844..eea125b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,6 +42,7 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
@@ -94,33 +95,23 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
- * Load the list of subscriptions.
- *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Load the list of enabled subscription oids.
  */
 static List *
-get_subscription_list(void)
+get_subscription_oids(void)
 {
 	List	   *res = NIL;
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
 	 * We don't have a use for the snapshot itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -129,34 +120,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -258,23 +232,62 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Like logicalrep_worker_find(), but returns a list of all in-use workers
+ * for the subscription instead of just one. Needs LogicalRepWorkerLock.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+	int			i;
+	List	   *res = NIL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Collect in-use slots for subid; with only_running, also require proc. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid && (!only_running || w->proc))
+			res = lappend(res, w);
+	}
+	/* Elements point into LogicalRepCtx->workers: free with list_free(). */
+	return res;
+}
+
+/*
+ * Start new logical replication background worker.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
 	int			i;
 	int			slot = 0;
-	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
+	List	   *subworkers;
+	ListCell   *lc;
 	TimestampTz now;
+	int			nsyncworkers = 0;
+	Subscription *sub;
+	LogicalRepWorker *worker = NULL;
 
 	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+			(errmsg("starting logical replication worker for subscription %u",
+					subid)));
+
+	/* Block any concurrent DDL on the subscription. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/* Get info about subscription. */
+	sub = GetSubscription(subid, true);
+	if (!sub)
+	{
+		ereport(DEBUG1,
+				(errmsg("subscription %u not found, not starting worker for it",
+						subid)));
+		return;
+	}
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -302,7 +315,14 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (w->relid != InvalidOid)
+			nsyncworkers ++;
+	}
+	list_free(subworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -348,6 +368,7 @@ retry:
 	if (nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		return;
 	}
 
@@ -358,6 +379,7 @@ retry:
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of logical replication worker slots"),
@@ -370,8 +392,8 @@ retry:
 	worker->in_use = true;
 	worker->generation++;
 	worker->proc = NULL;
-	worker->dbid = dbid;
-	worker->userid = userid;
+	worker->dbid = sub->dbid;
+	worker->userid = sub->owner;
 	worker->subid = subid;
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -382,8 +404,6 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
-	LWLockRelease(LogicalRepWorkerLock);
-
 	/* Register the new dynamic worker. */
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -402,8 +422,13 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
+	/* Try to register the worker and cleanup in case of failure. */
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of background worker slots"),
@@ -411,13 +436,24 @@ retry:
 		return;
 	}
 
+	/* Done with the worker array. */
+	LWLockRelease(LogicalRepWorkerLock);
+
 	/* Now wait until it attaches. */
 	WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+	/*
+	 * Worker either started or died, in any case we are done with the
+	 * subscription.
+	 */
+	UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 }
 
 /*
  * Stop the logical replication worker for subid/relid, if any, and wait until
  * it detaches from the slot.
+ *
+ * Callers of this function must hold an exclusive lock on the subscription.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -425,7 +461,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 	uint16		generation;
 
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	/* Exclusive is needed for logicalrep_worker_cleanup(). */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
 	worker = logicalrep_worker_find(subid, relid, false);
 
@@ -436,56 +473,20 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/* If there is worker but it's not running, clean it up. */
+	if (!worker->proc)
+	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
 	/*
 	 * Remember which generation was our worker so we can check if what we see
 	 * is still the same one.
 	 */
 	generation = worker->generation;
 
-	/*
-	 * If we found a worker but it does not have proc set then it is still
-	 * starting up; wait for it to finish starting and then kill it.
-	 */
-	while (worker->in_use && !worker->proc)
-	{
-		int			rc;
-
-		LWLockRelease(LogicalRepWorkerLock);
-
-		/* Wait a bit --- we don't expect to have to wait long. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   10L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-			proc_exit(1);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Recheck worker status. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/*
-		 * Check whether the worker slot is no longer used, which would mean
-		 * that the worker has exited, or whether the worker generation is
-		 * different, meaning that a different worker has taken the slot.
-		 */
-		if (!worker->in_use || worker->generation != generation)
-		{
-			LWLockRelease(LogicalRepWorkerLock);
-			return;
-		}
-
-		/* Worker has assigned proc, so it has started. */
-		if (worker->proc)
-			break;
-	}
-
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
 
@@ -515,6 +516,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		/*
+		 * Shared lock is enough for the loop as we don't need to do the slot
+		 * cleanup because at this point we know that the worker has attached
+		 * to the shmem and will clean the slot on detach automatically.
+		 */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 	}
 
@@ -682,30 +688,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
 }
 
 /*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
-	int			i;
-	int			res = 0;
-
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
-	return res;
-}
-
-/*
  * ApplyLauncherShmemSize
  *		Compute space needed for replication launcher shared memory
  */
@@ -875,8 +857,6 @@ ApplyLauncherMain(Datum main_arg)
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -888,41 +868,38 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
-			sublist = get_subscription_list();
-
-			/* Start the missing workers for enabled subscriptions. */
+			/*
+			 * Start new transaction so that we can take locks and snapshots.
+			 *
+			 * Any allocations will also be made inside the transaction memory
+			 * context.
+			 */
+			StartTransactionCommand();
+
+			/* Search for subscriptions to start. */
+			sublist = get_subscription_oids();
+
+			/* Start the missing workers. */
 			foreach(lc, sublist)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
+				Oid	subid = lfirst_oid(lc);
 				LogicalRepWorker *w;
 
 				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+				w = logicalrep_worker_find(subid, InvalidOid, false);
 				LWLockRelease(LogicalRepWorkerLock);
 
-				if (sub->enabled && w == NULL)
+				if (w == NULL)
 				{
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+					/* Start the worker. */
+					logicalrep_worker_launch(subid, InvalidOid);
 				}
 			}
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			CommitTransactionCommand();
 		}
 		else
 		{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 2f6c7b4..d49d186 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -102,12 +102,13 @@
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
-#include "utils/snapmgr.h"
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 
 static bool table_states_valid = false;
 
@@ -211,10 +212,8 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  * worker to the expected one.
  *
  * Used when transitioning from SYNCWAIT state to CATCHUP.
- *
- * Returns false if the apply worker has disappeared.
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -230,7 +229,7 @@ wait_for_worker_state_change(char expected_state)
 		 * enough to not give a misleading answer if we do it with no lock.)
 		 */
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		/*
 		 * Bail out if the apply worker has died, else signal it we're
@@ -243,7 +242,10 @@ wait_for_worker_state_change(char expected_state)
 			logicalrep_worker_wakeup_ptr(worker);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			break;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		/*
 		 * Wait.  We expect to get a latch signal back from the apply worker,
@@ -260,8 +262,6 @@ wait_for_worker_state_change(char expected_state)
 		if (rc & WL_LATCH_SET)
 			ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -346,6 +346,15 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 
 	Assert(!IsTransactionState());
 
+	/* Start a transaction and lock the subscription, unless already done. */
+#define ensure_transaction_and_lock() \
+	do { if (!started_tx) { \
+		StartTransactionCommand(); \
+		LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, \
+						 AccessShareLock); \
+		started_tx = true; \
+	} } while (0)
+
 	/* We need up-to-date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
@@ -358,8 +367,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction_and_lock();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -421,11 +429,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
+
+				ensure_transaction_and_lock();
 
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
@@ -476,12 +481,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					 * Enter busy loop and wait for synchronization worker to
 					 * reach expected state (or die trying).
 					 */
-					if (!started_tx)
-					{
-						StartTransactionCommand();
-						started_tx = true;
-					}
-
+					ensure_transaction_and_lock();
 					wait_for_relation_state_change(rstate->relid,
 												   SUBREL_STATE_SYNCDONE);
 				}
@@ -495,8 +495,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * running sync workers for this subscription, while we have
 				 * the lock.
 				 */
-				int			nsyncworkers =
-				logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				List	   *subworkers;
+				ListCell   *lc;
+				int			nsyncworkers = 0;
+
+				subworkers = logicalrep_sub_workers_find(MySubscription->oid,
+														 false);
+				foreach (lc, subworkers)
+				{
+					LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+					if (w->relid != InvalidOid)
+						nsyncworkers ++;
+				}
+				list_free(subworkers);
 
 				/* Now safe to release the LWLock */
 				LWLockRelease(LogicalRepWorkerLock);
@@ -518,10 +529,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						TimestampDifferenceExceeds(hentry->last_start_time, now,
 												   wal_retrieve_retry_interval))
 					{
-						logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-												 MySubscription->oid,
-												 MySubscription->name,
-												 MyLogicalRepWorker->userid,
+						ensure_transaction_and_lock();
+						logicalrep_worker_launch(MySubscription->oid,
 												 rstate->relid);
 						hentry->last_start_time = now;
 					}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 402b166..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,14 +71,13 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 					   bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-						 Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-extern int	logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
 
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to