On Mon, 23 Jan 2023 at 10:52, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> IIRC, this is done to prevent concurrent drops of an origin, say by
> the exposed API pg_replication_origin_drop(). See the discussion in [1]
> related to it. If we want, we can optimize it so that we acquire the
> lock on the specific origin, as mentioned in the comments in
> replorigin_drop_by_name(), but it was not clear that this operation
> would be frequent enough.

Attached is a patch that locks the replication origin record using
LockSharedObject instead of locking the pg_replication_origin relation
in ExclusiveLock mode. With this change, a tablesync worker waits only
if it is trying to drop the same replication origin that the apply
worker is already dropping. Other tablesync workers can drop their own
replication origins without waiting.
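
To illustrate the idea outside the backend, here is a minimal
single-process sketch of "lock one origin, then re-check that it still
exists before dropping". It is a toy model only: ToyOrigins and the
toy_* functions are hypothetical names invented for this example, not
PostgreSQL APIs, and a failed lock attempt here stands in for the wait
that a real lock request would do.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_ORIGINS 4

/* Toy stand-in for the catalog rows plus a per-origin lock table. */
typedef struct ToyOrigins
{
    bool exists[TOY_MAX_ORIGINS];   /* is there a row for this id? */
    bool locked[TOY_MAX_ORIGINS];   /* is the per-origin lock held? */
} ToyOrigins;

/*
 * Take the lock on one origin only. Returns false if that same origin is
 * already locked (a real lock manager would wait instead of failing).
 */
static bool
toy_lock_origin(ToyOrigins *t, int id)
{
    if (id < 0 || id >= TOY_MAX_ORIGINS || t->locked[id])
        return false;
    t->locked[id] = true;
    return true;
}

/* Release the per-origin lock (in the patch this happens at xact end). */
static void
toy_unlock_origin(ToyOrigins *t, int id)
{
    if (id >= 0 && id < TOY_MAX_ORIGINS)
        t->locked[id] = false;
}

/*
 * Drop an origin whose lock the caller holds. The existence re-check makes
 * a second drop of the same origin a no-op. Returns true only if this call
 * actually removed the row.
 */
static bool
toy_drop_locked(ToyOrigins *t, int id)
{
    assert(id >= 0 && id < TOY_MAX_ORIGINS);
    if (!t->locked[id] || !t->exists[id])
        return false;
    t->exists[id] = false;
    return true;
}
```

In this model, locking origin 0 does not block a caller that locks
origin 1, while a second caller asking for origin 0 has to wait; once it
gets the lock, the re-check tells it the origin is already gone.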

Regards,
Vignesh
From 83aa8b6a0c7b4de42c1bf7aa6cfd863830a86c76 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 27 Jan 2023 15:17:09 +0530
Subject: [PATCH] Lock the replication origin record instead of locking the
 pg_replication_origin relation.

Lock the replication origin record instead of locking the
pg_replication_origin relation.
---
 src/backend/replication/logical/origin.c | 37 ++++++++++++++++--------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index b754c43840..049a2547d3 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -417,6 +417,21 @@ restart:
 	CommandCounterIncrement();
 }
 
+static bool
+replorigin_exists(RepOriginId roident)
+{
+	HeapTuple	tuple;
+
+	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum((Oid) roident));
+	if (HeapTupleIsValid(tuple))
+	{
+		ReleaseSysCache(tuple);
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Drop replication origin (by name).
  *
@@ -430,23 +445,21 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 
 	Assert(IsTransactionState());
 
-	/*
-	 * To interlock against concurrent drops, we hold ExclusiveLock on
-	 * pg_replication_origin till xact commit.
-	 *
-	 * XXX We can optimize this by acquiring the lock on a specific origin by
-	 * using LockSharedObject if required. However, for that, we first to
-	 * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
-	 * the specific origin and then re-check if the origin still exists.
-	 */
-	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+	rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
 
 	roident = replorigin_by_name(name, missing_ok);
 
-	if (OidIsValid(roident))
+	/*
+	 * Lock the origin to prevent concurrent drops. We keep the lock until the
+	 * end of transaction.
+	 */
+	LockSharedObject(ReplicationOriginRelationId, roident, 0,
+					 AccessExclusiveLock);
+
+	/* Drop the replication origin if it has not been dropped already */
+	if (replorigin_exists(roident))
 		replorigin_drop_guts(rel, roident, nowait);
 
-	/* We keep the lock on pg_replication_origin until commit */
 	table_close(rel, NoLock);
 }
 
-- 
2.34.1
