(was away for a while, got some time now for this again)

On 22/06/17 04:43, Peter Eisentraut wrote:
> The alternative is that we use the LockSharedObject() approach that was
> already alluded to, like in the attached patch.  Both approaches would
> work equally fine AFAICT.

I agree, but I think we need bigger overhaul of the locking/management
in general. So here is patch which does much more changes.

The patch does several important things (in no particular order):
- Split SetSubscriptionRelState into AddSubscriptionRelState and
UpdateSubscriptionRelState for the reasons said upstream, it's cleaner,
there is no half-broken upsert logic and it has proper error checking
for each action.

- Do LockSharedObject in ALTER SUBSCRIPTION, DROP SUBSCRIPTION (this one
is preexisting but mentioning it for context), SetSubscriptionRelState,
AddSubscriptionRelState, and in the logicalrep_worker_launch. This means
we use granular per object locks to deal with concurrency.

- Because of above, the AccessExclusiveLock on pg_subscription is no
longer needed, just normal RowExlusiveLock is used now.

- logicalrep_worker_stop is also simplified due to the proper locking

- There is new interface logicalrep_worker_stop_at_commit which is used
by ALTER SUBSCRIPTION ... REFRESH PUBLICATION and by transactional
variant of DROP SUBSCRIPTION to only kill workers at the end of transaction.

- Locking/reading of subscription info is unified between DROP and ALTER
SUBSCRIPTION commands.

- DROP SUBSCRIPTION will kill all workers associated with subscription,
not just apply.

- The sync worker checks during startup if the relation is still subscribed.

- The sync worker will exit when waiting for apply and apply has shut-down.

- The log messages around workers and removed or disabled subscription
are now more consistent between startup and normal runtime of the worker.

- Some code deduplication and stylistic changes/simplification in
related areas.

- Fixed catcache's IndexScanOK() handling of the subscription catalog.

It's bit bigger patch but solves issues from multiple threads around
handling of ALTER/DROP subscription.

A lot of the locking that I added is normally done transparently by
dependency handling, but subscriptions and subscription relation status
do not use that much as it was deemed to bloat pg_depend needlessly
during the original patch review (it's also probably why this has
slipped through).

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From d7038474012769c9c3b50231af76dd7796fe593f Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Sat, 24 Jun 2017 19:38:21 +0200
Subject: [PATCH] Rework subscription worker and relation status handling

---
 src/backend/catalog/pg_subscription.c       | 137 +++++++------
 src/backend/commands/subscriptioncmds.c     |  98 +++++-----
 src/backend/replication/logical/launcher.c  | 293 +++++++++++++++-------------
 src/backend/replication/logical/tablesync.c |  97 +++++----
 src/backend/replication/logical/worker.c    |  23 ++-
 src/backend/utils/cache/catcache.c          |   6 +-
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 src/include/replication/worker_internal.h   |   6 +-
 8 files changed, 367 insertions(+), 299 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c69c461..b643e54 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -28,6 +28,8 @@
 
 #include "nodes/makefuncs.h"
 
+#include "storage/lmgr.h"
+
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -225,84 +227,101 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -377,6 +396,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -400,9 +421,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		CatalogTupleDelete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9cbd36f..3dc1f4c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
-									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+			AddSubscriptionRelState(sub->oid, relid,
+						  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 			RemoveSubscriptionRel(sub->oid, relid);
 
-			logicalrep_worker_stop(sub->oid, relid);
+			logicalrep_worker_stop_at_commit(sub->oid, relid);
 
 			namespace = get_namespace_name(get_rel_namespace(relid));
 			ereport(NOTICE,
@@ -636,14 +636,18 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
+	subid = HeapTupleGetOid(tup);
+
 	/* must be owner */
-	if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+	if (!pg_subscription_ownercheck(subid, GetUserId()))
 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
 					   stmt->subname);
 
-	subid = HeapTupleGetOid(tup);
 	sub = GetSubscription(subid, false);
 
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
 	/* Form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -811,14 +815,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
-	Datum		datum;
-	bool		isnull;
-	char	   *subname;
-	char	   *conninfo;
-	char	   *slotname;
+	List	   *subworkers;
+	ListCell   *lc;
 	char		originname[NAMEDATALEN];
-	char	   *err = NULL;
 	RepOriginId originid;
+	char	   *err = NULL;
+	Subscription *sub;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 
@@ -826,7 +828,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Lock pg_subscription with AccessExclusiveLock to ensure that the
 	 * launcher doesn't restart new worker during dropping the subscription
 	 */
-	rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
+	rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
 
 	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
 						  CStringGetDatum(stmt->subname));
