At Wed, 17 Jun 2020 10:17:07 +0900 (JST), Kyotaro Horiguchi
<[email protected]> wrote in
> At Tue, 16 Jun 2020 14:31:43 -0400, Alvaro Herrera <[email protected]>
> wrote in
> > On 2020-Jun-16, Kyotaro Horiguchi wrote:
> >
> > > I noticed another issue. If some required WAL segments are removed,
> > > the slot is "invalidated", that is, its restart_lsn is set to an invalid
> > > value. As a result we hardly ever see the "lost" state.
> > >
> > > It can be "fixed" by remembering the validity of a slot separately
> > > from restart_lsn. Is that worth doing?
> >
> > We discussed this before. I agree it would be better to do this
> > in some way, but I fear that if we do it naively, some code might exist
> > that reads the LSN without realizing that it needs to check the validity
> > flag first.
>
> Yes, that was my main concern about it; that approach is error-prone. How
> about remembering the LSN where the invalidation happened? That is safe,
> since nothing other than the slot-monitoring functions would look at
> last_invalidated_lsn. It can be reset once active_pid is a valid pid.
>
> InvalidateObsoleteReplicationSlots:
> ...
> SpinLockAcquire(&s->mutex);
> + s->data.last_invalidated_lsn = s->data.restart_lsn;
> s->data.restart_lsn = InvalidXLogRecPtr;
> SpinLockRelease(&s->mutex);
The attached patch does that (PoC). No documentation fix is included.
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index d6fe205eb4..d3240d1e38 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9485,20 +9485,25 @@ CreateRestartPoint(int flags)
* (typically a slot's restart_lsn)
*
* Returns one of the following enum values:
- * * WALAVAIL_NORMAL means targetLSN is available because it is in the range
- * of max_wal_size.
*
- * * WALAVAIL_PRESERVED means it is still available by preserving extra
+ * * WALAVAIL_RESERVED means targetLSN is available and it is in the range of
+ * max_wal_size.
+ *
+ * * WALAVAIL_EXTENDED means it is still available by preserving extra
* segments beyond max_wal_size. If max_slot_wal_keep_size is smaller
* than max_wal_size, this state is not returned.
*
+ * * WALAVAIL_BEING_REMOVED means required segments have been removed but the
+ * slot is still active, so its walsender may yet return to a state above.
+ *
* * WALAVAIL_REMOVED means it is definitely lost. A replication stream on
* a slot with this LSN cannot continue.
*
* * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
*/
WALAvailability
-GetWALAvailability(XLogRecPtr targetLSN)
+GetWALAvailability(XLogRecPtr targetLSN, XLogSegNo last_removed_seg,
+ bool slot_is_active)
{
XLogRecPtr currpos; /* current write LSN */
XLogSegNo currSeg; /* segid of currpos */
@@ -9509,7 +9514,11 @@ GetWALAvailability(XLogRecPtr targetLSN)
* slot */
uint64 keepSegs;
- /* slot does not reserve WAL. Either deactivated, or has never been active */
+ /*
+ * Slot does not reserve WAL: either deactivated, or has never been active.
+ * The caller should have passed last_invalidated_lsn as targetLSN if the
+ * slot has been invalidated.
+ */
if (XLogRecPtrIsInvalid(targetLSN))
return WALAVAIL_INVALID_LSN;
@@ -9524,7 +9533,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
* the first WAL segment file since startup, which causes the status being
* wrong under certain abnormal conditions but that doesn't actually harm.
*/
- oldestSeg = XLogGetLastRemovedSegno() + 1;
+ oldestSeg = last_removed_seg + 1;
/* calculate oldest segment by max_wal_size and wal_keep_segments */
XLByteToSeg(currpos, currSeg, wal_segment_size);
@@ -9544,20 +9553,21 @@ GetWALAvailability(XLogRecPtr targetLSN)
*/
if (targetSeg >= oldestSeg)
{
- /*
- * show "normal" when targetSeg is within max_wal_size, even if
- * max_slot_wal_keep_size is smaller than max_wal_size.
- */
- if ((max_slot_wal_keep_size_mb <= 0 ||
- max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
- oldestSegMaxWalSize <= targetSeg)
- return WALAVAIL_NORMAL;
-
- /* being retained by slots */
- if (oldestSlotSeg <= targetSeg)
+ /* show "reserved" when targetSeg is within max_wal_size */
+ if (oldestSegMaxWalSize <= targetSeg)
return WALAVAIL_RESERVED;
+
+ /* being retained by slots exceeding max_wal_size */
+ return WALAVAIL_EXTENDED;
}
+ /*
+ * Segments required by the slot have been removed. However, if the slot is
+ * active, its walsender may still be able to continue reading.
+ */
+ if (slot_is_active)
+ return WALAVAIL_BEING_REMOVED;
+
/* Definitely lost */
return WALAVAIL_REMOVED;
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..f141b29d28 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -285,6 +285,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
+ slot->last_invalidated_lsn = InvalidXLogRecPtr;
/*
* Create the slot on disk. We haven't actually marked the slot allocated
@@ -1144,6 +1145,7 @@ restart:
(uint32) restart_lsn)));
SpinLockAcquire(&s->mutex);
+ s->last_invalidated_lsn = s->data.restart_lsn;
s->data.restart_lsn = InvalidXLogRecPtr;
SpinLockRelease(&s->mutex);
ReplicationSlotRelease();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 1b929a603e..ed0abe0c39 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -243,6 +243,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
MemoryContext per_query_ctx;
MemoryContext oldcontext;
int slotno;
+ XLogSegNo last_removed_seg;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -272,6 +273,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
+ /*
+ * Fetch the last removed segment once, up front, so that every row of this
+ * result is computed against the same value. There is no interlock between
+ * slot data and the checkpointer, so more segments may be removed meanwhile,
+ * but that makes no practical difference.
+ */
+ last_removed_seg = XLogGetLastRemovedSegno();
+
MemoryContextSwitchTo(oldcontext);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
@@ -282,7 +291,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
WALAvailability walstate;
- XLogSegNo last_removed_seg;
+ XLogRecPtr targetLSN;
int i;
if (!slot->in_use)
@@ -342,7 +351,14 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;
- walstate = GetWALAvailability(slot_contents.data.restart_lsn);
+ /* use last_invalidated_lsn when the slot is invalidated */
+ if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
+ targetLSN = slot_contents.last_invalidated_lsn;
+ else
+ targetLSN = slot_contents.data.restart_lsn;
+
+ walstate = GetWALAvailability(targetLSN, last_removed_seg,
+ slot_contents.active_pid != 0);
switch (walstate)
{
@@ -350,14 +366,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
nulls[i++] = true;
break;
- case WALAVAIL_NORMAL:
- values[i++] = CStringGetTextDatum("normal");
- break;
-
case WALAVAIL_RESERVED:
values[i++] = CStringGetTextDatum("reserved");
break;
+ case WALAVAIL_EXTENDED:
+ values[i++] = CStringGetTextDatum("extended");
+ break;
+
+ case WALAVAIL_BEING_REMOVED:
+ values[i++] = CStringGetTextDatum("being lost");
+ break;
+
case WALAVAIL_REMOVED:
values[i++] = CStringGetTextDatum("lost");
break;
@@ -367,8 +387,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
}
if (max_slot_wal_keep_size_mb >= 0 &&
- (walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_RESERVED) &&
- ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+ (walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
+ (last_removed_seg != 0))
{
XLogRecPtr min_safe_lsn;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e917dfe92d..49d9578bc5 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -270,8 +270,9 @@ extern CheckpointStatsData CheckpointStats;
typedef enum WALAvailability
{
WALAVAIL_INVALID_LSN, /* parameter error */
- WALAVAIL_NORMAL, /* WAL segment is within max_wal_size */
- WALAVAIL_RESERVED, /* WAL segment is reserved by a slot */
+ WALAVAIL_RESERVED, /* WAL segment is within max_wal_size */
+ WALAVAIL_EXTENDED, /* WAL segment is kept by a slot beyond max_wal_size */
+ WALAVAIL_BEING_REMOVED, /* WAL segment is being removed */
WALAVAIL_REMOVED /* WAL segment has been removed */
} WALAvailability;
@@ -326,7 +327,9 @@ extern void ShutdownXLOG(int code, Datum arg);
extern void InitXLOGAccess(void);
extern void CreateCheckPoint(int flags);
extern bool CreateRestartPoint(int flags);
-extern WALAvailability GetWALAvailability(XLogRecPtr restart_lsn);
+extern WALAvailability GetWALAvailability(XLogRecPtr targetLSN,
+ XLogSegNo last_removed_seg,
+ bool slot_is_active);
extern XLogRecPtr CalculateMaxmumSafeLSN(void);
extern void XLogPutNextOid(Oid nextOid);
extern XLogRecPtr XLogRestorePoint(const char *rpName);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 917876010e..8090ca81fe 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -156,6 +156,9 @@ typedef struct ReplicationSlot
XLogRecPtr candidate_xmin_lsn;
XLogRecPtr candidate_restart_valid;
XLogRecPtr candidate_restart_lsn;
+
+ /* restart_lsn is copied here when the slot is invalidated */
+ XLogRecPtr last_invalidated_lsn;
} ReplicationSlot;
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)