The attached patch adds a 'pid' column to pg_replication_slots, so it's
possible to associate an active slot with the pg_stat_replication entry
that describes the walsender using the slot.

If a user backend (or bgworker) is using the slot over the SQL interface,
the 'pid' column will correspond to the pg_stat_activity entry for that
backend instead. After all, both it and pg_stat_replication are views
over pg_stat_get_activity() anyway.

Detailed rationale in patch.

Please consider this for 9.5. It's a pretty light patch, and it'd be good
to have it in place to ease monitoring of slot-based replication.

(Note that logical replication walsenders are broken in HEAD, so testing this
using the test_decoding module and pg_recvlogical will crash the walsender.
This is a pre-existing bug; see
http://www.postgresql.org/message-id/CAB7nPqQSdx7coHk0D6G=mkjntgyjxpdw+pwiskkssaezfts...@mail.gmail.com
and
http://www.postgresql.org/message-id/CAMsr+YEh50r70+hP+w=rCzEuenoQRCNMDA7PmRSK06Ro9r=9...@mail.gmail.com
)

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From eabd1a1a66045703d7561b3c8883e90756582206 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 1 Apr 2015 17:18:39 +0800
Subject: [PATCH] Add a 'pid' column to pg_replication_slots

This makes it easier to associate pg_stat_replication and
pg_replication_slots entries.

Right now an active replication slot may be in use either by a walsender
(using the replication protocol) or by a normal user backend (using the
SQL interface). The 'active' flag shows that it's in use, but not
by whom.

pg_stat_replication and pg_stat_activity both expose the relevant
backend pid. Neither exposes information about any replication slots
being used.

This patch adds a 'pid' column to pg_replication_slots, which is NULL
when the slot isn't active, otherwise the PID of the backend that's
using the slot. It makes the 'active' field wholly redundant, but that
field is retained for backwards compatibility at the SQL level.

This makes it possible to find out which walsenders or normal backends
are using a slot, something that was not previously possible from the
SQL level. This is particularly important for monitoring of replication
lag.
---
 contrib/test_decoding/expected/ddl.out |  4 ++--
 doc/src/sgml/catalogs.sgml             | 13 +++++++++++++
 src/backend/catalog/system_views.sql   |  1 +
 src/backend/replication/slot.c         | 22 +++++++++++-----------
 src/backend/replication/slotfuncs.c    | 13 +++++++++----
 src/include/catalog/pg_proc.h          |  2 +-
 src/include/replication/slot.h         |  6 ++++--
 src/test/regress/expected/rules.out    |  3 ++-
 8 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 780120d..80cf09b 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn 
------------+--------+-----------+--------+----------+--------+------+--------------+-------------
+ slot_name | plugin | slot_type | datoid | database | active | pid | xmin | catalog_xmin | restart_lsn 
+-----------+--------+-----------+--------+----------+--------+-----+------+--------------+-------------
 (0 rows)
 
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index d0b78f2..9945d1f 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -5401,6 +5401,19 @@
      </row>
 
      <row>
+      <entry><structfield>pid</structfield></entry>
+      <entry><type>integer</type></entry>
+      <entry></entry>
+      <entry>The process ID of the session or WALsender using this slot if the
+      slot is currently in active use.  <literal>NULL</literal> if
+      inactive. Corresponds to
+      <structname>pg_stat_activity</structname>.<structfield>pid</structfield>
+      (for normal backends) or
+      <structname>pg_stat_replication</structname>.<structfield>pid</structfield>
+      (for WALsenders). (Since: 9.5)</entry>
+     </row>
+
+     <row>
       <entry><structfield>xmin</structfield></entry>
       <entry><type>xid</type></entry>
       <entry></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2800f73..5977075 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -654,6 +654,7 @@ CREATE VIEW pg_replication_slots AS
             L.datoid,
             D.datname AS database,
             L.active,
+            L.pid,
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dd7ff0f..1ed0cd9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	 * be doing that.  So it's safe to initialize the slot.
 	 */
 	Assert(!slot->in_use);
-	Assert(!slot->active);
+	Assert(slot->active_pid == 0);
 	slot->data.persistency = persistency;
 	slot->data.xmin = InvalidTransactionId;
 	slot->effective_xmin = InvalidTransactionId;