@@ -858,31 +860,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	sub = GetSubscription(subid, false);
 
-	/* Get subname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subname, &isnull);
-	Assert(!isnull);
-	subname = pstrdup(NameStr(*DatumGetName(datum)));
-
-	/* Get conninfo */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subconninfo, &isnull);
-	Assert(!isnull);
-	conninfo = TextDatumGetCString(datum);
-
-	/* Get slotname */
-	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
-							Anum_pg_subscription_subslotname, &isnull);
-	if (!isnull)
-		slotname = pstrdup(NameStr(*DatumGetName(datum)));
-	else
-		slotname = NULL;
+	/* Lock the subscription so nobody else can do anything with it. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
 
 	/*
 	 * Since dropping a replication slot is not transactional, the replication
@@ -894,7 +875,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * of a subscription that is associated with a replication slot", but we
 	 * don't have the proper facilities for that.
 	 */
-	if (slotname)
+	if (sub->slotname)
 		PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
 
 
@@ -906,15 +887,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/*
+	 * If we are dropping slot, stop all the subscription workers immediately
+	 * so that the slot is accessible, otherwise just shedule the stop at the
+	 * end of the transaction.
+	 *
+	 * New workers won't be started because we hold exclusive lock on the
+	 * subscription till the end of transaction.
+	 */
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (sub->slotname)
+			logicalrep_worker_stop(w->subid, w->relid);
+		else
+			logicalrep_worker_stop_at_commit(w->subid, w->relid);
+	}
+	list_free(subworkers);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
@@ -925,7 +924,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * If there is no slot associated with the subscription, we can finish
 	 * here.
 	 */
-	if (!slotname)
+	if (!sub->slotname)
 	{
 		heap_close(rel, NoLock);
 		return;
@@ -938,13 +937,14 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	load_file("libpqwalreceiver", false);
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
+	appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s",
+					 quote_identifier(sub->slotname));
 
-	wrconn = walrcv_connect(conninfo, true, subname, &err);
+	wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
 	if (wrconn == NULL)
 		ereport(ERROR,
 				(errmsg("could not connect to publisher when attempting to "
-						"drop the replication slot \"%s\"", slotname),
+						"drop the replication slot \"%s\"", sub->slotname),
 				 errdetail("The error was: %s", err),
 				 errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
 						 "to disassociate the subscription from the slot.")));
@@ -958,12 +958,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		if (res->status != WALRCV_OK_COMMAND)
 			ereport(ERROR,
 					(errmsg("could not drop the replication slot \"%s\" on publisher",
-							slotname),
+							sub->slotname),
 					 errdetail("The error was: %s", res->err)));
 		else
 			ereport(NOTICE,
 					(errmsg("dropped replication slot \"%s\" on publisher",
-							slotname)));
+							sub->slotname)));
 
 		walrcv_clear_result(res);
 	}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14..410ad18 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -42,12 +42,14 @@
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
 
 #include "tcop/tcopprot.h"
 
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
@@ -73,6 +75,14 @@ typedef struct LogicalRepCtxStruct
 
 LogicalRepCtxStruct *LogicalRepCtx;
 
+typedef struct LogicalRepWorkerId
+{
+	Oid	subid;
+	Oid relid;
+} LogicalRepWorkerId;
+
+static List *on_commit_stop_workers = NIL;
+
 static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
@@ -86,12 +96,11 @@ static bool on_commit_launcher_wakeup = false;
 
 Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
 
-
 /*
  * Load the list of subscriptions.
  *
- * Only the fields interesting for worker start/stop functions are filled for
- * each subscription.
+ * Only the fields interesting for worker start are filled for each
+ * subscription.
  */
 static List *
 get_subscription_list(void)
@@ -100,19 +109,13 @@ get_subscription_list(void)
 	Relation	rel;
 	HeapScanDesc scan;
 	HeapTuple	tup;
-	MemoryContext resultcxt;
-
-	/* This is the context that we will allocate our output data in */
-	resultcxt = CurrentMemoryContext;
 
 	/*
-	 * Start a transaction so we can access pg_database, and get a snapshot.
 	 * We don't have a use for the snapshot itself, but we're interested in
 	 * the secondary effect that it sets RecentGlobalXmin.  (This is critical
 	 * for anything that reads heap pages, because HOT may decide to prune
 	 * them even if the process doesn't attempt to modify any tuples.)
 	 */
-	StartTransactionCommand();
 	(void) GetTransactionSnapshot();
 
 	rel = heap_open(SubscriptionRelationId, AccessShareLock);
@@ -121,34 +124,17 @@ get_subscription_list(void)
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
 	{
 		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
-		Subscription *sub;
-		MemoryContext oldcxt;
 
-		/*
-		 * Allocate our results in the caller's context, not the
-		 * transaction's. We do this inside the loop, and restore the original
-		 * context at the end, so that leaky things like heap_getnext() are
-		 * not called in a potentially long-lived context.
-		 */
-		oldcxt = MemoryContextSwitchTo(resultcxt);
-
-		sub = (Subscription *) palloc0(sizeof(Subscription));
-		sub->oid = HeapTupleGetOid(tup);
-		sub->dbid = subform->subdbid;
-		sub->owner = subform->subowner;
-		sub->enabled = subform->subenabled;
-		sub->name = pstrdup(NameStr(subform->subname));
-		/* We don't fill fields we are not interested in. */
-
-		res = lappend(res, sub);
-		MemoryContextSwitchTo(oldcxt);
+		/* We only care about enabled subscriptions. */
+		if (!subform->subenabled)
+			continue;
+
+		res = lappend_oid(res, HeapTupleGetOid(tup));
 	}
 
 	heap_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	CommitTransactionCommand();
-
 	return res;
 }
 
@@ -250,23 +236,68 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 }
 
 /*
- * Start new apply background worker.
+ * Similar as logicalrep_worker_find(), but returns list of all workers
+ * for the subscription instead just one.
+ */
+List *
+logicalrep_sub_workers_find(Oid subid, bool only_running)
+{
+	int			i;
+	List	   *res = NIL;
+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+	/* Search for attached worker for a given subscription id. */
+	for (i = 0; i < max_logical_replication_workers; i++)
+	{
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (w->in_use && w->subid == subid && (!only_running || w->proc))
+			res = lappend(res, w);
+	}
+
+	return res;
+}
+
+/*
+ * Start new logical replication background worker.
  */
 void
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
-						 Oid relid)
+logicalrep_worker_launch(Oid subid, Oid relid)
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
 	int			i;
 	int			slot = 0;
-	LogicalRepWorker *worker = NULL;
-	int			nsyncworkers;
+	List	   *subworkers;
+	ListCell   *lc;
 	TimestampTz now;
+	int			nsyncworkers = 0;
+	Subscription *sub;
+	LogicalRepWorker *worker = NULL;
 
 	ereport(DEBUG1,
-			(errmsg("starting logical replication worker for subscription \"%s\"",
-					subname)));
+			(errmsg("starting logical replication worker for subscription %u",
+					subid)));
+
+	/* Block any concurrent DDL on the subscription. */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
+	/*
+	 * Subscription might have been dropped in meantime, make sure our cache
+	 * is up to date.
+	 */
+	AcceptInvalidationMessages();
+
+	/* Get info about subscription. */
+	sub = GetSubscription(subid, true);
+	if (!sub)
+	{
+		ereport(DEBUG1,
+				(errmsg("subscription %u not found, not starting worker for it",
+						subid)));
+		return;
+	}
 
 	/* Report this after the initial starting message for consistency. */
 	if (max_replication_slots == 0)
@@ -294,7 +325,14 @@ retry:
 		}
 	}
 
-	nsyncworkers = logicalrep_sync_worker_count(subid);
+	subworkers = logicalrep_sub_workers_find(subid, false);
+	foreach (lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+		if (w->relid != InvalidOid)
+			nsyncworkers ++;
+	}
+	list_free(subworkers);
 
 	now = GetCurrentTimestamp();
 
@@ -340,6 +378,7 @@ retry:
 	if (nsyncworkers >= max_sync_workers_per_subscription)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		return;
 	}
 
@@ -350,6 +389,7 @@ retry:
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of logical replication worker slots"),
@@ -362,8 +402,8 @@ retry:
 	worker->in_use = true;
 	worker->generation++;
 	worker->proc = NULL;
-	worker->dbid = dbid;
-	worker->userid = userid;
+	worker->dbid = sub->dbid;
+	worker->userid = sub->owner;
 	worker->subid = subid;
 	worker->relid = relid;
 	worker->relstate = SUBREL_STATE_UNKNOWN;
@@ -374,8 +414,6 @@ retry:
 	worker->reply_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->reply_time);
 
-	LWLockRelease(LogicalRepWorkerLock);
-
 	/* Register the new dynamic worker. */
 	memset(&bgw, 0, sizeof(bgw));
 	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
@@ -394,8 +432,13 @@ retry:
 	bgw.bgw_notify_pid = MyProcPid;
 	bgw.bgw_main_arg = Int32GetDatum(slot);
 
