Attached is a patch to mark a logical replication slot as dirty if its
confirmed lsn is changed.

Aesthetically I'm not sure if it's better to do it per this patch and call
ReplicationSlotMarkDirty after we release the spinlock, add a new
ReplicationSlotMarkDirtyLocked() that skips the spinlock acquisition, or
add a bool is_locked arg to ReplicationSlotMarkDirty and update all call
sites. All will have the same result.

I've confirmed this works as expected as part of the failover slots test
suite but it'd be pretty seriously cumbersome to extract. If anyone feels
strongly about it I can add a test that shows that

- start master
- create slot
- do some stuff
- replay from slot
- fast-stop master
- start master
- replay from slot

doesn't see the same data a second time, but if we repeat it using
immediate stop it will see the same data when replaying again.

Also attached is another patch to amend the docs to reflect the fact that a
slot can actually replay the same change more than once. I avoided the
strong temptation to update other parts of the docs nearby.

Both these are fixes that should IMO be applied to 9.6.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From e2bda9fa26f9d92059490d2b9ea7c37ae45f81b1 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:12:34 +0800
Subject: [PATCH] Dirty replication slots when confirm_lsn is changed

---
 src/backend/replication/logical/logical.c | 62 +++++++++++++++++++++----------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2c7b749..d3fb1a5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -440,6 +440,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 	}
 
 	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	ReplicationSlotMarkDirty();
 }
 
 /*
@@ -850,10 +851,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		bool		updated_xmin = false;
 		bool		updated_restart = false;
+		bool		updated_confirm = false;
 
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			updated_confirm = true;
+		}
 
 		/* if were past the location required for bumping xmin, do so */
 		if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
@@ -891,34 +897,50 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know whats up after a crash */
-		if (updated_xmin || updated_restart)
+		if (updated_xmin || updated_restart || updated_confirm)
 		{
 			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
-			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
-		}
 
-		/*
-		 * Now the new xmin is safely on disk, we can let the global value
-		 * advance. We do not take ProcArrayLock or similar since we only
-		 * advance xmin here and there's not much harm done by a concurrent
-		 * computation missing that.
-		 */
-		if (updated_xmin)
-		{
-			SpinLockAcquire(&MyReplicationSlot->mutex);
-			MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
-			SpinLockRelease(&MyReplicationSlot->mutex);
+			/*
+			 * first write new xmin to disk, so we know whats up
+			 * after a crash.
+			 */
+			if (updated_xmin || updated_restart)
+			{
+				ReplicationSlotSave();
+				elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
+			}
 
-			ReplicationSlotsUpdateRequiredXmin(false);
-			ReplicationSlotsUpdateRequiredLSN();
+			/*
+			 * Now the new xmin is safely on disk, we can let the global value
+			 * advance. We do not take ProcArrayLock or similar since we only
+			 * advance xmin here and there's not much harm done by a concurrent
+			 * computation missing that.
+			 */
+			if (updated_xmin)
+			{
+				SpinLockAcquire(&MyReplicationSlot->mutex);
+				MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
+				SpinLockRelease(&MyReplicationSlot->mutex);
+
+				ReplicationSlotsUpdateRequiredXmin(false);
+				ReplicationSlotsUpdateRequiredLSN();
+			}
 		}
 	}
 	else
 	{
+		bool dirtied = false;
+
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.confirmed_flush = lsn;
+		if (MyReplicationSlot->data.confirmed_flush != lsn)
+		{
+			MyReplicationSlot->data.confirmed_flush = lsn;
+			dirtied = true;
+		}
 		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		if (dirtied)
+			ReplicationSlotMarkDirty();
 	}
 }
-- 
2.1.0

From 1f8d9319ef9c934c414cebf1f5223c3b3023bf7f Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 16 Mar 2016 15:45:16 +0800
Subject: [PATCH 2/2] Correct incorrect claim that slots output a change
 "exactly once"

Replication slots may actually emit a change more than once
if the master crashes before flushing the slot.

See
http://www.postgresql.org/message-id/camsr+ygsatrgqpcx9qx4eocizwsa27xjkeipsotaje8ofix...@mail.gmail.com
for details.
---
 doc/src/sgml/logicaldecoding.sgml | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index c7b43ed..a971918 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -12,7 +12,6 @@
 
   <para>
    Changes are sent out in streams identified by logical replication slots.
-   Each stream outputs each change exactly once.
   </para>
 
   <para>
@@ -204,8 +203,7 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
      In the context of logical replication, a slot represents a stream of
      changes that can be replayed to a client in the order they were made on
      the origin server. Each slot streams a sequence of changes from a single
-     database, sending each change exactly once (except when peeking forward
-     in the stream).
+     database.
     </para>
 
     <note>
@@ -218,7 +216,17 @@ $ pg_recvlogical -d postgres --slot test --drop-slot
     <para>
      A replication slot has an identifier that is unique across all databases
      in a <productname>PostgreSQL</productname> cluster. Slots persist
-     independently of the connection using them and are crash-safe.
+     independently of the connection using them. Slot creation and drop is
+     crash-safe, and slots will never be corrupted by a crash.
+    </para>
+
+    <para>
+     A logical slot outputs each database change at least once. A slot will
+     usually only emit a change once, but recently-sent changes may be sent
+     again if the server server crashes and restarts. Clients should remember
+     the last LSN they saw when decoding and skip over any repeated data or
+     (when using the replication protocol) request that decoding start from
+     that LSN rather than letting the server determine the start point.
     </para>
 
     <para>
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to