From f20b23d4e19769d385d7b45d7dd9ed994e83a3f7 Mon Sep 17 00:00:00 2001
From: Jelte Fennema <github-tech@jeltef.nl>
Date: Thu, 25 Aug 2022 15:02:58 +0200
Subject: [PATCH] Fix errors when concurrently altering subscriptions

Without this patch concurrent ALTER SUBSCRIPTION statements for the same
subscription could result in one of these statements returning the
following error:

ERROR:  XX000: tuple concurrently updated

This fixes that by re-fetching the tuple after acquiring the lock on the
subscription.
---
 src/backend/commands/subscriptioncmds.c       | 150 +++++++++++-----
 .../isolation/expected/subscription-ddl.out   | 169 ++++++++++++++++++
 src/test/isolation/isolation_schedule         |   1 +
 .../isolation/specs/subscription-ddl.spec     |  59 ++++++
 4 files changed, 335 insertions(+), 44 deletions(-)
 create mode 100644 src/test/isolation/expected/subscription-ddl.out
 create mode 100644 src/test/isolation/specs/subscription-ddl.spec

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 670b219c8d..9a1eaeb200 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -95,6 +95,7 @@ static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static Oid	LockSubscription(Relation rel, const char *subname, bool missing_ok);
 
 
 /*
@@ -987,6 +988,86 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		table_close(rel, NoLock);
 }
 
+static Oid
+LockSubscription(Relation rel, const char *subname, bool missing_ok)
+{
+	HeapTuple	tup;
+	Form_pg_subscription form;
+	Oid			subid;
+
+	/*
+	 * Loop covers the rare case where the subscription is renamed before we
+	 * can lock it. We try again just in case we can find a new one of the
+	 * same name.
+	 */
+	while (true)
+	{
+		tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+
+		if (!HeapTupleIsValid(tup))
+		{
+			if (!missing_ok)
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_OBJECT),
+						 errmsg("subscription \"%s\" does not exist",
+								subname)));
+			else
+				ereport(NOTICE,
+						(errmsg("subscription \"%s\" does not exist, skipping",
+								subname)));
+
+			return InvalidOid;
+		}
+
+		form = (Form_pg_subscription) GETSTRUCT(tup);
+		subid = form->oid;
+
+		/* must be owner */
+		if (!pg_subscription_ownercheck(subid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
+						   subname);
+
+		/*
+		 * Lock the subscription so nobody else can do anything with it
+		 * (including the replication workers).
+		 */
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+		/*
+		 * And now, re-fetch the tuple by OID.  If it's still there and still
+		 * the same name, we win; else, drop the lock and loop back to try
+		 * again.
+		 */
+		ReleaseSysCache(tup);
+		tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+		if (HeapTupleIsValid(tup))
+		{
+
+			form = (Form_pg_subscription) GETSTRUCT(tup);
+
+			/*
+			 * if the name changes we try te fetch a subscription with the
+			 * same name again, in case a new one was created.
+			 */
+			if (strcmp(subname, NameStr(form->subname)) == 0)
+			{
+				/* must still be owner */
+				if (!pg_subscription_ownercheck(subid, GetUserId()))
+					aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
+								   subname);
+				break;
+			}
+			/* can only get here if subscription was just renamed */
+			ReleaseSysCache(tup);
+		}
+		UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+	}
+	ReleaseSysCache(tup);
+	return subid;
+}
+
+
 /*
  * Alter the existing subscription.
  */
@@ -1003,35 +1084,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	Oid			subid;
 	bool		update_tuple = false;
 	Subscription *sub;
-	Form_pg_subscription form;
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	subid = LockSubscription(rel, stmt->subname, false);
 
 	/* Fetch the existing tuple. */
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
-							  CStringGetDatum(stmt->subname));
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
 
+	/*
+	 * This should never happen since we already locked the subscription, so
+	 * we know it exists.
+	 */
 	if (!HeapTupleIsValid(tup))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("subscription \"%s\" does not exist",
 						stmt->subname)));
 
