Thank you for looking this and trouble rebasing!

At Mon, 30 Mar 2020 20:03:27 -0300, Alvaro Herrera <alvhe...@2ndquadrant.com> 
wrote in 
> I rebased this patch; it's failing to apply due to minor concurrent
> changes in PostgresNode.pm.  I squashed the patches in a series that
> made the most sense to me.
> 
> I have a question about static variable lastFoundOldestSeg in
> FindOldestXLogFileSegNo.  It may be set the first time the function
> runs; if it is, the function never again does anything, it just returns
> that value.  In other words, the static value is never reset; it never
> advances either.  Isn't that strange?  I think the coding is to assume
> that XLogCtl->lastRemovedSegNo will always be set, so its code will
> almost never run ... except when the very first wal file has not been
> removed yet.  This seems weird and pointless.  Maybe we should think
> about this differently -- example: if XLogGetLastRemovedSegno returns
> zero, then the oldest file is the zeroth one.  In what cases this is
> wrong?  Maybe we should fix those.

That's right, but without the static variable, every call to the
pg_replication_slots view before the fist checkpoint causes scanning
pg_xlog. XLogCtl->lastRemovedSegNo advances only at a checkpoint, so
it is actually right that the return value from
FindOldestXLogFileSegNo doesn't change until the first checkpoint.

Also we could set XLogCtl->lastRemovedSegNo at startup, but the
scanning on pg_xlog is useless in most cases.

I avoided to update the XLogCtl->lastRemovedSegNo directlry, but the
third way would be if XLogGetLastRemovedSegno() returned 0, then set
XLogCtl->lastRemovedSegNo by scanning the WAL directory. The attached
takes this way.

> Regarding the PostgresNode change in 0001, I think adding a special
> parameter for primary_slot_name is limited.  I'd like to change the
> definition so that anything that you give as a parameter that's not one
> of the recognized keywords (has_streaming, etc) is tested to see if it's
> a GUC; and if it is, then put it in postgresql.conf.  This would have to
> apply both to PostgresNode::init() as well as
> PostgresNode::init_from_backup(), obviously, since it would make no
> sense for the APIs to diverge on this point.  So you'd be able to do
>   $node->init_from_backup(allow_streaming => 1, work_mem => "4MB");
> without having to add code to init_from_backup to handle work_mem
> specifically.  This could be done by having a Perl hash with all the GUC
> names, that we could read lazily from "postmaster --describe-config" the
> first time we see an unrecognized keyword as an option to init() /
> init_from_backup().

Done that way. We could exclude "known" parameters by explicitly
delete the key at reading it, but I choosed to enumerate the known
keywords.  Although it can be used widely but actually I changed only
018_repslot_limit.pl to use the feature.

> I edited the doc changes a bit.
> 
> I don't know what to think of 0003 yet.  Has this been agreed to be a
> good idea?

So it was a separate patch. I think it has not been approved nor
rejected.  The main objective of the patch is preventing
pg_replication_slots.wal_status from strange coming back from the
"lost" state to other states. However, in the first place I doubt that
it's right that logical replication sends the content of a WAL segment
already recycled.

