At Wed, 17 Jun 2020 10:17:07 +0900 (JST), Kyotaro Horiguchi 
<horikyota....@gmail.com> wrote in 
> At Tue, 16 Jun 2020 14:31:43 -0400, Alvaro Herrera <alvhe...@2ndquadrant.com> 
> wrote in 
> > On 2020-Jun-16, Kyotaro Horiguchi wrote:
> > 
> > > I noticed another issue. If some required WAL segments are removed, the
> > > slot will be "invalidated", that is, restart_lsn is set to an invalid
> > > value. As a result, we hardly ever see the "lost" state.
> > > 
> > > It can be "fixed" by remembering the validity of a slot separately
> > > from restart_lsn. Is that worth doing?
> > 
> > We discussed this before.  I agree it would be better to do this
> > in some way, but I fear that if we do it naively, some code might exist
> > that reads the LSN without realizing that it needs to check the validity
> > flag first.
> 
> Yes, that was my main concern about it. That's error-prone. How about
> remembering the LSN where invalidation happened?  It's safe since nothing
> other than slot-monitoring functions would look at
> last_invalidated_lsn. It can be reset if active_pid is a valid pid.
> 
> InvalidateObsoleteReplicationSlots:
>  ...
>               SpinLockAcquire(&s->mutex);
> +             s->data.last_invalidated_lsn = s->data.restart_lsn;
>               s->data.restart_lsn = InvalidXLogRecPtr;
>               SpinLockRelease(&s->mutex);