-	form = (Form_pg_subscription) GETSTRUCT(tup);
-	subid = form->oid;
-
-	/* must be owner */
-	if (!pg_subscription_ownercheck(subid, GetUserId()))
-		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
-					   stmt->subname);
-
 	sub = GetSubscription(subid, false);
 
-	/* Lock the subscription so nobody else can do anything with it. */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
-
 	/* Form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -1370,7 +1443,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char		originname[NAMEDATALEN];
 	char	   *err = NULL;
 	WalReceiverConn *wrconn;
-	Form_pg_subscription form;
 	List	   *rstates;
 
 	/*
@@ -1379,43 +1451,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 */
 	rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
 
-	tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
-						  CStringGetDatum(stmt->subname));
-
-	if (!HeapTupleIsValid(tup))
+	subid = LockSubscription(rel, stmt->subname, stmt->missing_ok);
+	if (subid == InvalidOid)
 	{
 		table_close(rel, NoLock);
-
-		if (!stmt->missing_ok)
-			ereport(ERROR,
-					(errcode(ERRCODE_UNDEFINED_OBJECT),
-					 errmsg("subscription \"%s\" does not exist",
-							stmt->subname)));
-		else
-			ereport(NOTICE,
-					(errmsg("subscription \"%s\" does not exist, skipping",
-							stmt->subname)));
-
 		return;
 	}
 
-	form = (Form_pg_subscription) GETSTRUCT(tup);
-	subid = form->oid;
+	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
 
-	/* must be owner */
-	if (!pg_subscription_ownercheck(subid, GetUserId()))
-		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
-					   stmt->subname);
+	/*
+	 * This should never happen since we already locked the subscription, so
+	 * we know it exists.
+	 */
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("subscription \"%s\" does not exist",
+						stmt->subname)));
 
 	/* DROP hook for the subscription being removed */
 	InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
 
-	/*
-	 * Lock the subscription so nobody else can do anything with it (including
-	 * the replication workers).
-	 */
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
-
 	/* Get subname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
 							Anum_pg_subscription_subname, &isnull);
@@ -1734,9 +1791,14 @@ AlterSubscriptionOwner(const char *name, Oid newOwnerId)
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
-	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
-							  CStringGetDatum(name));
+	subid = LockSubscription(rel, name, false);
 
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	/*
+	 * This should never happen since we already locked the subscription, so
+	 * we know it exists.
+	 */
 	if (!HeapTupleIsValid(tup))
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
diff --git a/src/test/isolation/expected/subscription-ddl.out b/src/test/isolation/expected/subscription-ddl.out
new file mode 100644
index 0000000000..0f280cbad9
--- /dev/null
+++ b/src/test/isolation/expected/subscription-ddl.out
@@ -0,0 +1,169 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1alter s2alter s1commit
+step s1alter: ALTER SUBSCRIPTION sub1 DISABLE;
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+
+starting permutation: s1owner s2alter s1commit
+step s1owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser;
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+
+starting permutation: s1drop s2alter s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1rename s2alter s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1drop s1recreate s2alter s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+
+starting permutation: s1rename s1recreate s2alter s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2alter: ALTER SUBSCRIPTION sub1 DISABLE; <waiting ...>
+step s1commit: COMMIT;
+step s2alter: <... completed>
+
+starting permutation: s1alter s2drop s1commit
+step s1alter: ALTER SUBSCRIPTION sub1 DISABLE;
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+
+starting permutation: s1owner s2drop s1commit
+step s1owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser;
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+
+starting permutation: s1drop s2drop s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1rename s2drop s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1drop s1recreate s2drop s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+
+starting permutation: s1rename s1recreate s2drop s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2drop: DROP SUBSCRIPTION sub1; <waiting ...>
+step s1commit: COMMIT;
+step s2drop: <... completed>
+
+starting permutation: s1alter s2rename s1commit
+step s1alter: ALTER SUBSCRIPTION sub1 DISABLE;
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+
+starting permutation: s1owner s2rename s1commit
+step s1owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser;
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+
+starting permutation: s1drop s2rename s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1rename s2rename s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1drop s1recreate s2rename s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+
+starting permutation: s1rename s1recreate s2rename s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2rename: ALTER SUBSCRIPTION sub1 RENAME TO sub3; <waiting ...>
+step s1commit: COMMIT;
+step s2rename: <... completed>
+
+starting permutation: s1alter s2owner s1commit
+step s1alter: ALTER SUBSCRIPTION sub1 DISABLE;
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
+
+starting permutation: s1owner s2owner s1commit
+step s1owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser;
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
+
+starting permutation: s1drop s2owner s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1rename s2owner s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
+ERROR:  subscription "sub1" does not exist
+
+starting permutation: s1drop s1recreate s2owner s1commit
+step s1drop: DROP SUBSCRIPTION sub1;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
+
+starting permutation: s1rename s1recreate s2owner s1commit
+step s1rename: ALTER SUBSCRIPTION sub1 RENAME TO sub2;
+s1: WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+step s1recreate: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+step s2owner: ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; <waiting ...>
+step s1commit: COMMIT;
+step s2owner: <... completed>
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 529a4cbd4d..6e9144319a 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -84,6 +84,7 @@ test: alter-table-3
 test: alter-table-4
 test: create-trigger
 test: sequence-ddl