> I also made a few small edits to the code; all cosmetic so far:
> 
> * added long_desc to the new GUC; it now reads:
> 
>         {"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
>             gettext_noop("Sets the maximum size of WAL space reserved by 
> replication slots."),
>             gettext_noop("Replication slots will be marked as failed, and 
> segments released "
>                          "for deletion or recycling, if this much space is 
> occupied by WAL "
>                          "on disk."),
> 
> * updated the comment to ConvertToXSegs() which is now being used for
>   this purpose
> 
> * remove outdated comment to GetWalAvailability; it was talking about
>   restBytes parameter that no longer exists

Thank you for the fixes. All of the looks fine.

I fixed several typos. (s/requred/required/, s/devinitly/definitely/,
s/errror/error/)

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 95165d218d633e354c8136d2e200d83685ef3799 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Wed, 19 Dec 2018 12:43:57 +0900
Subject: [PATCH v20 1/3] Allow arbitrary GUC parameter setting init and
 init_from_backup in TAP test.

It is convenient that arbitrary GUC parameters can be specified on
initializing a node or taking a backup.
---
 src/test/perl/PostgresNode.pm | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index 1d5450758e..4671dc5eb1 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -416,6 +416,13 @@ parameter allows_streaming => 'logical' or 'physical' (passing 1 will also
 suffice for physical replication) depending on type of replication that
 should be enabled. This is disabled by default.
 
+The keyword parameter extra is appended to the parameters to
+initdb. Similarly auth_extra is appended to the parameter list to
+pg_regress.
+
+The keyword parameters other than the aboves are appended to the
+configuration file as configuraion parameters.
+
 The new node is set up in a fast but unsafe configuration where fsync is
 disabled.
 
@@ -494,6 +501,17 @@ sub init
 		print $conf "unix_socket_directories = '$host'\n";
 		print $conf "listen_addresses = ''\n";
 	}
+
+	# Finally, append all unknown parameters as configuration parameters.
+	foreach my $k (keys %params)
+	{
+		# Ignore known parameters, which are shown above.
+		next if (grep { $k eq $_ }
+				 ('has_archiving', 'allows_streaming', 'extra', 'auth_extra'));
+
+		print $conf "$k = \'$params{$k}\'\n";
+	}
+
 	close $conf;
 
 	chmod($self->group_access ? 0640 : 0600, "$pgdata/postgresql.conf")
@@ -656,6 +674,9 @@ default.
 If has_restoring is used, standby mode is used by default.  To use
 recovery mode instead, pass the keyword parameter standby => 0.
 
+The keyword parameters other than the aboves are appended to the
+configuration file as configuraion parameters.
+
 The backup is copied, leaving the original unmodified. pg_hba.conf is
 unconditionally set to enable replication connections.
 
@@ -702,6 +723,17 @@ port = $port
 		$self->append_conf('postgresql.conf',
 			"unix_socket_directories = '$host'");
 	}
+
+	# Translate unknown parameters into configuration parameters.
+	foreach my $k (keys %params)
+	{
+		# Ignore known parameters, which are shown above.
+		next if (grep { $k eq $_ }
+				 ('has_streaming', 'has_restoring', 'standby'));
+
+		$self->append_conf('postgresql.conf', "$k = \'$params{$k}\'");
+	}
+
 	$self->enable_streaming($root_node) if $params{has_streaming};
 	$self->enable_restoring($root_node, $params{standby}) if $params{has_restoring};
 	return;
-- 
2.18.2

>From d21d9cc87022c384e18068328307bc294afd4c54 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH v20 2/3] Add WAL relief vent for replication slots

Replication slot is useful to maintain replication connection in the
configurations where replication is so delayed that connection is
broken. On the other hand so many WAL files can fill up disk that the
master downs by a long delay. This feature, which is activated by a
GUC "max_slot_wal_keep_size", protects master servers from suffering
disk full by limiting the number of WAL files reserved by replication
slots.
---
 contrib/test_decoding/expected/ddl.out        |   4 +-
 contrib/test_decoding/sql/ddl.sql             |   2 +
 doc/src/sgml/catalogs.sgml                    |  48 +++
 doc/src/sgml/config.sgml                      |  23 ++
 doc/src/sgml/high-availability.sgml           |   8 +-
 src/backend/access/transam/xlog.c             | 361 ++++++++++++++++--
 src/backend/catalog/system_views.sql          |   4 +-
 src/backend/replication/slot.c                |   1 +
 src/backend/replication/slotfuncs.c           |  39 +-
 src/backend/utils/misc/guc.c                  |  13 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |  19 +
 src/include/catalog/pg_proc.dat               |   6 +-
 src/test/recovery/t/018_replslot_limit.pl     | 202 ++++++++++
 src/test/regress/expected/rules.out           |   6 +-
 15 files changed, 699 insertions(+), 38 deletions(-)
 create mode 100644 src/test/recovery/t/018_replslot_limit.pl

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index 2c999fd3eb..cf0318f697 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -723,8 +723,8 @@ SELECT pg_drop_replication_slot('regression_slot');
 (1 row)
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 (0 rows)
 
+\x
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
index 856495c952..0f2b9992f7 100644
--- a/contrib/test_decoding/sql/ddl.sql
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -387,4 +387,6 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
+\x
 SELECT * FROM pg_replication_slots;
+\x
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c..01a7802ed4 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9907,6 +9907,54 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL files claimed by this slot.
+      Valid values are:
+       <simplelist>
+        <member>
+         <literal>normal</literal> means that the claimed files
+         are within <varname>max_wal_size</varname>
+        </member>
+        <member>
+         <literal>keeping</literal> means that <varname>max_wal_size</varname>
+         is exceeded but still held by replication slots or
+         <varname>wal_keep_segments</varname>
+        </member>
+        <member>
+         <literal>losing</literal> means that some of the files are on the verge
+         of deletion, but can still be accessed by a session that's currently
+         reading it
+        </member>
+        <member>
+         <literal>lost</literal> means that some of them are definitely lost
+         and the session using this slot cannot continue replication.
+         This state also implies that the session using this slot has been
+         stopped.
+        </member>
+       </simplelist>
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The amount in bytes of WAL that can be written before this slot
+        loses required WAL files.
+        If <structfield>restart_lsn</structfield> is null or
+        <structfield>wal_status</structfield> is <literal>losing</literal>
+        or <literal>lost</literal>, this field is null.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2de21903a1..dc99c6868a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3758,6 +3758,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots retain unlimited amount of WAL files.  If
+        restart_lsn of a replication slot gets behind more than that megabytes
+        from the current LSN, the standby using the slot may no longer be able
+        to continue replication due to removal of required WAL files. You
+        can see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index b5d32bb720..624e5f94ad 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1951103b26..e68a89fded 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -758,7 +759,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)	\
 	(x / ((segsize) / (1024 * 1024)))
 
@@ -895,6 +896,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -3929,8 +3931,56 @@ XLogGetLastRemovedSegno(void)
 }
 
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Scan the WAL directory then set the lastRemovedSegNo.
+ *
+ * In the case we need to know the last removed segment before the first
+ * checkpoint runs, call this function to initialize the variable by scanning
+ * the WAL directory.
+ */
+XLogSegNo
+ScanAndSetLastRemovedSegno(void)
+{
+	DIR		*xldir;
+	struct dirent *xlde;
+	XLogSegNo segno;
+
+	xldir = AllocateDir(XLOGDIR);
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID tli;
+		XLogSegNo fsegno;
+
+		/* Ignore files that are not XLOG segments */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/*
+		 * Get minimum segment ignoring timeline ID, the same way with
+		 * RemoveOldXlogFiles().
+		 */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	FreeDir(xldir);
+
+	/* Update the last removed segno, not making retrogression. */
+	SpinLockAcquire(&XLogCtl->info_lck);
+	if (segno > XLogCtl->lastRemovedSegNo)
+		XLogCtl->lastRemovedSegNo = segno;
+	else
+		segno = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	return segno;
+}
+
+/*
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9441,6 +9491,201 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Detect availability of the record at given targetLSN.
+ *
+ * targetLSN is restart_lsn of a slot.
+ * walsender_pid is the slot's walsender PID.
+ *
+ * Returns one of the following enum values.
+ *
+ * WALAVAIL_NORMAL means targetLSN is available because it is in the range of
+ * max_wal_size.  If max_slot_wal_keep_size is smaller than max_wal_size, this
+ * state is not returned.
+ *
+ * WALAVAIL_PRESERVED means it is still available by preserving extra segments
+ * beyond max_wal_size.
+ *
+ * WALAVAIL_BEING_REMOVED means it is being removed or already removed but the
+ * replication stream on the given slot is live yet. The state may transit to
+ * WALAVAIL_PRESERVED or WALAVAIL_NORMAL state if the walsender advances
+ * restart_lsn.
+ *
+ * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the
+ * slot cannot continue.
+ *
+ * returns WALAVAIL_NULL if restart_lsn is invalid.
+ */
+WalAvailability
+GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo currSeg;		/* segid of currpos */
+	XLogSegNo restartSeg;	/* segid of restart_lsn */
+	XLogSegNo oldestSeg;	/* actual oldest segid */
+	XLogSegNo oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
+	XLogSegNo oldestSlotSeg;/* oldest segid kept by slot */
+	uint64	  keepSegs;
+
+	/* the case where the slot has never been activated */
+	if (XLogRecPtrIsInvalid(restart_lsn))
+		return WALAVAIL_INVALID_LSN;
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* calculate oldest segment currently needed by slots */
+	XLByteToSeg(restart_lsn, restartSeg, wal_segment_size);
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	oldestSlotSeg = GetOldestKeepSegment(currpos, slotPtr);
+
+	/* find the oldest extant segment file */
+	oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+	/* initialize last removed segno if not yet */
+	if (oldestSeg == 1)
+		oldestSeg = ScanAndSetLastRemovedSegno() + 1;
+
+	/* calculate oldest segment by max_wal_size */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
+
+	if (currSeg > keepSegs)
+		oldestSegMaxWalSize = currSeg - keepSegs;
+	else
+		oldestSegMaxWalSize = 1;
+
+	/*
+	 * If max_slot_wal_keep_size has changed after the last call, the segment
+	 * that would been kept by the current setting might have been lost by the
+	 * previous setting. No point in showing normal or keeping status values if
+	 * the restartSeg is known to be lost.
+	 */
+	if (restartSeg >= oldestSeg)
+	{
+		/*
+		 * show "normal" when restartSeg is within max_wal_size. If
+		 * max_slot_wal_keep_size is smaller than max_wal_size, there's no
+		 * point in showing the status.
+		 */
+		if ((max_slot_wal_keep_size_mb <= 0 ||
+			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+			oldestSegMaxWalSize <= restartSeg)
+			return WALAVAIL_NORMAL;
+
+		/* being retained by slots */
+		if (oldestSlotSeg <= restartSeg)
+			return WALAVAIL_PRESERVED;
+	}
+	
+	/*
+	 * The segment is already lost or being lost. If the oldest segment is just
+	 * after the restartSeg, running walsender may be reading the just removed
+	 * segment. The walsender may safely move to the oldest existing segment in
+	 * that case.
+	 */
+	if (oldestSeg == restartSeg + 1 && walsender_pid != 0)
+		return WALAVAIL_BEING_REMOVED;
+
+	/* definitely lost. the walsender can no longer restart */
+	return WALAVAIL_REMOVED;
+}
+
+/*
+ * Returns minimum segment number that the next checkpoint must leave
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all active slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+	uint64		keepSegs = 0;	/* # of segments actually kept */
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate how many segments are kept by slots first. The second
+	 * term of the condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limitSegs;
+
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Reduce it if slots already reserves too many. */
+		if (limitSegs < keepSegs)
+			keepSegs = limitSegs;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
+/*
+ * Calculate remaining bytes until WAL segment for targetLSN will be removed.
+ */
+int64
+DistanceToWalRemoval(XLogRecPtr currLSN, XLogRecPtr targetLSN)
+{
+	XLogSegNo	currSeg;
+	uint64		limitSegs = 0;
+	int64 		restbytes;
+	uint64		fragbytes;
+	XLogSegNo	targetSeg;
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+
+	/* Calculate how far back WAL segments are preserved */
+	if (max_slot_wal_keep_size_mb >= 0)
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+	if (wal_keep_segments > 0 && limitSegs < wal_keep_segments)
+		limitSegs = wal_keep_segments;
+
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+	/* avoid underflow */
+	if (targetSeg + limitSegs < currSeg)
+		return 0;
+
+	/*
+	 * This slot still has all required segments. Calculate how
+	 * many LSN bytes the slot has until it loses targetLSN.
+	 */
+	fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+	XLogSegNoOffsetToRecPtr(targetSeg + limitSegs - currSeg,
+							fragbytes, wal_segment_size,
+							restbytes);
+
+	/*
+	 * Not realistic, but make sure that it is not out of the
+	 * range of int64. No problem to do so since such large values
+	 * have no significant difference.
+	 */
+	if (restbytes > PG_INT64_MAX)
+		restbytes = PG_INT64_MAX;
+
+	return restbytes;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9452,38 +9697,102 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	static XLogSegNo last_lost_segs = 0;
+	static int last_nslots = 0;
+	static char *last_slot_name = NULL;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	minSlotSegNo;
+	int			nslots_affected = 0;
 
-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * We should keep certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * Warn the checkpoint is going to flush the segments required by
+	 * replication slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
-		else
-			segno = segno - wal_keep_segments;
+		Assert (max_replication_slots > 0);
+
+		XLByteToSeg(slotminptr, minSlotSegNo, wal_segment_size);
+
+		if (minSlotSegNo < minSegNo)
+		{
+			/* Some slots has lost required segments */
+			XLogSegNo	lost_segs = minSegNo - minSlotSegNo;
+			ReplicationSlot *earliest = NULL;
+			char	   *earliest_name = NULL;
+			int			i;
+
+			/* Find the most affected slot */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+			for (i = 0 ; i < max_replication_slots ; i++)
+			{
+				ReplicationSlot *s =
+					&ReplicationSlotCtl->replication_slots[i];
+				XLogSegNo slotSegNo;
+
+				XLByteToSeg(s->data.restart_lsn, slotSegNo, wal_segment_size);
+
+				if (s->in_use && s->active_pid == 0 && slotSegNo < minSegNo)
+				{
+					nslots_affected++;
+
+					if (earliest == NULL ||
+						s->data.restart_lsn < earliest->data.restart_lsn)
+						earliest = s;
+				}
+			}
+
+			if (earliest)
+			{
+				MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+				earliest_name = pstrdup(NameStr(earliest->data.name));
+				MemoryContextSwitchTo(oldcxt);
+			}
+
+			LWLockRelease(ReplicationSlotControlLock);
+
+			/* Emit WARNING if something has changed */
+			if (earliest_name &&
+				(last_lost_segs != lost_segs || last_nslots != nslots_affected))
+			{
+				ereport(WARNING,
+						(errmsg_plural ("%d replication slot has lost required WAL segments by %lu segments",
+										"%d replication slots have lost required WAL segments by %lu segments",
+										nslots_affected, nslots_affected,
+										lost_segs),
+						 errdetail("Most affected slot is %s.",
+								   earliest_name)));
+
+				if (last_slot_name)
+					pfree(last_slot_name);
+				last_slot_name = earliest_name;
+				last_lost_segs = lost_segs;
+				last_nslots = nslots_affected;
+			}
+		}
 	}
 
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
+	/* Reset the state if no affected slots remain. */
+	if (nslots_affected == 0 && last_slot_name)
 	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+		pfree(last_slot_name);
