The attached patch adds a 'pid' column to pg_replication_slots, so it's possible to associate an active slot with the pg_stat_replication entry that describes the walsender using the slot.
If a user backend (or bgworker) is using the slot over the SQL interface, the 'pid' column will correspond to the pg_stat_activity entry for that backend instead. After all, both pg_stat_activity and pg_stat_replication are views over pg_stat_get_activity() anyway. The detailed rationale is in the patch's commit message. Please consider this for 9.5. It's a pretty light patch, and it'd be good to have it in place to ease monitoring of slot-based replication. Note that logical replication walsenders are broken in HEAD, so testing this using the test_decoding module and pg_recvlogical will crash the walsender. (This is a pre-existing bug; see http://www.postgresql.org/message-id/CAB7nPqQSdx7coHk0D6G=mkjntgyjxpdw+pwiskkssaezfts...@mail.gmail.com and http://www.postgresql.org/message-id/CAMsr+YEh50r70+hP+w=rCzEuenoQRCNMDA7PmRSK06Ro9r=9...@mail.gmail.com ) -- Craig Ringer http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From eabd1a1a66045703d7561b3c8883e90756582206 Mon Sep 17 00:00:00 2001 From: Craig Ringer <cr...@2ndquadrant.com> Date: Wed, 1 Apr 2015 17:18:39 +0800 Subject: [PATCH] Add a 'pid' column to pg_replication_slots This makes it easier to associate pg_stat_replication and pg_replication_slots entries. Right now an active replication slot may be in use by a walsender (using the replication protocol) or by a normal user backend (using the SQL interface). The 'active' flag shows that it's in use, but not by whom. pg_stat_replication and pg_stat_activity both expose the relevant backend pid. Neither exposes information about any replication slots being used. This patch adds a 'pid' column to pg_replication_slots, which is NULL when the slot isn't active, and otherwise the PID of the backend that's using the slot. It makes the 'active' field wholly redundant, but that field is retained for backwards compatibility at the SQL level. This makes it possible to find out which walsenders or normal backends are using a slot, something that was not previously possible from the SQL level. This is particularly important for monitoring of replication lag.
--- contrib/test_decoding/expected/ddl.out | 4 ++-- doc/src/sgml/catalogs.sgml | 13 +++++++++++++ src/backend/catalog/system_views.sql | 1 + src/backend/replication/slot.c | 22 +++++++++++----------- src/backend/replication/slotfuncs.c | 13 +++++++++---- src/include/catalog/pg_proc.h | 2 +- src/include/replication/slot.h | 6 ++++-- src/test/regress/expected/rules.out | 3 ++- 8 files changed, 43 insertions(+), 21 deletions(-) diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out index 780120d..80cf09b 100644 --- a/contrib/test_decoding/expected/ddl.out +++ b/contrib/test_decoding/expected/ddl.out @@ -603,7 +603,7 @@ SELECT pg_drop_replication_slot('regression_slot'); /* check that the slot is gone */ SELECT * FROM pg_replication_slots; - slot_name | plugin | slot_type | datoid | database | active | xmin | catalog_xmin | restart_lsn ------------+--------+-----------+--------+----------+--------+------+--------------+------------- + slot_name | plugin | slot_type | datoid | database | active | pid | xmin | catalog_xmin | restart_lsn +-----------+--------+-----------+--------+----------+--------+-----+------+--------------+------------- (0 rows) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d0b78f2..9945d1f 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -5401,6 +5401,19 @@ </row> <row> + <entry><structfield>pid</structfield></entry> + <entry><type>integer</type></entry> + <entry></entry> + <entry>The process ID of the session or WALsender using this slot if the + slot is currently actively being used. <literal>NULL</literal> if + inactive. Corresponds to + <structname>pg_stat_activity</structname>.<structfield>pid</structfield> + (for normal backends) or + <structname>pg_stat_replication</structname>.<structfield>pid</structfield> + (for WALsenders). 
(Since: 9.5)</entry> + </row> + + <row> <entry><structfield>xmin</structfield></entry> <entry><type>xid</type></entry> <entry></entry> diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2800f73..5977075 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -654,6 +654,7 @@ CREATE VIEW pg_replication_slots AS L.datoid, D.datname AS database, L.active, + L.pid, L.xmin, L.catalog_xmin, L.restart_lsn diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index dd7ff0f..1ed0cd9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -262,7 +262,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * be doing that. So it's safe to initialize the slot. */ Assert(!slot->in_use); - Assert(!slot->active); + Assert(slot->active_pid == 0); slot->data.persistency = persistency; slot->data.xmin = InvalidTransactionId; slot->effective_xmin = InvalidTransactionId; @@ -291,8 +291,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, volatile ReplicationSlot *vslot = slot; SpinLockAcquire(&slot->mutex); - Assert(!vslot->active); - vslot->active = true; + Assert(vslot->active_pid == 0); + vslot->active_pid = MyProcPid; SpinLockRelease(&slot->mutex); MyReplicationSlot = slot; } @@ -331,8 +331,8 @@ ReplicationSlotAcquire(const char *name) volatile ReplicationSlot *vslot = s; SpinLockAcquire(&s->mutex); - active = vslot->active; - vslot->active = true; + active = vslot->active_pid != 0; + vslot->active_pid = MyProcPid; SpinLockRelease(&s->mutex); slot = s; break; @@ -363,7 +363,7 @@ ReplicationSlotRelease(void) { ReplicationSlot *slot = MyReplicationSlot; - Assert(slot != NULL && slot->active); + Assert(slot != NULL && slot->active_pid != 0); if (slot->data.persistency == RS_EPHEMERAL) { @@ -380,7 +380,7 @@ ReplicationSlotRelease(void) volatile ReplicationSlot *vslot = slot; SpinLockAcquire(&slot->mutex); - vslot->active = false; + 
vslot->active_pid = 0; SpinLockRelease(&slot->mutex); } @@ -460,7 +460,7 @@ ReplicationSlotDropAcquired(void) bool fail_softly = slot->data.persistency == RS_EPHEMERAL; SpinLockAcquire(&slot->mutex); - vslot->active = false; + vslot->active_pid = 0; SpinLockRelease(&slot->mutex); ereport(fail_softly ? WARNING : ERROR, @@ -477,7 +477,7 @@ ReplicationSlotDropAcquired(void) * scanning the array. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); - slot->active = false; + slot->active_pid = 0; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); @@ -749,7 +749,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) /* count slots with spinlock held */ SpinLockAcquire(&s->mutex); (*nslots)++; - if (s->active) + if (s->active_pid != 0) (*nactive)++; SpinLockRelease(&s->mutex); } @@ -1227,7 +1227,7 @@ RestoreSlotFromDisk(const char *name) slot->candidate_restart_valid = InvalidXLogRecPtr; slot->in_use = true; - slot->active = false; + slot->active_pid = 0; restored = true; break; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f31925d..bda70d7 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -158,7 +158,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 8 +#define PG_GET_REPLICATION_SLOTS_COLS 9 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -206,7 +206,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) TransactionId xmin; TransactionId catalog_xmin; XLogRecPtr restart_lsn; - bool active; + pid_t active_pid; Oid database; NameData slot_name; NameData plugin; @@ -227,7 +227,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) namecpy(&slot_name, &slot->data.name); namecpy(&plugin, &slot->data.plugin); - active = slot->active; + active_pid = slot->active_pid != 0; } SpinLockRelease(&slot->mutex); @@ -251,7 
+251,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = database; - values[i++] = BoolGetDatum(active); + values[i++] = BoolGetDatum(active_pid != 0); + + if (active_pid != 0) + values[i++] = Int32GetDatum(active_pid); + else + nulls[i++] = true; if (xmin != InvalidTransactionId) values[i++] = TransactionIdGetDatum(xmin); diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 8890ade..5242be7 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5104,7 +5104,7 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); -DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,28,28,3220}" "{o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); +DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s 0 0 2249 "" "{19,19,25,26,16,23,28,28,3220}" "{o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,active,pid,xmin,catalog_xmin,restart_lsn}" _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 19" "{19,19,25,3220}" "{i,i,o,o}" "{slot_name,plugin,slot_name,xlog_position}" _null_ pg_create_logical_replication_slot _null_ _null_ _null_ )); DESCR("set up a logical replication slot"); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a400136..e81432e 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ 
-84,13 +84,15 @@ typedef struct ReplicationSlot /* is this slot defined */ bool in_use; - /* is somebody streaming out changes for this slot */ - bool active; + /* field 'active' removed in 9.5; see 'active_pid' instead */ /* any outstanding modifications? */ bool just_dirtied; bool dirty; + /* Who is streaming out changes for this slot? 0 for nobody */ + pid_t active_pid; + /* * For logical decoding, it's extremely important that we never remove any * data that's still needed for decoding purposes, even after a crash; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1788270..82e4532 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1396,10 +1396,11 @@ pg_replication_slots| SELECT l.slot_name, l.datoid, d.datname AS database, l.active, + l.pid, l.xmin, l.catalog_xmin, l.restart_lsn - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, xmin, catalog_xmin, restart_lsn) + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, active, pid, xmin, catalog_xmin, restart_lsn) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.1.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers