From 311220bf4e70bd67481292bffaad2d133e06211b Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 5 Feb 2021 12:20:13 +0530
Subject: [PATCH v3] Make pg_replication_origin_drop safe against concurrent
 drops.

Currently, we get the origin id from the name and then drop the origin by
taking ExclusiveLock on ReplicationOriginRelationId. So, two concurrent
sessions can get the id from the name at the same time, and then when they
try to drop the origin, one of the sessions will get either
"tuple concurrently deleted" or "cache lookup failed for replication
origin ..".

To prevent this race condition, we now perform the entire operation (the
name lookup as well as the drop) under the lock. This obviates the need for
the replorigin_drop() API, but we have kept it for backward compatibility.

Author: Peter Smith
Reviewed-by: Amit Kapila
Discussion: https://www.postgresql.org/message-id/CAHut%2BPuW8DWV5fskkMWWMqzt-x7RPcNQOtJQBp6SdwyRghCk7A%40mail.gmail.com
---
 src/backend/commands/subscriptioncmds.c  |  5 +-
 src/backend/replication/logical/origin.c | 78 ++++++++++++++++++++++++--------
 src/include/replication/origin.h         |  1 +
 3 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f785..5ccbc9d 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -926,7 +926,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	ListCell   *lc;
 	char		originname[NAMEDATALEN];
 	char	   *err = NULL;
-	RepOriginId originid;
 	WalReceiverConn *wrconn = NULL;
 	StringInfoData cmd;
 	Form_pg_subscription form;
@@ -1050,9 +1049,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
-	originid = replorigin_by_name(originname, true);
-	if (originid != InvalidRepOriginId)
-		replorigin_drop(originid, false);
+	replorigin_drop_by_name(originname, true, false);
 
 	/*
 	 * If there is no slot associated with the subscription, we can finish
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 9bd761a..3b6bf3d 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -324,25 +324,14 @@ replorigin_create(char *roname)
 
 
 /*
- * Drop replication origin.
- *
- * Needs to be called in a transaction.
+ * Common code to drop a replication origin.
  */
-void
-replorigin_drop(RepOriginId roident, bool nowait)
+static void
+replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
 {
 	HeapTuple	tuple;
-	Relation	rel;
 	int			i;
 
-	Assert(IsTransactionState());
-
-	/*
-	 * To interlock against concurrent drops, we hold ExclusiveLock on
-	 * pg_replication_origin throughout this function.
-	 */
-	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
-
 	/*
 	 * First, clean up the slot state info, if there is any matching slot.
 	 */
@@ -415,6 +404,61 @@ restart:
 	ReleaseSysCache(tuple);
 
 	CommandCounterIncrement();
+}
+
+/*
+ * Drop replication origin (by name).
+ *
+ * Unlike replorigin_drop, the name is resolved to an origin id only after
+ * the lock on pg_replication_origin has been acquired, which interlocks
+ * against a concurrent drop of the same origin.  If the name does not
+ * resolve to a valid origin id nothing is dropped.  replorigin_drop is no
+ * longer needed after this, but is kept for backward compatibility.
+ *
+ * Needs to be called in a transaction; the lock is held until it ends.
+ */
+void
+replorigin_drop_by_name(char *name, bool missing_ok, bool nowait)
+{
+	RepOriginId roident;
+	Relation	rel;
+
+	Assert(IsTransactionState());
+
+	/*
+	 * To interlock against concurrent drops, take ExclusiveLock on
+	 * pg_replication_origin before the name lookup and hold it until commit.
+	 */
+	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+
+	roident = replorigin_by_name(name, missing_ok);
+
+	if (roident != InvalidRepOriginId)
+		replorigin_drop_guts(rel, roident, nowait);
+
+	/* We keep the lock on pg_replication_origin until commit */
+	table_close(rel, NoLock);
+}
+
+/*
+ * Drop replication origin (by id).
+ *
+ * Needs to be called in a transaction.
+ */
+void
+replorigin_drop(RepOriginId roident, bool nowait)
+{
+	Relation	rel;
+
+	Assert(IsTransactionState());
+
+	/*
+	 * To interlock against concurrent drops, we hold ExclusiveLock on
+	 * pg_replication_origin throughout this function.
+	 */
+	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+
+	replorigin_drop_guts(rel, roident, nowait);
 
 	/* now release lock again */
 	table_close(rel, ExclusiveLock);
@@ -1256,16 +1300,12 @@ Datum
 pg_replication_origin_drop(PG_FUNCTION_ARGS)
 {
 	char	   *name;
-	RepOriginId roident;
 
 	replorigin_check_prerequisites(false, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 
-	roident = replorigin_by_name(name, false);
-	Assert(OidIsValid(roident));
-
-	replorigin_drop(roident, true);
+	replorigin_drop_by_name(name, false /* missing_ok */ , true /* nowait */ );
 
 	pfree(name);
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index 731445a..e13c238 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -41,6 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
 extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
 extern RepOriginId replorigin_create(char *name);
 extern void replorigin_drop(RepOriginId roident, bool nowait);
+extern void replorigin_drop_by_name(char *name, bool missing_ok, bool nowait);
 extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
 							  char **roname);
 
-- 
1.8.3.1