@@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 		volatile ReplicationSlot *vslot = slot;
 
 		SpinLockAcquire(&slot->mutex);
-		Assert(!vslot->active);
-		vslot->active = true;
+		Assert(vslot->active_pid == 0);
+		vslot->active_pid = MyProcPid;
 		SpinLockRelease(&slot->mutex);
 		MyReplicationSlot = slot;
 	}
@@ -331,8 +331,9 @@ ReplicationSlotAcquire(const char *name)
 			volatile ReplicationSlot *vslot = s;
 
 			SpinLockAcquire(&s->mutex);
-			active = vslot->active;
-			vslot->active = true;
+			active = vslot->active_pid != 0;
+			if (!active)
+				vslot->active_pid = MyProcPid;
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -363,7 +363,7 @@ ReplicationSlotRelease(void)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 
-	Assert(slot != NULL && slot->active);
+	Assert(slot != NULL && slot->active_pid != 0);
 
 	if (slot->data.persistency == RS_EPHEMERAL)
 	{
@@ -380,7 +380,7 @@ ReplicationSlotRelease(void)
 		volatile ReplicationSlot *vslot = slot;
 
 		SpinLockAcquire(&slot->mutex);
-		vslot->active = false;
+		vslot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
 	}
 
@@ -460,7 +460,7 @@ ReplicationSlotDropAcquired(void)
 		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
 
 		SpinLockAcquire(&slot->mutex);
-		vslot->active = false;
+		vslot->active_pid = 0;
 		SpinLockRelease(&slot->mutex);
 
 		ereport(fail_softly ? WARNING : ERROR,
@@ -477,7 +477,7 @@ ReplicationSlotDropAcquired(void)
 	 * scanning the array.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
-	slot->active = false;
+	slot->active_pid = 0;
 	slot->in_use = false;
 	LWLockRelease(ReplicationSlotControlLock);
 
@@ -749,7 +749,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 		/* count slots with spinlock held */
 		SpinLockAcquire(&s->mutex);
 		(*nslots)++;
-		if (s->active)
+		if (s->active_pid != 0)
 			(*nactive)++;
 		SpinLockRelease(&s->mutex);
 	}
@@ -1227,7 +1227,7 @@ RestoreSlotFromDisk(const char *name)
 		slot->candidate_restart_valid = InvalidXLogRecPtr;
 
 		slot->in_use = true;
-		slot->active = false;
+		slot->active_pid = 0;
 
 		restored = true;
 		break;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index f31925d..bda70d7 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 8
+#define PG_GET_REPLICATION_SLOTS_COLS 9
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		TransactionId xmin;
 		TransactionId catalog_xmin;
 		XLogRecPtr	restart_lsn;
-		bool		active;
+		pid_t		active_pid;
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
@@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 			namecpy(&slot_name, &slot->data.name);
 			namecpy(&plugin, &slot->data.plugin);
 
-			active = slot->active;
+			active_pid = slot->active_pid;
 		}
 		SpinLockRelease(&slot->mutex);
 
 
@@ -251,7 +251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			values[i++] = database;
 
-		values[i++] = BoolGetDatum(active);
+		values[i++] = BoolGetDatum(active_pid != 0);
+
+		if (active_pid != 0)
+			values[i++] = Int32GetDatum(active_pid);
+		else
+			nulls[i++] = true;
 
 		if (xmin != InvalidTransactionId)
 			values[i++] = TransactionIdGetDatum(xmin);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8890ade..5242be7 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5104,7 +5104,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,pid,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a400136..e81432e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -84,13 +84,15 @@ typedef struct ReplicationSlot
 	/* is this slot defined */
 	bool		in_use;
 
-	/* is somebody streaming out changes for this slot */
-	bool		active;
+    /* field 'active' removed in 9.5; see 'active_pid' instead */
 
 	/* any outstanding modifications? */
 	bool		just_dirtied;
 	bool		dirty;
 
+	/* Who is streaming out changes for this slot? 0 for nobody */
+	pid_t		active_pid;
+
 	/*
 	 * For logical decoding, it's extremely important that we never remove any
 	 * data that's still needed for decoding purposes, even after a crash;
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 1788270..82e4532 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name,
     l.datoid,
     d.datname AS database,
     l.active,
+    l.pid,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, pid, xmin, catalog_xmin, restart_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to