+		last_slot_name = NULL;
+		last_lost_segs = 0;
+		last_nslots = 0;
 	}
 
 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
-		*logSegNo = segno;
+	if (minSegNo < *logSegNo)
+		*logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 83d00c6cde..775b8b7f20 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -863,7 +863,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d90c7235e9..a26f7999aa 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -49,6 +49,7 @@
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
 
 /*
  * Replication slot on-disk data structure.
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bc..47cd4375a1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -288,6 +288,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
+		WalAvailability walstate;
 		int			i;
 
 		if (!slot->in_use)
@@ -355,6 +356,42 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		walstate = GetWalAvailability(restart_lsn, active_pid);
+
+		switch (walstate)
+		{
+			case WALAVAIL_INVALID_LSN:
+				nulls[i++] = true;
+				break;
+
+			case WALAVAIL_NORMAL:
+				values[i++] = CStringGetTextDatum("normal");
+				break;
+
+			case WALAVAIL_PRESERVED:
+				values[i++] = CStringGetTextDatum("keeping");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("losing");
+				break;
+
+			case WALAVAIL_REMOVED:
+				values[i++] = CStringGetTextDatum("lost");
+				break;
+		}
+
+		if (max_slot_wal_keep_size_mb >= 0 &&
+			(walstate == WALAVAIL_NORMAL ||
+			 walstate == WALAVAIL_PRESERVED))
+		{
+			values[i++] =
+				Int64GetDatum(DistanceToWalRemoval(GetXLogWriteRecPtr(),
+												   restart_lsn));
+		}
+		else
+			nulls[i++] = true;
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 79bc7ac8ca..54cd5f6420 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2771,6 +2771,19 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of WAL space reserved by replication slots."),
+			gettext_noop("Replication slots will be marked as failed, and segments released "
+						 "for deletion or recycling, if this much space is occupied by WAL "
+						 "on disk."),
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e9f8ca775d..0b696e7044 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -287,6 +287,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# measured in bytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31cce..c1994eec6f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
@@ -255,6 +256,20 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * WAL segment availability status
+ *
+ * This is used as the return value of GetWalAvailability.
+ */
+typedef enum WalAvailability
+{
+	WALAVAIL_INVALID_LSN,			/* parameter error */
+	WALAVAIL_NORMAL,				/* WAL segment is within max_wal_size */
+	WALAVAIL_PRESERVED,				/* WAL segment is preserved by repslots */
+	WALAVAIL_BEING_REMOVED,			/* WAL segment is no longer preserved */
+	WALAVAIL_REMOVED				/* WAL segment has been removed */
+} WalAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -268,6 +283,7 @@ extern int	XLogFileOpen(XLogSegNo segno);
 
 extern void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli);
 extern XLogSegNo XLogGetLastRemovedSegno(void);
