Thanks for reviewing!

At Thu, 27 Jun 2019 16:22:56 +0200, Jehan-Guillaume de Rorthais 
<j...@dalibo.com> wrote in <20190627162256.4f4872b8@firost>
> Hi all,
> 
> Being interested by this feature, I did a patch review.
> 
> This feature adds the GUC "max_slot_wal_keep_size". This is the maximum
> amount of WAL that can be kept in "pg_wal" by active slots.
> 
> If the amount of WAL exceeds this limit, the slot is deactivated and
> its status (a new field in pg_replication_slots) is set to "lost".

This patch doesn't deactivate the walsender. A walsender stops by
itself when it finds that it cannot continue ongoing replication.
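
For example, once the patch is applied, enabling the limit and watching
the slot state can be done from psql like this (just a sketch; the 1GB
value is arbitrary, and wal_status/remain are the columns added by this
patch):

  -- max_slot_wal_keep_size is PGC_SIGHUP, so a reload is enough
  -- (1GB here is only an example value)
  ALTER SYSTEM SET max_slot_wal_keep_size = '1GB';
  SELECT pg_reload_conf();

  SELECT slot_name, active, restart_lsn, wal_status, remain
    FROM pg_replication_slots;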

> Patching
> ========
> 
> The patch v13-0003 does not apply on HEAD anymore.
> 
> The patch v13-0005 applies using "git am --ignore-space-change"
> 
> The other patches apply correctly.
> 
> Please, find attached the v14 set of patches rebased on master.

Sorry for missing this for a long time. It was hit by 67b9b3ca32
again, so I repost a rebased version.

> Documentation
> =============
> 
> The documentation explains the GUC and the related columns in
> "pg_replication_slots".
> 
> It reflects correctly the current behavior of the patch.
> 
> 
> Usability
> =========
> 
> The patch implements what it describes. It is easy to enable and disable. The
> GUC name correctly describes its purpose.
> 
> This feature is useful in some HA scenarios where slots are required (e.g. no
> archiving possible), but where primary availability is more important than
> the standbys.

Yes. Thanks for the clear explanation on the purpose.

> In "pg_replication_slots" view, the new "wal_status" field is misleading.
> Consider this sentence and the related behavior from documentation
> (catalogs.sgml):
> 
>   <literal>keeping</literal> means that some of them are to be removed by the
>   next checkpoint.
> 
> "keeping" appears when the current checkpoint will delete some WAL further 
> than
> "current_lsn - max_slot_wal_keep_size", but still required by at least one 
> slot.
> As some WAL required by some slots will be deleted quite soon, probably before
> anyone can react, "keeping" status is misleading here. We are already in the
> red zone.

It may be "losing", which would be less misleading.

> I would expect this "wal_status" to be:
> 
> - streaming: slot lag between 0 and "max_wal_size"
> - keeping: slot lag between "max_wal_size" and "max_slot_wal_keep_size". The
>   slot actually protects some WAL from being deleted
> - lost: slot lag greater than "max_slot_wal_keep_size". The slot couldn't
>   protect some WAL from deletion

I agree that comparing to max_wal_size is meaningful. The revised
version behaves that way.
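
For reference, the slot lag that those boundaries are compared against
can be checked with existing functions, for example (a rough sketch;
only wal_status comes from this patch):

  SELECT slot_name,
         pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn))
           AS slot_lag,
         current_setting('max_wal_size') AS max_wal_size,
         wal_status
    FROM pg_replication_slots;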

> Documentation follows with:
> 
>   The last two states are seen only when max_slot_wal_keep_size is
>   non-negative
> 
> This is true with the current behavior. However, if "keeping" is set as soon
> as the slot lag is greater than "max_wal_size", this status could be useful
> even with "max_slot_wal_keep_size = -1". As soon as a slot is stacking WALs
> that should have been removed by a previous checkpoint, it "keeps" them.

I revised the documentation that way. Both
view-pg-replication-slots.html and
runtime-config-replication.html are reworded.

> Feature tests
> =============
> 
> I have played with various traffic shaping setups between nodes, to observe
> how the columns "active", "wal_status" and "remain" behave with regard to
> each other using:
> 
>   while true; do
> 
<removed testing details>
> 
> Finally, at least once, the following message appeared in the primary logs
> **before** the "wal_status" changed from "keeping" to "streaming":
> 
>      WARNING:  some replication slots have lost required WAL segments
> 
> So the slot lost one WAL segment, but the standby has been able to catch up anyway.

Thanks for the intensive test run. It is quite helpful.

> My humble opinion about these results:
> 
> * after many different tests, the status "keeping" appears only when "remain"
>   equals 0. In the current implementation, "keeping" really adds no value...

Hmm. I agree, given that the "lost" (or "losing" in the patch)
state is not a definite state. That is, the slot may recover
from that state.

> * "remain" should be NULL if "max_slot_wal_keep_size=-1 or if the slot isn't
>   active

The revised version shows the following statuses.

   streaming / NULL             max_slot_wal_keep_size is -1
   unknown   / NULL             max_slot_wal_keep_size >= 0 and restart_lsn is invalid
   <status>  / <bytes>          otherwise
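
So a query like the following (just a sketch; wal_status and remain
are the new columns) shows those combinations directly:

   SELECT slot_name, restart_lsn, wal_status,
          pg_size_pretty(remain) AS remain
     FROM pg_replication_slots;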

> * the "lost" status should be a definitive status
> * it seems related, but maybe the "wal_status" should be set as "lost"
>   only when the slot has been deactivated?

Agreed. While replication is active, even if required segments seem
to be lost once, a delayed walreceiver ack can advance restart_lsn
into the "safe" zone later. So, in the revised version, if the segment
for restart_lsn has been removed, GetLsnAvailability() returns
"losing" if a walsender is active and "lost" if not.

> * logs should warn about a failing slot as soon as it is effectively
>   deactivated, not before.

Agreed. Slots on which a walsender is running are excluded from the
output of ReplicationSlotsEnumerateBehinds. As a result, the "some
replication slots lost..." warning is emitted after the related
walsender stops.

I attach the revised patch. I'll repost a polished version soon.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 87b35cb3c1c1a50218563037a97a368d86451040 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/6] Add WAL relief vent for replication slots

Replication slots are useful to maintain a replication connection in
configurations where replication is so delayed that the connection
would otherwise be broken. On the other hand, a long delay can pile up
so many WAL files that they fill up the disk and bring the master
down. This feature, which is activated by the GUC
"max_slot_wal_keep_size", protects master servers from a full disk by
limiting the number of WAL files reserved by replication slots.
---
 src/backend/access/transam/xlog.c             | 128 +++++++++++++++++++++-----
 src/backend/replication/slot.c                |  58 ++++++++++++
 src/backend/utils/misc/guc.c                  |  12 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/replication/slot.h                |   1 +
 6 files changed, 176 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f553523857..fcb076100f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -104,6 +104,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -872,6 +873,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9288,6 +9290,54 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Returns minimum segment number that the next checkpoint must leave
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all active slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+	uint64		keepSegs = 0;	/* # of segments actually kept */
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate how many segments are kept by slots first. The second
+	 * term of the condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limitSegs;
+
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Reduce it if slots already reserves too many. */
+		if (limitSegs < keepSegs)
+			keepSegs = limitSegs;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9299,38 +9349,66 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;
 
-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * We should keep certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * Warn the checkpoint is going to flush the segments required by
+	 * replication slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
+		static XLogSegNo prev_lost_segs = 0;	/* avoid duplicate messages */
+
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+		if (slotSegNo < minSegNo)
+		{
+			XLogSegNo lost_segs = minSegNo - slotSegNo;
+			if (prev_lost_segs != lost_segs)
+			{
+				/* We have lost a new segment, warn it.*/
+				XLogRecPtr minlsn;
+				char *slot_names;
+				int nslots;
+
+				XLogSegNoOffsetToRecPtr(minSegNo, 0, wal_segment_size, minlsn);
+				slot_names =
+					ReplicationSlotsEnumerateBehinds(minlsn, ", ", &nslots);
+
+				/*
+				 * Some of the affected slots could have just been removed.
+				 * We don't need to show anything here if no affected slot
+				 * remains.
+				 */
+				if (slot_names)
+				{
+					ereport(WARNING,
+							(errmsg ("some replication slots have lost required WAL segments"),
+							 errdetail_plural(
+								 "Slot %s lost %ld segment(s).",
+								 "Slots %s lost at most %ld segment(s).",
+								 nslots, slot_names, lost_segs)));
+				}
+			}
+			prev_lost_segs = lost_segs;
+		}
 		else
-			segno = segno - wal_keep_segments;
-	}
-
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+			prev_lost_segs = 0;
 	}
 
 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
-		*logSegNo = segno;
+	if (minSegNo < *logSegNo)
+		*logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 62342a69cb..24b8d42eab 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,64 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Returns the names of replication slots whose restart_lsn is behind the
+ * specified LSN, as a palloc'ed string of slot names delimited by the
+ * given separator.  Returns NULL if no slot matches.  If pnslots is
+ * given, the number of returned slots is stored there.
+ */
+char *
+ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots)
+{
+	static StringInfoData retstr;
+	static bool retstr_initialized = false;
+	bool insert_separator = false;
+	int i;
+	int nslots = 0;
+
+	Assert (separator);
+	if (max_replication_slots <= 0)
+		return NULL;
+
+	if (!retstr_initialized)
+	{
+		initStringInfo(&retstr);
+		retstr_initialized = true;
+	}
+	else
+		resetStringInfo(&retstr);
+
+	/* construct name list */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0 ; i < max_replication_slots ; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Exclude active walsenders */
+		if (s->in_use && s->active_pid == 0 && s->data.restart_lsn < target)
+		{
+			if (insert_separator)
+				appendStringInfoString(&retstr, separator);
+
+			/*
+			 * Slot names consist only of lower-case letters. We don't
+			 * bother quoting.
+			 */
+			appendStringInfoString(&retstr, NameStr(s->data.name));
+			insert_separator = true;
+			nslots++;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* return the number of slots in the list if requested */
+	if (pnslots)
+		*pnslots = nslots;
+
+	/* return NULL instead of an empty string */
+	return retstr.data[0] ? retstr.data : NULL;
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index fc463601ff..f8e796b6c1 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2654,6 +2654,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+		 NULL,
+		 GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1,
+		MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cfad86c02a..aadbc76d85 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -286,6 +286,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# measured in bytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d519252aad..b355452072 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8fbddea78f..e0fee0663c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -199,6 +199,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern char *ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
2.16.3

>From 839a102791aadef5dd7af28623f55be411b9374b Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/6] Add monitoring aid for max_slot_wal_keep_size

Adds two columns, "wal_status" and "remain", to pg_replication_slots.
With max_slot_wal_keep_size set, replication connections may lose sync
after a long delay. The "wal_status" column shows whether the slot is
still usable, about to lose reserved WAL segments, or already broken.
The "remain" column shows the remaining bytes of WAL that the write
position can advance before the slot loses required WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 contrib/test_decoding/sql/ddl.sql      |   2 +
 src/backend/access/transam/xlog.c      | 117 ++++++++++++++++++++++++++++++---
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  21 +++++-
 src/include/access/xlog.h              |   2 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 8 files changed, 144 insertions(+), 18 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 2c999fd3eb..cf0318f697 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -723,8 +723,8 @@ SELECT pg_drop_replication_slot('regression_slot');
 (1 row)
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 (0 rows)
 
+\x
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 856495c952..0f2b9992f7 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -387,4 +387,6 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
+\x
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fcb076100f..d0cc2e0f6d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -873,7 +873,8 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr,
+					   XLogRecPtr targetLSN, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9290,6 +9291,63 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Returns availability of the record at given targetLSN.
+ *
+ * Returns one of the following string values:
+ * "streaming" means the WAL record at targetLSN is available.
+ * "keeping" means it is still available but about to be removed at the next
+ * checkpoint.
+ * "losing" means it is already removed. This state is not definite since
+ * delayed ack from walreceiver can advance restart_lsn later.
+ * "lost" means it is already removed and no walsenders are running on it. The
+ * slot is no longer recoverable.
+ *
+ * If restBytes is not NULL, sets the remaining LSN bytes until the segment
+ * for targetLSN will be removed.
+ */
+char *
+GetLsnAvailability(pid_t walsender_pid, XLogRecPtr targetLSN, uint64 *restBytes)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo currSeg;		/* segid of currpos */
+	XLogSegNo targetSeg;	/* segid of targetLSN */
+	XLogSegNo oldestSeg;	/* oldest segid kept by max_wal_size */
+	XLogSegNo oldestSlotSeg;/* oldest segid kept by slot */
+
+	Assert(!XLogRecPtrIsInvalid(targetLSN));
+	Assert(restBytes);
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* oldest segment currently needed by slots */
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	oldestSlotSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN,
+										 restBytes);
+
+	/* oldest segment by max_wal_size */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	oldestSeg = currSeg -
+		ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
+
+	/* targetSeg is within max_wal_size */
+	if (oldestSeg <= targetSeg)
+		return "streaming";
+
+	/* targetSeg is being retained by slots */
+	if (oldestSlotSeg <= targetSeg)
+		return "keeping";
+
+	/* targetSeg is no longer protected. We ignore the possible availability */
+
+	if (walsender_pid != 0)
+		return	"losing";
+
+	return "lost";
+}
+
 /*
  * Returns minimum segment number that the next checkpoint must leave
  * considering wal_keep_segments, replication slots and
@@ -9297,13 +9355,19 @@ CreateRestartPoint(int flags)
  *
  * currLSN is the current insert location.
  * minSlotLSN is the minimum restart_lsn of all active slots.
+ * targetLSN is used when restBytes is not NULL.
+ *
+ * If restBytes is not NULL, sets the remaining LSN bytes until the segment
+ * for targetLSN will be removed.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN,
+					 XLogRecPtr targetLSN, uint64 *restBytes)
 {
 	XLogSegNo	currSeg;
 	XLogSegNo	minSlotSeg;
 	uint64		keepSegs = 0;	/* # of segments actually kept */
+	uint64		limitSegs = 0;	/* # of maximum segments possibly kept */
 
 	XLByteToSeg(currLSN, currSeg, wal_segment_size);
 	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
@@ -9318,8 +9382,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	/* Cap keepSegs by max_slot_wal_keep_size */
 	if (max_slot_wal_keep_size_mb >= 0)
 	{
-		uint64 limitSegs;
-
 		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 		/* Reduce it if slots already reserves too many. */
@@ -9327,9 +9389,45 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 			keepSegs = limitSegs;
 	}
 
-	/* but, keep at least wal_keep_segments segments if any */
-	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
-		keepSegs = wal_keep_segments;
+	if (wal_keep_segments > 0)
+	{
+		/* but, keep at least wal_keep_segments segments if any */
+		if (keepSegs < wal_keep_segments)
+			keepSegs = wal_keep_segments;
+
+		/* ditto for limitSegs */
+		if (limitSegs < wal_keep_segments)
+			limitSegs = wal_keep_segments;
+	}
+
+	/*
+	 * If requested, calculate the remaining LSN bytes until the slot gives up
+	 * keeping WAL records.
+	 */
+	if (restBytes)
+	{
+		uint64 fragbytes;
+		XLogSegNo targetSeg;
+
+		*restBytes = 0;
+
+		if (max_slot_wal_keep_size_mb >= 0)
+		{
+			XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+			/* avoid underflow */
+			if (currSeg <= targetSeg + limitSegs)
+			{
+				/*
+				 * This slot still has all required segments. Calculate how
+				 * many LSN bytes the slot has until it loses targetLSN.
+				 */
+				fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+				XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg, fragbytes,
+										wal_segment_size, *restBytes);
+			}
+		}
+	}
 
 	/* avoid underflow, don't go below 1 */
 	if (currSeg <= keepSegs)
@@ -9359,7 +9457,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	/*
 	 * We should keep certain number of WAL segments after this checkpoint.
 	 */
-	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+	minSegNo =
+		GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL);
 
 	/*
 	 * Warn the checkpoint is going to flush the segments required by
@@ -9393,7 +9492,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 				if (slot_names)
 				{
 					ereport(WARNING,
-							(errmsg ("some replication slots have lost required WAL segments"),
+							(errmsg ("some replication slots lost required WAL segments"),
 							 errdetail_plural(
 								 "Slot %s lost %ld segment(s).",
 								 "Slots %s lost at most %ld segment(s).",
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ea4c85e395..6a9491e64a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -849,7 +849,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 808a6f5b83..2229a46154 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -221,7 +221,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -343,6 +343,25 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (max_slot_wal_keep_size_mb < 0)
+		{
+			values[i++] = CStringGetTextDatum("streaming");
+			nulls[i++] = true;
+		}
+		else if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			nulls[i++] = true;
+		}
+		else
+		{
+			uint64	remaining_bytes;
+
+			values[i++] = CStringGetTextDatum(
+				GetLsnAvailability(active_pid, restart_lsn, &remaining_bytes));
+			values[i++] = Int64GetDatum(remaining_bytes);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index b355452072..68ca21f780 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -305,6 +305,8 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern char *GetLsnAvailability(pid_t walsender_pid, XLogRecPtr targetLSN,
+								uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 0902dce5f1..300d868980 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9844,9 +9844,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 210e9cd146..74c44891a4 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1461,8 +1461,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

>From a7d798cbe4142a2f04393671acfc032917c4cd11 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 19 Dec 2018 12:43:57 +0900
Subject: [PATCH 3/6] Add primary_slot_name to init_from_backup in TAP test.

It is convenient if primary_slot_name can be specified when taking a
base backup. This adds a new parameter of that name to the Perl
function.
---
 src/test/perl/PostgresNode.pm | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 6019f37f91..c7e138c121 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -694,6 +694,10 @@ port = $port
 		$self->append_conf('postgresql.conf',
 			"unix_socket_directories = '$host'");
 	}
+	$self->append_conf('postgresql.conf',
+					   qq(primary_slot_name = $params{primary_slot_name}))
+	  if (defined $params{primary_slot_name});
+
 	$self->enable_streaming($root_node) if $params{has_streaming};
 	$self->enable_restoring($root_node) if $params{has_restoring};
 	return;
-- 
2.16.3

>From 07b28b280aeb4f10273057c1b528ab33adddd5ad Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 4/6] TAP test for the slot limit feature

---
 src/test/recovery/t/018_replslot_limit.pl | 198 ++++++++++++++++++++++++++++++
 1 file changed, 198 insertions(+)
 create mode 100644 src/test/recovery/t/018_replslot_limit.pl

diff --git a/src/test/recovery/t/018_replslot_limit.pl b/src/test/recovery/t/018_replslot_limit.pl
new file mode 100644
index 0000000000..2bd6fdf39c
--- /dev/null
+++ b/src/test/recovery/t/018_replslot_limit.pl
@@ -0,0 +1,198 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 3MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot state should be the state "unknown" before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "|unknown|0", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, the slot is the state "streaming" now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|0", 'check the catching-up state');
+
+# Advance WAL by five segments (= 5MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|0", 'check that within max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|0", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 4;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in the safe state. The remaining bytes should be almost
+# (max_slot_wal_keep_size + 1) times the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|5120 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|3072 kB", 'check that remaining byte is calculated correctly');
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 6; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|streaming|5120 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+
+# Slot gets into 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|1024 kB", 'check that the slot state changes to "keeping"');
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|0 bytes", 'check that the slot state changes to "lost"');
+
+# The standby still can connect to master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 5);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "some replication slots have lost required WAL segments\n".
+			   ".*Slot rep1 lost 1 segment\\(s\\)\\.",
+			   $logstart),
+   'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|0 bytes", 'check that the slot state changes to "lost"');
+
+# The standby no longer can connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= (16 * $n) MB) on master
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
-- 
2.16.3

>From 320af84bf916ba7b70cf01147dab234c8bf318cb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 11 Jan 2018 15:00:32 +0900
Subject: [PATCH 5/6] Documentation for slot-limit feature

---
 doc/src/sgml/catalogs.sgml          | 30 ++++++++++++++++++++++++++++++
 doc/src/sgml/config.sgml            | 23 +++++++++++++++++++++++
 doc/src/sgml/high-availability.sgml |  8 +++++---
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 68ad5071ca..dc9679283a 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9975,6 +9975,36 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL records claimed by this
+      slot. <literal>streaming</literal>, <literal>keeping</literal>,
+      <literal>lost</literal>
+      or <literal>unknown</literal>. <literal>streaming</literal> means that
+      the claimed records are available within
+      max_wal_size. <literal>keeping</literal> means max_wal_size is exceeded
+      but still required records are held by replication slots or
+      wal_keep_segments.
+      <literal>lost</literal> means that some of them are on the verge of
+      removal or no longer available. This state is seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is <literal>unknown</literal>.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The amount in bytes that WAL location (LSN) can advance until
+        this slot may lose required WAL records.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c91e3e1550..c345538c8f 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3650,6 +3650,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots retain an unlimited amount of WAL files.  If the
+        restart_lsn of a replication slot falls more than that many megabytes
+        behind the current LSN, the standby using the slot may no longer be able
+        to continue replication due to removal of required WAL records. You
+        can see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index 543691dad4..ae8c3a2aca 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
-- 
2.16.3

>From ab129736e06b23f4e251cbb65e1b841670ba924a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 26 Oct 2018 10:07:05 +0900
Subject: [PATCH 6/6] Check removal of in-reading segment file.

Checkpoints can recycle a segment file while it is being read by
ReadRecord, and that leads to an apparently odd error message during
logical decoding. This patch explicitly checks for that case and
errors out immediately.  Reading a recycled file is safe;
inconsistencies caused by it being overwritten as a new segment are
caught by page/record validation. So this is only for keeping
consistency with the wal_status shown in pg_replication_slots.
---
 src/backend/access/transam/xlogreader.c | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 33ccfc1553..4999892932 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -27,6 +27,7 @@
 
 #ifndef FRONTEND
 #include "miscadmin.h"
+#include "access/xlog.h"
 #include "utils/memutils.h"
 #endif
 
@@ -225,7 +226,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	uint32		pageHeaderSize;
 	bool		gotheader;
 	int			readOff;
-
+#ifndef FRONTEND
+	XLogSegNo	targetSegNo;
+#endif
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
 	 * the record we're reading.  We only do this if we're reading
@@ -271,6 +274,22 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
 
+#ifndef FRONTEND
+	/*
+	 * Although it is safe for the current segment to be recycled as a new
+	 * segment, since we check the page/record header while reading, it leads
+	 * to an apparently strange error message during logical replication. That
+	 * can be prevented by explicitly checking whether the segment is removed.
+	 */
+	XLByteToSeg(targetPagePtr, targetSegNo, state->wal_segment_size);
+	if (targetSegNo <= XLogGetLastRemovedSegno())
+	{
+		report_invalid_record(state,
+							  "WAL segment for LSN %X/%X has been removed",
+							  (uint32)(RecPtr >> 32), (uint32) RecPtr);
+		goto err;
+	}
+#endif
 	/*
 	 * Read the page containing the record into state->readBuf. Request enough
 	 * byte to cover the whole record header, or at least the part of it that
-- 
2.16.3