The attached patch does that (PoC).  No documentation changes are included.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d6fe205eb4..d3240d1e38 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,20 +9485,25 @@ CreateRestartPoint(int flags)
  *		(typically a slot's restart_lsn)
  *
  * Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- *   of max_wal_size.
  *
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ *   max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
  *   segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
  *   than max_wal_size, this state is not returned.
  *
+ * * WALAVAIL_BEING_REMOVED means it is being lost. The walsender using this
+ *   slot may return to the above.
+ *
  * * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
  *   a slot with this LSN cannot continue.
  *
  * * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
  */
 WALAvailability
-GetWALAvailability(XLogRecPtr targetLSN)
+GetWALAvailability(XLogRecPtr targetLSN, XLogSegNo last_removed_seg,
+				   bool slot_is_active)
 {
 	XLogRecPtr	currpos;		/* current write LSN */
 	XLogSegNo	currSeg;		/* segid of currpos */
@@ -9509,7 +9514,11 @@ GetWALAvailability(XLogRecPtr targetLSN)
 													 * slot */
 	uint64		keepSegs;
 
-	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	/*
+	 * slot does not reserve WAL. Either deactivated, or has never been active
+	 * The caller should have passed last_invalidated_lsn as targetLSN if the
+	 * slot has been invalidated.
+	 */
 	if (XLogRecPtrIsInvalid(targetLSN))
 		return WALAVAIL_INVALID_LSN;
 
@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 * the first WAL segment file since startup, which causes the status being
 	 * wrong under certain abnormal conditions but that doesn't actually harm.
 	 */
-	oldestSeg = XLogGetLastRemovedSegno() + 1;
+	oldestSeg = last_removed_seg + 1;
 
 	/* calculate oldest segment by max_wal_size and wal_keep_segments */
 	XLByteToSeg(currpos, currSeg, wal_segment_size);
@@ -9544,20 +9553,21 @@ GetWALAvailability(XLogRecPtr targetLSN)
 	 */
 	if (targetSeg >= oldestSeg)
 	{
-		/*
-		 * show "normal" when targetSeg is within max_wal_size, even if
-		 * max_slot_wal_keep_size is smaller than max_wal_size.
-		 */
-		if ((max_slot_wal_keep_size_mb <= 0 ||
-			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
-			oldestSegMaxWalSize <= targetSeg)
-			return WALAVAIL_NORMAL;
-
-		/* being retained by slots */
-		if (oldestSlotSeg <= targetSeg)
+		/* show "reserved" when targetSeg is within max_wal_size */
+		if (oldestSegMaxWalSize <= targetSeg)
 			return WALAVAIL_RESERVED;
+
+		/* being retained by slots exceeding max_wal_size */
+		return WALAVAIL_EXTENDED;
 	}
 
+	/*
+	 * However segments required by the slot has been lost, if walsender is
+	 * active the walsender can read into the first reserved slot.
+	 */
+	if (slot_is_active)
+		return WALAVAIL_BEING_REMOVED;
+
 	/* Definitely lost */
 	return WALAVAIL_REMOVED;
 }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..f141b29d28 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
 	slot->candidate_restart_valid = InvalidXLogRecPtr;
 	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->last_invalidated_lsn = InvalidXLogRecPtr;
 
 	/*
 	 * Create the slot on disk.  We haven't actually marked the slot allocated
@@ -1144,6 +1145,7 @@ restart:
 						(uint32) restart_lsn)));
 
 		SpinLockAcquire(&s->mutex);
+		s->last_invalidated_lsn = s->data.restart_lsn;
 		s->data.restart_lsn = InvalidXLogRecPtr;
 		SpinLockRelease(&s->mutex);
 		ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1b929a603e..ed0abe0c39 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -243,6 +243,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	int			slotno;
+	XLogSegNo	last_removed_seg;
 
 	/* check to see if caller supports us returning a tuplestore */
 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	rsinfo->setResult = tupstore;
 	rsinfo->setDesc = tupdesc;
 
+	/*
+	 * Remember the last removed segment at this point for the consistency in
+	 * this table. Since there's no interlock between slot data and
+	 * checkpointer, the segment can be removed in-between, but that doesn't
+	 * make any practical difference.
+	 */
+	last_removed_seg = XLogGetLastRemovedSegno();
+
 	MemoryContextSwitchTo(oldcontext);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -282,7 +291,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Datum		values[PG_GET_REPLICATION_SLOTS_COLS];
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
-		XLogSegNo	last_removed_seg;
+		XLogRecPtr	targetLSN;
 		int			i;
 
 		if (!slot->in_use)
@@ -342,7 +351,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
-		walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+		/* use last_invalidated_lsn when the slot is invalidated */
+		if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+			targetLSN = slot_contents.last_invalidated_lsn;
+		else
+			targetLSN = slot_contents.data.restart_lsn;
+
+		walstate = GetWALAvailability(targetLSN, last_removed_seg,
+									  slot_contents.active_pid != 0);
 
 		switch (walstate)
 		{
@@ -350,14 +366,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 				nulls[i++] = true;
 				break;
 
-			case WALAVAIL_NORMAL:
-				values[i++] = CStringGetTextDatum("normal");
-				break;
-
 			case WALAVAIL_RESERVED:
 				values[i++] = CStringGetTextDatum("reserved");
 				break;
 
+			case WALAVAIL_EXTENDED:
+				values[i++] = CStringGetTextDatum("extended");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("being lost");
+				break;
+
 			case WALAVAIL_REMOVED:
 				values[i++] = CStringGetTextDatum("lost");
 				break;
@@ -367,8 +387,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		}
 
 		if (max_slot_wal_keep_size_mb >= 0 &&
-			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
-			((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+			(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
+			(last_removed_seg != 0))
 		{
 			XLogRecPtr	min_safe_lsn;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe92d..49d9578bc5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,9 @@ extern CheckpointStatsData CheckpointStats;
 typedef enum WALAvailability
 {
 	WALAVAIL_INVALID_LSN,		/* parameter error */
-	WALAVAIL_NORMAL,			/* WAL segment is within max_wal_size */
-	WALAVAIL_RESERVED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_RESERVED,			/* WAL segment is within max_wal_size */
+	WALAVAIL_EXTENDED,			/* WAL segment is reserved by a slot */
+	WALAVAIL_BEING_REMOVED,		/* WAL segment is being removed */
 	WALAVAIL_REMOVED			/* WAL segment has been removed */
 } WALAvailability;
 
@@ -326,7 +327,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
+										  XLogSegNo last_removed_seg,
+										  bool slot_is_active);
 extern XLogRecPtr CalculateMaxmumSafeLSN(void);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..8090ca81fe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -156,6 +156,9 @@ typedef struct ReplicationSlot
 	XLogRecPtr	candidate_xmin_lsn;
 	XLogRecPtr	candidate_restart_valid;
 	XLogRecPtr	candidate_restart_lsn;
+
+	/* restart_lsn is copied here when the slot is invalidated */
+	XLogRecPtr	last_invalidated_lsn;
 } ReplicationSlot;
 
 #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)

Reply via email to