+test: subscription-ddl
 test: async-notify
 test: vacuum-no-cleanup-lock
 test: timeouts
diff --git a/src/test/isolation/specs/subscription-ddl.spec b/src/test/isolation/specs/subscription-ddl.spec
new file mode 100644
index 0000000000..b55c8f90ca
--- /dev/null
+++ b/src/test/isolation/specs/subscription-ddl.spec
@@ -0,0 +1,59 @@
+# Test sequence usage and concurrent sequence DDL
+
+setup
+{
+    CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE);
+    CREATE ROLE testsuperuser WITH SUPERUSER;
+}
+
+teardown
+{
+    DROP SUBSCRIPTION IF EXISTS sub1;
+    DROP SUBSCRIPTION IF EXISTS sub2;
+    DROP SUBSCRIPTION IF EXISTS sub3;
+    DROP ROLE testsuperuser;
+}
+
+session s1
+setup             { BEGIN; }
+step s1alter      { ALTER SUBSCRIPTION sub1 DISABLE; }
+step s1drop       { DROP SUBSCRIPTION sub1; }
+step s1rename     { ALTER SUBSCRIPTION sub1 RENAME TO sub2; }
+step s1owner      { ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; }
+step s1recreate   { CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres' PUBLICATION pub1 WITH(connect=false, create_slot=false, slot_name = NONE); }
+step s1commit     { COMMIT; }
+
+session s2
+step s2alter    { ALTER SUBSCRIPTION sub1 DISABLE; }
+step s2drop     { DROP SUBSCRIPTION sub1; }
+step s2rename   { ALTER SUBSCRIPTION sub1 RENAME TO sub3; }
+step s2owner    { ALTER SUBSCRIPTION sub1 OWNER TO testsuperuser; }
+
+
+permutation s1alter s2alter s1commit
+permutation s1owner s2alter s1commit
+permutation s1drop s2alter s1commit
+permutation s1rename s2alter s1commit
+permutation s1drop s1recreate s2alter s1commit
+permutation s1rename s1recreate s2alter s1commit
+
+permutation s1alter s2drop s1commit
+permutation s1owner s2drop s1commit
+permutation s1drop s2drop s1commit
+permutation s1rename s2drop s1commit
+permutation s1drop s1recreate s2drop s1commit
+permutation s1rename s1recreate s2drop s1commit
+
+permutation s1alter s2rename s1commit
+permutation s1owner s2rename s1commit
+permutation s1drop s2rename s1commit
+permutation s1rename s2rename s1commit
+permutation s1drop s1recreate s2rename s1commit
+permutation s1rename s1recreate s2rename s1commit
+
+permutation s1alter s2owner s1commit
+permutation s1owner s2owner s1commit
+permutation s1drop s2owner s1commit
+permutation s1rename s2owner s1commit
+permutation s1drop s1recreate s2owner s1commit
+permutation s1rename s1recreate s2owner s1commit
-- 
2.34.1