+extern XLogSegNo ScanAndSetLastRemovedSegno(void);
 extern void XLogSetAsyncXactLSN(XLogRecPtr record);
 extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
 
@@ -305,6 +321,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn,
+										  pid_t walsender_pid);
+extern int64 DistanceToWalRemoval(XLogRecPtr currLSN, XLogRecPtr targetLSN);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a6a708cca9..2025f34bfd 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9971,9 +9971,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/018_replslot_limit.pl b/src/test/recovery/t/018_replslot_limit.pl
new file mode 100644
index 0000000000..84a1f3a9dd
--- /dev/null
+++ b/src/test/recovery/t/018_replslot_limit.pl
@@ -0,0 +1,202 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1,
+				   extra => ['--wal-segsize=1'],
+				   min_wal_size => '2MB',
+				   max_wal_size => '4MB',
+				   log_checkpoints => 'yes');
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot state and remain should be null before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1,
+								primary_slot_name => 'rep1');
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, the slot is the state "normal" now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by five segments (= 5MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in safe state. The remaining bytes should be as almost
+# (max_slot_wal_keep_size + 1) times large as the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that remaining byte is calculated correctly');
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint, reducing remain by 4 MB.
+advance_wal($node_master, 4);
+
+# Slot gets into 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|1024 kB", 'check that the slot state changes to "keeping"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby still can connect to master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "1 replication slot has lost required WAL segments by 1 segments\n".
+			   ".*Most affected slot is rep1.",
+			   $logstart),
+   'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby no longer can connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= (16 * $n) MB) on master
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7245b0e13b..8688f7138f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.18.2

>From b01043efdbec8f52306c4d0dea4e71d7a93cae63 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyoga....@gmail.com>
Date: Tue, 31 Mar 2020 12:59:10 +0900
Subject: [PATCH v20 3/3] Allow init and init_from_backup to set arbitrary GUC
 parameters in TAP test.

It is convenient that arbitrary GUC parameters can be specified on
initializing a node or taking a base backup.  Any non-predefined
keyword parameter given to the methods is translated into a parameter
setting in the config file.
---
 src/backend/access/transam/xlogreader.c | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f3fea5132f..90a9649f61 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -270,7 +270,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	uint32		pageHeaderSize;
 	bool		gotheader;
 	int			readOff;
-
+#ifndef FRONTEND
+	XLogSegNo	targetSegNo;
+#endif
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
 	 * the record we're reading.  We only do this if we're reading
@@ -314,6 +316,22 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
 
+#ifndef FRONTEND
+	/*
+	 * Although It's safe that the current segment is recycled as a new
+	 * segment since we check the page/record header at reading, it leads to
+	 * an apparently strange error message when logical replication, which can
+	 * be prevented by explicitly checking if the current segment is removed.
+	 */
+	XLByteToSeg(targetPagePtr, targetSegNo, state->segcxt.ws_segsize);
+	if (targetSegNo <= XLogGetLastRemovedSegno())
+	{
+		report_invalid_record(state,
+							  "WAL segment for LSN %X/%X has been removed",
+							  (uint32)(RecPtr >> 32), (uint32) RecPtr);
+		goto err;
+	}
+#endif
 	/*
 	 * Read the page containing the record into state->readBuf. Request enough
 	 * byte to cover the whole record header, or at least the part of it that
-- 
2.18.2

Reply via email to