On Thu, Aug 17, 2017 at 2:19 AM, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 17 August 2017 at 07:30, Michael Paquier <michael.paqu...@gmail.com> > wrote: > >> >> Definitely agreed on that. Any move function would need to check if >> the WAL position given by the caller is already newer than what's >> available in the local pg_wal (minimum of all other slots), with a >> shared lock that would need to be taken by xlog.c when recycling past >> segments. A forward function works on a single entry, which should be >> disabled at the moment of the update. It looks dangerous to me to do >> such an operation if there is a consumer of a slot currently on it. >> >> > pg_advance_replication_slot(...) > > ERRORs on logical slots, for now. Physical slots only. > > Forward-only. > > Future work to allow it to use the logical decoding infrastructure to > fast-forward a slot by reading only catalog change information and > invalidations, either via a dummy output plugin that filters out all xacts, > or by lower level use of the decoding code. > > Reasonable? > > PFA an updated and rebased patch. It is now named pg_advance_replication_slot, raises an ERROR on logical slots, and only moves forward. I think that, in the end, covers all the comments? -- Magnus Hagander Me: https://www.hagander.net/ <http://www.hagander.net/> Work: https://www.redpill-linpro.com/ <http://www.redpill-linpro.com/>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 641b3b8f4e..452559f260 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -19030,6 +19030,24 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); be called when connected to the same database the slot was created on. </entry> </row> + <row> + <entry> + <indexterm> + <primary>pg_advance_replication_slot</primary> + </indexterm> + <literal><function>pg_advance_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>position</parameter> <type>pg_lsn</type>)</function></literal> + </entry> + <entry> + <type>bool</type> + </entry> + <entry> + Advances the current restart position of a physical + replication slot named <parameter>slot_name</parameter>. + The slot will not be moved backwards, and it will not be + moved beyond the current insert location. Logical replication + slots cannot be moved. Returns true if the position was changed. + </entry> + </row> <row> <entry> diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d4cbd83bde..0e27e739bf 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -177,6 +177,73 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) } /* + * SQL function for moving the position in a replication slot. 
+ */ +Datum +pg_advance_replication_slot(PG_FUNCTION_ARGS) +{ + Name slotname = PG_GETARG_NAME(0); + XLogRecPtr moveto = PG_GETARG_LSN(1); + bool changed = false; + bool backwards = false; + + Assert(!MyReplicationSlot); + + check_permissions(); + + if (XLogRecPtrIsInvalid(moveto)) + ereport(ERROR, + (errmsg("Invalid target xlog position "))); + + /* Temporarily acquire the slot so we "own" it */ + ReplicationSlotAcquire(NameStr(*slotname), true); + + /* Only physical slots can be moved */ + if (MyReplicationSlot->data.database != InvalidOid) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errmsg("Only physical replication slots can be advanced."))); + } + + if (moveto > GetXLogWriteRecPtr()) + /* Can't move past current position, so truncate there */ + moveto = GetXLogWriteRecPtr(); + + /* Now adjust it */ + SpinLockAcquire(&MyReplicationSlot->mutex); + if (MyReplicationSlot->data.restart_lsn != moveto) + { + /* Never move backwards, because bad things can happen */ + if (MyReplicationSlot->data.restart_lsn > moveto) + backwards = true; + else + { + MyReplicationSlot->data.restart_lsn = moveto; + changed = true; + } + } + SpinLockRelease(&MyReplicationSlot->mutex); + + if (backwards) + ereport(WARNING, + (errmsg("Not moving replication slot backwards!"))); + + + if (changed) + { + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredLSN(); + ReplicationSlotSave(); + } + + ReplicationSlotRelease(); + + PG_RETURN_BOOL(changed); +} + + +/* * pg_get_replication_slots - SQL SRF showing active replication slots. 
*/ Datum diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 8b33b4e0ea..3495447cf9 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -5293,6 +5293,8 @@ DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 DESCR("create a physical replication slot"); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DESCR("drop a replication slot"); +DATA(insert OID = 3998 ( pg_advance_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 16 "19 3220" _null_ _null_ _null_ _null_ _null_ pg_advance_replication_slot _null_ _null_ _null_ )); +DESCR("move a replication slot position"); DATA(insert OID = 3781 ( pg_get_replication_slots PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ )); DESCR("information about replication slots currently in use"); DATA(insert OID = 3786 ( pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers