On Thu, Aug 17, 2017 at 2:19 AM, Craig Ringer <cr...@2ndquadrant.com> wrote:

> On 17 August 2017 at 07:30, Michael Paquier <michael.paqu...@gmail.com>
> wrote:
>
>>
>> Definitely agreed on that. Any move function would need to check if
>> the WAL position given by caller is already newer than what's
>> available in the local pg_wal (minimum of all other slots), with a
>> shared lock that would need to be taken by xlog.c when recycling past
>> segments. A forward function works on a single entry, which should be
>> disabled at the moment of the update. It looks dangerous to me to do
>> such an operation if there is a consumer of a slot currently on it.
>>
>>
> pg_advance_replication_slot(...)
>
> ERRORs on logical slots, for now. Physical slots only.
>
> Forward-only.
>
> Future work to allow it to use the logical decoding infrastructure to
> fast-forward a slot by reading only catalog change information and
> invalidations, either via a dummy output plugin that filters out all xacts,
> or by lower level use of the decoding code.
>
> Reasonable?
>
>
PFA an updated and rebased patch.

Rebased. Now named pg_advance_replication_slot. ERROR on logical slots.
Forward only.

I think that, in the end, this covers all the comments?

-- 
 Magnus Hagander
 Me: https://www.hagander.net/ <http://www.hagander.net/>
 Work: https://www.redpill-linpro.com/ <http://www.redpill-linpro.com/>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 641b3b8f4e..452559f260 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -19030,6 +19030,24 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
         be called when connected to the same database the slot was created on.
        </entry>
       </row>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_advance_replication_slot</primary>
+        </indexterm>
+        <literal><function>pg_advance_replication_slot(<parameter>slot_name</parameter> <type>name</type>, <parameter>position</parameter> <type>pg_lsn</type>)</function></literal>
+       </entry>
+       <entry>
+        <type>bool</type>
+       </entry>
+       <entry>
+        Advances the restart position of the physical replication slot
+        <parameter>slot_name</parameter> to <parameter>position</parameter>.
+        The slot is never moved backwards, nor beyond the current WAL
+        write location. Logical replication slots cannot be advanced.
+        Returns true if the position was changed.
+       </entry>
+      </row>
 
       <row>
        <entry>
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d4cbd83bde..0e27e739bf 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -177,6 +177,73 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 }
 
 /*
+ * SQL function advancing the restart_lsn of a physical replication slot.
+ * The target is clamped to the current WAL write position and the slot is
+ * never moved backwards.  Returns true if restart_lsn was changed.
+ */
+Datum
+pg_advance_replication_slot(PG_FUNCTION_ARGS)
+{
+	Name		slotname = PG_GETARG_NAME(0);
+	XLogRecPtr	moveto = PG_GETARG_LSN(1);
+	XLogRecPtr	writeptr;
+	bool		changed = false;
+	bool		backwards = false;
+
+	Assert(!MyReplicationSlot);
+	check_permissions();
+
+	if (XLogRecPtrIsInvalid(moveto))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("invalid target WAL position")));
+
+	/* Temporarily acquire the slot so we "own" it */
+	ReplicationSlotAcquire(NameStr(*slotname), true);
+
+	/* Only physical slots can be moved */
+	if (MyReplicationSlot->data.database != InvalidOid)
+	{
+		ReplicationSlotRelease();
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("only physical replication slots can be advanced")));
+	}
+
+	/* Can't move past the current write position, so truncate there */
+	writeptr = GetXLogWriteRecPtr();
+	if (moveto > writeptr)
+		moveto = writeptr;
+
+	/* Now adjust it; restart_lsn is read and written under the slot mutex */
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	if (MyReplicationSlot->data.restart_lsn > moveto)
+		backwards = true;		/* never move backwards */
+	else if (MyReplicationSlot->data.restart_lsn < moveto)
+	{
+		MyReplicationSlot->data.restart_lsn = moveto;
+		changed = true;
+	}
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
+	if (backwards)
+		ereport(WARNING,
+				(errmsg("not moving replication slot backwards")));
+
+	if (changed)
+	{
+		ReplicationSlotMarkDirty();
+		ReplicationSlotsComputeRequiredLSN();
+		ReplicationSlotSave();
+	}
+
+	ReplicationSlotRelease();
+
+	PG_RETURN_BOOL(changed);
+}
+
+
+/*
  * pg_get_replication_slots - SQL SRF showing active replication slots.
  */
 Datum
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 8b33b4e0ea..3495447cf9 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5293,6 +5293,8 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
+DATA(insert OID = 3998 (  pg_advance_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 2 0 16 "19 3220" _null_ _null_ _null_ _null_ _null_ pg_advance_replication_slot _null_ _null_ _null_ ));
+DESCR("move a replication slot position");
 DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to