On 13/06/17 21:49, Peter Eisentraut wrote:
> On 6/13/17 02:33, Noah Misch wrote:
>>> Steps to reproduce -
>>> X cluster -> create 100 tables , publish all tables  (create publication pub
>>> for all tables);
>>> Y Cluster -> create 100 tables ,create subscription(create subscription sub
>>> connection 'user=centos host=localhost' publication pub;
>>> Y cluster ->drop subscription - drop subscription sub;
>>>
>>> check the log file on Y cluster.
>>>
>>> Sometime , i have seen this error on psql prompt and drop subscription
>>> operation got failed at first attempt.
>>>
>>> postgres=# drop subscription sub;
>>> ERROR:  tuple concurrently updated
>>> postgres=# drop subscription sub;
>>> NOTICE:  dropped replication slot "sub" on publisher
>>> DROP SUBSCRIPTION
>>
>> [Action required within three days.  This is a generic notification.]
> 
> It's being worked on.  Let's see by Thursday.
> 

The attached patch fixes it (it was mostly about the order of calls). I
also split SetSubscriptionRelState into two separate interfaces while I
was changing it, because now that the update_only bool has been added,
it has become quite strange to have a single interface for what are
basically two separate functions.

There are still a couple of remaining issues from this thread, though.
Namely, the AccessExclusiveLock on the pg_subscription catalog is not
very pretty, but we need a way to block the launcher from accessing the
subscription that is being dropped and to make sure it will not start
new workers for it afterwards. The question is how, however, because by
the time the launcher can lock an individual subscription it is already
processing it. So it looks to me like we'd need to reread the catalog
with a new snapshot after the lock has been acquired, which seems a bit
wasteful (I wonder if we could just call AcceptInvalidationMessages and
refetch from the syscache). Any better ideas?

Another related problem is the locking of subscriptions during
operations on them; AlterSubscription especially seems like it should
lock the subscription itself. I did that in 0002.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From 005eeb820f4e0528513744136582c4489e2429e3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 14 Jun 2017 08:14:20 +0200
Subject: [PATCH 2/2] Lock subscription when altering it

---
 src/backend/commands/subscriptioncmds.c | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 14c8f3f..e0ec8ea 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -642,6 +642,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					   stmt->subname);
 
 	subid = HeapTupleGetOid(tup);
+
+	/*
+	 * Lock the subscription so nobody else can do anything with it (including
+	 * the replication workers).
+	 */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
 	sub = GetSubscription(subid, false);
 
 	/* Form a new tuple. */
-- 
2.7.4

From 9011698ae800e0f45f960e91f6b16eab15634fac Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 13 Jun 2017 19:26:51 +0200
Subject: [PATCH 1/2] Improve the pg_subscription_rel handling

Split SetSubscriptionRelState into separate Add and Update functions,
removing the unsafe upsert logic, as callers are supposed to know
whether they are updating an existing row or adding a new one.

Reorder the code in the above-mentioned functions to avoid "tuple
concurrently updated" errors.
---
 src/backend/catalog/pg_subscription.c       | 131 +++++++++++++++-------------
 src/backend/commands/subscriptioncmds.c     |  14 +--
 src/backend/replication/logical/tablesync.c |  33 ++++---
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 4 files changed, 98 insertions(+), 86 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c5b2541..fd19675 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -225,24 +225,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing.  This can be used to avoid inserting a new record that was deleted
- * by someone else.  Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances.  But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
@@ -252,57 +243,79 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -377,6 +390,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -400,9 +415,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		simple_heap_delete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5aae7b6..14c8f3f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
+			AddSubscriptionRelState(sub->oid, relid,
 						  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
@@ -906,15 +906,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/* Kill the apply worker associated with the subscription. */
+	logicalrep_worker_stop(subid, InvalidOid);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ff08bf..28accda 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -285,11 +285,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -414,9 +413,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					StartTransactionCommand();
 					started_tx = true;
 				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -844,11 +843,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 				/* Update the state and make it visible to others. */
 				StartTransactionCommand();
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										MyLogicalRepWorker->relid,
-										MyLogicalRepWorker->relstate,
-										MyLogicalRepWorker->relstate_lsn,
-										true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   MyLogicalRepWorker->relid,
+										   MyLogicalRepWorker->relstate,
+										   MyLogicalRepWorker->relstate_lsn);
 				CommitTransactionCommand();
 				pgstat_report_stat(false);
 
@@ -933,11 +931,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					 * Update the new state in catalog.  No need to bother
 					 * with the shmem state as we are exiting for good.
 					 */
-					SetSubscriptionRelState(MyLogicalRepWorker->subid,
-											MyLogicalRepWorker->relid,
-											SUBREL_STATE_SYNCDONE,
-											*origin_startpos,
-											true);
+					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+											   MyLogicalRepWorker->relid,
+											   SUBREL_STATE_SYNCDONE,
+											   *origin_startpos);
 					finish_sync_worker();
 				}
 				break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f5f6191..e6a2dd5 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to