On Mon, Jun 04, 2018 at 11:51:35AM +0200, Petr Jelinek wrote:
> On 01/06/18 21:13, Michael Paquier wrote:
>> -    startlsn = MyReplicationSlot->data.confirmed_flush;
>> +    if (OidIsValid(MyReplicationSlot->data.database))
>> +            startlsn = MyReplicationSlot->data.confirmed_flush;
>> +    else
>> +            startlsn = MyReplicationSlot->data.restart_lsn;
>> +
>>      if (moveto < startlsn)
>>      {
>>              ReplicationSlotRelease();
>
> This part looks correct for the checking that we are not moving
> backwards. However, there is another existing issue with this code
> which is that we are later using the confirmed_flush (via startlsn) as
> start point of logical decoding (XLogReadRecord parameter in
> pg_logical_replication_slot_advance) which is not correct. The
> restart_lsn should be used for that. I think it would make sense to
> fix that as part of this patch as well.

I am not sure I understand what you are getting at here.  Could you
explain your point a bit more, please, as 9c7d06d is yours?  When creating
the decoding context, all other code paths use the confirmed LSN as a
start point if not explicitly defined by the caller of
CreateDecodingContext, as it points to the last LSN where a transaction
has been committed and decoded.  The backward check also looks correct to
me, and I propose to add a comment block for it like this:
+   /*
+    * Check if the slot is not moving backwards.  Physical slots rely
+    * simply on restart_lsn as a minimum point, while logical slots
+    * have confirmed consumption up to confirmed_lsn, meaning that
+    * in both cases data older than that is not available anymore.
+    */
+   if (OidIsValid(MyReplicationSlot->data.database))
+       minlsn = MyReplicationSlot->data.confirmed_flush;
+   else
+       minlsn = MyReplicationSlot->data.restart_lsn;

All the tests I have run suggest that using confirmed_flush would not
matter much, as we want the slot's consumer to still decode transactions
whose commits happened after the point where the slot has been advanced
to.  So let's make sure that we are on the same page about the starting
LSN used.
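
To make the backward check discussed above concrete, here is a minimal
standalone sketch in C.  It is a model, not PostgreSQL code: the type
SlotLsnModel, the macro MODEL_INVALID_OID and the helpers slot_min_lsn()
and slot_can_advance_to() are hypothetical names invented for this
illustration, and the XLogRecPtr and Oid typedefs are simple stand-ins.
It only mirrors the rule from the proposed comment block: logical slots
(valid database OID) use confirmed_flush as the minimum, physical slots
use restart_lsn.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the real types; assumptions made for this sketch only. */
typedef uint64_t XLogRecPtr;
typedef uint32_t Oid;

#define MODEL_INVALID_OID ((Oid) 0)

/* Hypothetical, reduced view of the slot fields used by the check. */
typedef struct SlotLsnModel
{
	Oid			database;			/* valid only for logical slots */
	XLogRecPtr	restart_lsn;
	XLogRecPtr	confirmed_flush;
} SlotLsnModel;

/* Minimum LSN the slot may be moved to, depending on the slot type. */
static XLogRecPtr
slot_min_lsn(const SlotLsnModel *slot)
{
	assert(slot != NULL);

	if (slot->database != MODEL_INVALID_OID)
		return slot->confirmed_flush;	/* logical slot */
	return slot->restart_lsn;			/* physical slot */
}

/* Returns false if the requested position would move the slot backwards. */
static bool
slot_can_advance_to(const SlotLsnModel *slot, XLogRecPtr moveto)
{
	return moveto >= slot_min_lsn(slot);
}
```

With this model, a logical slot whose confirmed_flush is 200 rejects a
move to 150 even if its restart_lsn is 100, while a physical slot with
restart_lsn at 100 accepts it.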

On top of that, the locking issues in CreateInitDecodingContext() and
DecodingContextFindStartpoint() go back to 9.4.  So I would be inclined to
get 0001 applied first as a bug fix on all branches.  Still, that's a
minor issue, so there could be arguments for just doing it on HEAD.  I am
also fully open to suggestions for the extra comments which document
the use of ReplicationSlotControlLock and the mutex for in-memory slot
data.  Any thoughts about those last two points?
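
For readers who want the mutex rule in isolation, here is a small
standalone sketch in C11.  It is only a model under stated assumptions:
LockedSlot, SlotSnapshot and the helper functions are hypothetical names,
and a C11 atomic_flag spin loop stands in for the slot's spinlock.  It
shows the owner updating fields under the lock and a non-owner copying
them under the same lock, so that the reader never sees a half-updated
set of fields.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical model of a slot: a spinlock plus the fields it protects. */
typedef struct LockedSlot
{
	atomic_flag	mutex;				/* stand-in for the slot's spinlock */
	uint64_t	restart_lsn;
	uint64_t	confirmed_flush;
} LockedSlot;

/* Copy of the fields, taken by a reader while holding the lock. */
typedef struct SlotSnapshot
{
	uint64_t	restart_lsn;
	uint64_t	confirmed_flush;
} SlotSnapshot;

static void
locked_slot_init(LockedSlot *slot)
{
	atomic_flag_clear(&slot->mutex);
	slot->restart_lsn = 0;
	slot->confirmed_flush = 0;
}

static void
slot_lock(LockedSlot *slot)
{
	/* Spin until the flag was previously clear, i.e. we acquired it. */
	while (atomic_flag_test_and_set_explicit(&slot->mutex,
											 memory_order_acquire))
		;
}

static void
slot_unlock(LockedSlot *slot)
{
	atomic_flag_clear_explicit(&slot->mutex, memory_order_release);
}

/* Owner side: every update of a shared field happens under the lock. */
static void
owner_set_restart_lsn(LockedSlot *slot, uint64_t lsn)
{
	slot_lock(slot);
	slot->restart_lsn = lsn;
	slot_unlock(slot);
}

static void
owner_set_confirmed_flush(LockedSlot *slot, uint64_t lsn)
{
	slot_lock(slot);
	slot->confirmed_flush = lsn;
	slot_unlock(slot);
}

/* Non-owner side: copy the fields under the lock, then use the copy. */
static SlotSnapshot
reader_copy_slot(LockedSlot *slot)
{
	SlotSnapshot snap;

	slot_lock(slot);
	snap.restart_lsn = slot->restart_lsn;
	snap.confirmed_flush = slot->confirmed_flush;
	slot_unlock(slot);
	return snap;
}
```

The sketch leaves out the second half of the model, the LWLock guarding
in_use, and the owner's lock-free reads of its own fields.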
--
Michael
From 0cd02e359cbf5b74c8231f6619bc479a314213bc Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Fri, 1 Jun 2018 14:30:55 -0400
Subject: [PATCH] Fix and document lock handling for in-memory replication slot
 data

While debugging issues on HEAD for the new slot forwarding feature of
Postgres 11, some monitoring of the code surrounding in-memory slot data
has proved that the lock handling may cause inconsistent data to be read
by read-only callers of slot functions, particularly
pg_get_replication_slots() which fetches data for the system view
pg_replication_slots.

The code paths involved in those problems concern logical decoding
initialization (down to 9.4) and WAL reservation for slots (new as of
10).

A set of comments documenting the lock handling, particularly the
dependency on LWLocks for slots and the in_use flag as well as the
internal mutex lock, is added, based on a suggestion by Simon Riggs.

Backpatch down to 9.4, where WAL decoding was introduced.

Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9g...@mail.gmail.com
---
 src/backend/replication/logical/logical.c | 13 +++++++++----
 src/backend/replication/slot.c            |  4 ++++
 src/include/replication/slot.h            | 13 +++++++++++++
 3 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1393591538..61588d626f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -297,10 +297,12 @@ CreateInitDecodingContext(char *plugin,
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
 
+	SpinLockAcquire(&slot->mutex);
 	slot->effective_catalog_xmin = xmin_horizon;
 	slot->data.catalog_xmin = xmin_horizon;
 	if (need_full_snapshot)
 		slot->effective_xmin = xmin_horizon;
+	SpinLockRelease(&slot->mutex);
 
 	ReplicationSlotsComputeRequiredXmin(true);
 
@@ -445,13 +447,14 @@ void
 DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 {
 	XLogRecPtr	startptr;
+	ReplicationSlot *slot = ctx->slot;
 
 	/* Initialize from where to start reading WAL. */
-	startptr = ctx->slot->data.restart_lsn;
+	startptr = slot->data.restart_lsn;
 
 	elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
-		 (uint32) (ctx->slot->data.restart_lsn >> 32),
-		 (uint32) ctx->slot->data.restart_lsn);
+		 (uint32) (slot->data.restart_lsn >> 32),
+		 (uint32) slot->data.restart_lsn);
 
 	/* Wait for a consistent starting point */
 	for (;;)
@@ -477,7 +480,9 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
-	ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockAcquire(&slot->mutex);
+	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+	SpinLockRelease(&slot->mutex);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 056628fe8e..79d7a57d67 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,7 +1016,9 @@ ReplicationSlotReserveWal(void)
 			XLogRecPtr	flushptr;
 
 			/* start at current insert position */
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetXLogInsertRecPtr();
+			SpinLockRelease(&slot->mutex);
 
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
@@ -1026,7 +1028,9 @@ ReplicationSlotReserveWal(void)
 		}
 		else
 		{
+			SpinLockAcquire(&slot->mutex);
 			slot->data.restart_lsn = GetRedoRecPtr();
+			SpinLockRelease(&slot->mutex);
 		}
 
 		/* prevent WAL removal as fast as possible */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 76a88c6de7..2af2a14994 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -86,6 +86,19 @@ typedef struct ReplicationSlotPersistentData
 
 /*
  * Shared memory state of a single replication slot.
+ *
+ * The in-memory data of replication slots follows a locking model based
+ * on two linked concepts:
+ * - A replication slot's in_use is switched when added or discarded using
+ * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
+ * mode when updating the flag by the backend owning the slot and doing the
+ * operation, while readers (concurrent backends not owning the slot) need
+ * to hold it in shared mode when looking at replication slot data.
+ * - Individual fields are protected by mutex where only the backend owning
+ * the slot is authorized to update the fields from its own slot.  The
+ * backend owning the slot does not need to take this lock when reading its
+ * own fields, while concurrent backends not owning this slot should take the
+ * lock when reading this slot's data.
  */
 typedef struct ReplicationSlot
 {
-- 
2.17.0