+	/* Try to register the worker and cleanup in case of failure. */
 	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
 	{
+		logicalrep_worker_cleanup(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+
 		ereport(WARNING,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("out of background worker slots"),
@@ -403,13 +446,24 @@ retry:
 		return;
 	}
 
+	/* Done with the worker array. */
+	LWLockRelease(LogicalRepWorkerLock);
+
 	/* Now wait until it attaches. */
 	WaitForReplicationWorkerAttach(worker, bgw_handle);
+
+	/*
+	 * Worker either started or died, in any case we are done with the
+	 * subscription.
+	 */
+	UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 }
 
 /*
  * Stop the logical replication worker and wait until it detaches from the
  * slot.
+ *
+ * Callers of this function better have exclusive lock on the subscription.
  */
 void
 logicalrep_worker_stop(Oid subid, Oid relid)
@@ -417,7 +471,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 	uint16		generation;
 
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	/* Exclusive is needed for logicalrep_worker_cleanup(). */
+	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
 	worker = logicalrep_worker_find(subid, relid, false);
 
@@ -428,56 +483,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/* If there is worker but it's not running, clean it up. */
+	if (!worker->proc)
+		logicalrep_worker_cleanup(worker);
+
 	/*
 	 * Remember which generation was our worker so we can check if what we see
 	 * is still the same one.
 	 */
 	generation = worker->generation;
 
-	/*
-	 * If we found worker but it does not have proc set it is starting up,
-	 * wait for it to finish and then kill it.
-	 */
-	while (worker->in_use && !worker->proc)
-	{
-		int			rc;
-
-		LWLockRelease(LogicalRepWorkerLock);
-
-		/* Wait for signal. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   1000L, WAIT_EVENT_BGWORKER_STARTUP);
-
-		/* emergency bailout if postmaster has died */
-		if (rc & WL_POSTMASTER_DEATH)
-			proc_exit(1);
-
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/* Check worker status. */
-		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/*
-		 * Check whether the worker slot is no longer used, which would mean
-		 * that the worker has exited, or whether the worker generation is
-		 * different, meaning that a different worker has taken the slot.
-		 */
-		if (!worker->in_use || worker->generation != generation)
-		{
-			LWLockRelease(LogicalRepWorkerLock);
-			return;
-		}
-
-		/* Worker has assigned proc, so it has started. */
-		if (worker->proc)
-			break;
-	}
-
 	/* Now terminate the worker ... */
 	kill(worker->proc->pid, SIGTERM);
 	LWLockRelease(LogicalRepWorkerLock);
@@ -497,7 +512,10 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Wait for more work. */
+		/*
+		 * We need timeout because we generally don't get notified via latch
+		 * about the worker attach.
+		 */
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
@@ -515,6 +533,22 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
+ * Request worker to be stopped on commit.
+ */
+void
+logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
+{
+	LogicalRepWorkerId *wid;
+	wid = MemoryContextAlloc(TopTransactionContext,
+							 sizeof(LogicalRepWorkerId));
+	wid->subid = subid;
+	wid->relid = relid;
+
+	on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
+}
+
+
+/*
  * Wake up (using latch) the logical replication worker.
  */
 void
@@ -648,30 +682,6 @@ logicalrep_launcher_sighup(SIGNAL_ARGS)
 }
 
 /*
- * Count the number of registered (not necessarily running) sync workers
- * for a subscription.
- */
-int
-logicalrep_sync_worker_count(Oid subid)
-{
-	int			i;
-	int			res = 0;
-
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
-
-	/* Search for attached worker for a given subscription id. */
-	for (i = 0; i < max_logical_replication_workers; i++)
-	{
-		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-
-		if (w->subid == subid && OidIsValid(w->relid))
-			res++;
-	}
-
-	return res;
-}
-
-/*
  * ApplyLauncherShmemSize
  *		Compute space needed for replication launcher shared memory
  */
@@ -754,9 +764,25 @@ ApplyLauncherShmemInit(void)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-	if (isCommit && on_commit_launcher_wakeup)
-		ApplyLauncherWakeup();
+	ListCell *lc;
+
+	if (isCommit)
+	{
+		foreach (lc, on_commit_stop_workers)
+		{
+			LogicalRepWorkerId *wid = lfirst(lc);
+			logicalrep_worker_stop(wid->subid, wid->relid);
+		}
 
+		if (on_commit_launcher_wakeup)
+			ApplyLauncherWakeup();
+	}
+
+	/*
+	 * No need to pfree on_commit_stop_workers, it's been allocated in
+	 * transaction memory context which is going to be cleaned soon.
+	 */
+	on_commit_stop_workers = NIL;
 	on_commit_launcher_wakeup = false;
 }
 
@@ -814,8 +840,6 @@ ApplyLauncherMain(Datum main_arg)
 		int			rc;
 		List	   *sublist;
 		ListCell   *lc;
-		MemoryContext subctx;
-		MemoryContext oldctx;
 		TimestampTz now;
 		long		wait_time = DEFAULT_NAPTIME_PER_CYCLE;
 
@@ -827,41 +851,38 @@ ApplyLauncherMain(Datum main_arg)
 		if (TimestampDifferenceExceeds(last_start_time, now,
 									   wal_retrieve_retry_interval))
 		{
-			/* Use temporary context for the database list and worker info. */
-			subctx = AllocSetContextCreate(TopMemoryContext,
-										   "Logical Replication Launcher sublist",
-										   ALLOCSET_DEFAULT_MINSIZE,
-										   ALLOCSET_DEFAULT_INITSIZE,
-										   ALLOCSET_DEFAULT_MAXSIZE);
-			oldctx = MemoryContextSwitchTo(subctx);
-
-			/* search for subscriptions to start or stop. */
+			/*
+			 * Start new transaction so that we can take locks and snapshots.
+			 *
+			 * Any allocations will also be made inside the transaction memory
+			 * context.
+			 */
+			StartTransactionCommand();
+
+			/* Search for subscriptions to start. */
 			sublist = get_subscription_list();
 
-			/* Start the missing workers for enabled subscriptions. */
+			/* Start the missing workers. */
 			foreach(lc, sublist)
 			{
-				Subscription *sub = (Subscription *) lfirst(lc);
+				Oid	subid = lfirst_oid(lc);
 				LogicalRepWorker *w;
 
 				LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-				w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+				w = logicalrep_worker_find(subid, InvalidOid, false);
 				LWLockRelease(LogicalRepWorkerLock);
 
-				if (sub->enabled && w == NULL)
+				if (w == NULL)
 				{
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
 
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
+					/* Start the worker. */
+					logicalrep_worker_launch(subid, InvalidOid);
 				}
 			}
 
-			/* Switch back to original memory context. */
-			MemoryContextSwitchTo(oldctx);
-			/* Clean the temporary memory. */
-			MemoryContextDelete(subctx);
+			CommitTransactionCommand();
 		}
 		else
 		{
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ef12df..11f4977 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -215,7 +215,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
  * Returns false if the apply worker has disappeared or the table state has been
  * reset.
  */
-static bool
+static void
 wait_for_worker_state_change(char expected_state)
 {
 	int			rc;
@@ -232,10 +232,13 @@ wait_for_worker_state_change(char expected_state)
 										InvalidOid, false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			return false;
+			ereport(FATAL,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("terminating logical replication synchronization "
+							"worker due to subscription apply worker exit")));
 
 		if (MyLogicalRepWorker->relstate == expected_state)
-			return true;
+			return;
 
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -247,8 +250,6 @@ wait_for_worker_state_change(char expected_state)
 
 		ResetLatch(MyLatch);
 	}
-
-	return false;
 }
 
 /*
@@ -285,11 +286,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -332,6 +332,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 
+#define ensure_transaction() \
+	if (!started_tx) \
+	{\
+		StartTransactionCommand(); \
+		started_tx = true; \
+	}
+
 	Assert(!IsTransactionState());
 
 	/* We need up-to-date sync state info for subscription tables here. */
@@ -346,8 +353,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		list_free_deep(table_states);
 		table_states = NIL;
 
-		StartTransactionCommand();
-		started_tx = true;
+		ensure_transaction();
 
 		/* Fetch all non-ready tables. */
 		rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
@@ -409,14 +415,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			{
 				rstate->state = SUBREL_STATE_READY;
 				rstate->lsn = current_lsn;
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+
+				ensure_transaction();
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -435,13 +438,26 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				SpinLockRelease(&syncworker->relmutex);
 			}
 			else
+			{
+				List	   *subworkers;
+				ListCell   *lc;
 
 				/*
 				 * If there is no sync worker for this table yet, count
 				 * running sync workers for this subscription, while we have
 				 * the lock, for later.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				subworkers =
+					logicalrep_sub_workers_find(MyLogicalRepWorker->subid,
+												false);
+				foreach (lc, subworkers)
+				{
+					LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+					if (w->relid != InvalidOid)
+						nsyncworkers ++;
+				}
+				list_free(subworkers);
+			}
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
@@ -467,11 +483,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * Enter busy loop and wait for synchronization worker to
 				 * reach expected state (or die trying).
 				 */
-				if (!started_tx)
-				{
-					StartTransactionCommand();
-					started_tx = true;
-				}
+				ensure_transaction();
 				wait_for_relation_state_change(rstate->relid,
 											   SUBREL_STATE_SYNCDONE);
 			}
@@ -493,10 +505,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					TimestampDifferenceExceeds(hentry->last_start_time, now,
 											   wal_retrieve_retry_interval))
 				{
-					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-											 MySubscription->oid,
-											 MySubscription->name,
-											 MyLogicalRepWorker->userid,
+					ensure_transaction();
+					logicalrep_worker_launch(MySubscription->oid,
 											 rstate->relid);
 					hentry->last_start_time = now;
 				}
@@ -798,6 +808,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 									   MyLogicalRepWorker->relid,
 									   &relstate_lsn, true);
+	if (relstate == SUBREL_STATE_UNKNOWN)
+	{
+		ereport(LOG,
+				(errmsg("logical replication table synchronization worker for subscription \"%s\", "
+						"table \"%s\" will stop because the table is no longer subscribed",
+						MySubscription->name,
+						get_rel_name(MyLogicalRepWorker->relid))));
+		proc_exit(0);
+	}
 	CommitTransactionCommand();
 
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -844,11 +863,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
-										MyLogicalRepWorker->relstate,
-										MyLogicalRepWorker->relstate_lsn,
-										true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   MyLogicalRepWorker->relstate,
+										   MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -933,11 +951,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
 					 */
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											MyLogicalRepWorker->relid,
-											SUBREL_STATE_SYNCDONE,
-											*origin_startpos,
-											true);
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   SUBREL_STATE_SYNCDONE,
+											   *origin_startpos);
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 898c497..085dd8c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1516,24 +1516,31 @@ ApplyWorkerMain(Datum main_arg)
 										 ALLOCSET_DEFAULT_SIZES);
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	if (!MySubscription)
+	{
+		ereport(LOG,
+				(errmsg("logical replication apply worker for subscription %u will "
+						"stop because the subscription was removed",
+						MyLogicalRepWorker->subid)));
+		proc_exit(0);
+	}
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
 
-	/* Setup synchronous commit according to the user's wishes */
-	SetConfigOption("synchronous_commit", MySubscription->synccommit,
-					PGC_BACKEND, PGC_S_OVERRIDE);
-
 	if (!MySubscription->enabled)
 	{
 		ereport(LOG,
-				(errmsg("logical replication apply worker for subscription \"%s\" will not "
-						"start because the subscription was disabled during startup",
+				(errmsg("logical replication apply worker for subscription \"%s\" will "
+						"stop because the subscription was disabled",
 						MySubscription->name)));
-
 		proc_exit(0);
 	}
 
+	/* Setup synchronous commit according to the user's wishes */
+	SetConfigOption("synchronous_commit", MySubscription->synccommit,
+					PGC_BACKEND, PGC_S_OVERRIDE);
+
 	/* Keep us informed about subscription changes. */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index e7e8e3b..639b4eb 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -1052,10 +1052,12 @@ IndexScanOK(CatCache *cache, ScanKey cur_skey)
 		case AUTHNAME:
 		case AUTHOID:
 		case AUTHMEMMEMROLE:
+		case SUBSCRIPTIONOID:
+		case SUBSCRIPTIONNAME:
 
 			/*
-			 * Protect authentication lookups occurring before relcache has
-			 * collected entries for shared indexes.
+			 * Protect authentication and subscription lookups occurring
+			 * before relcache has collected entries for shared indexes.
 			 */
 			if (!criticalSharedRelcachesBuilt)
 				return false;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 991ca9d..c5b0b9c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 494a3a3..add7841 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -71,13 +71,13 @@ extern bool in_remote_transaction;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 					   bool only_running);
-extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
-						 Oid userid, Oid relid);
+extern void logicalrep_worker_launch(Oid subid, Oid relid);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
-extern int	logicalrep_sync_worker_count(Oid subid);
+extern List *logicalrep_sub_workers_find(Oid subid, bool only_running);
 
 extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 void		process_syncing_tables(XLogRecPtr current_lsn);
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to