So, the more I look at this patch, the less I like the way the slots are
handled.

* I think it's a mistake to try to do anything in KeepLogSeg itself;
  that function is merely in charge of some arithmetic.  I propose to
  make that function aware of the new size limitation (so that it
  doesn't trust the slot's LSNs completely), but to make the function
  have no side effects.  The attached patch does that, I hope.
  To replace that responsibility, let's add another function.  I named it
  InvalidateObsoleteReplicationSlots().  In CreateCheckPoint and
  CreateRestartPoint, we call the new function just before removing
  segments.  Note: the one in this patch doesn't actually work or even
  compile.
  The new function must:

  1. mark the slot as "invalid" somehow.  Maybe it would make sense to
  add a new flag in the on-disk struct for this; but for now I'm just
  thinking that changing the slot's restart_lsn is sufficient.
  (Of course, I haven't tested this, so there might be side-effects that
  mean that this idea doesn't work).

  2. send SIGTERM to a walsender that's using such a slot.

  3. send the warning message.  Instead of trying to construct a message
  with a list of slots, send one message per slot.  (I think this is
  better from a translatability point of view, and also from a
  monitoring PoV).
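
A minimal sketch of the three steps above, using made-up stand-in types.
MockSlot, invalidate_obsolete_slots() and the message text are all
hypothetical, not PostgreSQL code.  It only illustrates the intended
control flow; the real function would take the slot locks and actually
signal the collected PIDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-ins for the real slot structures. */
typedef uint64_t Lsn;
#define INVALID_LSN ((Lsn) 0)

typedef struct
{
    const char *name;
    bool        in_use;
    Lsn         restart_lsn;
    int         active_pid;     /* 0 when no walsender is attached */
} MockSlot;

/*
 * Invalidate every in-use slot whose restart_lsn is older than oldest_lsn.
 * PIDs of attached walsenders are collected in pids_out, which must have
 * room for nslots entries; the caller would send SIGTERM to them afterwards.
 * Returns the number of slots invalidated.
 */
static int
invalidate_obsolete_slots(MockSlot *slots, int nslots, Lsn oldest_lsn,
                          int *pids_out, int *npids_out)
{
    int         ninvalidated = 0;

    assert(pids_out != NULL && npids_out != NULL);
    *npids_out = 0;

    for (int i = 0; i < nslots; i++)
    {
        MockSlot   *s = &slots[i];

        /* skip unused slots and slots that are already invalid */
        if (!s->in_use || s->restart_lsn == INVALID_LSN)
            continue;
        if (s->restart_lsn >= oldest_lsn)
            continue;

        /* step 1: mark the slot invalid */
        s->restart_lsn = INVALID_LSN;

        /* step 2: remember the walsender to be signalled */
        if (s->active_pid != 0)
            pids_out[(*npids_out)++] = s->active_pid;

        /* step 3: one message per slot, not one message with a list */
        fprintf(stderr, "WARNING: slot \"%s\" lost required WAL\n", s->name);
        ninvalidated++;
    }

    return ninvalidated;
}
```

Emitting the message inside the loop is what gives one line per slot,
which is the point made in step 3.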

* GetWalAvailability seems too much in competition with
  DistanceToWalRemoval.  Which is weird, because both functions do
  pretty much the same thing.  I think a better design is to make the
  former function return the distance as an out parameter.

* Andres complained that the "distance" column was not a great value to
  expose (20171106132050.6apzynxrqrzgh...@alap3.anarazel.de).  That's
  right: it changes with both the insertion LSN and the slot's
  consumption.  Maybe we can expose the earliest live LSN (start of the
  earliest segment?) as a new column.  It'll be the same for all slots,
  I suppose, but we don't care, do we?
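
For illustration, the "start of the earliest segment" arithmetic could
look like the sketch below.  The helper names are hypothetical, and it
assumes the earliest live segment is the one following the last removed
segment, which is how the attached patch derives oldestSeg.

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t Lsn;
typedef uint64_t SegNo;

/* LSN of the first byte of the given segment. */
static Lsn
segment_start_lsn(SegNo segno, uint64_t segsize)
{
    assert(segsize > 0);
    return segno * segsize;
}

/*
 * Earliest LSN still present on disk, assuming every segment up to and
 * including last_removed_segno is gone and the next one still exists.
 */
static Lsn
earliest_live_lsn(SegNo last_removed_segno, uint64_t segsize)
{
    return segment_start_lsn(last_removed_segno + 1, segsize);
}
```

With 16MB segments and segment 4 as the last one removed, this gives
0x5000000, and the value is indeed the same for every slot.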

I attach a rough sketch, which as I said before doesn't work and doesn't
compile.  Sadly I have reached the end of my day here, so I won't be able
to work on this any more today.  I'll be glad to try again tomorrow,
but in the meantime I thought it was better to send it over and see
whether you had any thoughts about this proposed design (maybe you know
it doesn't work for some reason), or better yet, you have the chance to
actually complete the code or at least move it a little further.

Thanks

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c..01a7802ed4 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9907,6 +9907,54 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL files claimed by this slot.
+      Valid values are:
+       <simplelist>
+        <member>
+         <literal>normal</literal> means that the claimed files
+         are within <varname>max_wal_size</varname>
+        </member>
+        <member>
+         <literal>keeping</literal> means that <varname>max_wal_size</varname>
+         is exceeded but the files are still held by replication slots or
+         <varname>wal_keep_segments</varname>
+        </member>
+        <member>
+         <literal>losing</literal> means that some of the files are on the verge
+         of deletion, but can still be accessed by a session that's currently
+         reading them
+        </member>
+        <member>
+         <literal>lost</literal> means that some of them are definitely lost
+         and the session using this slot cannot continue replication.
+         This state also implies that the session using this slot has been
+         stopped.
+        </member>
+       </simplelist>
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The amount in bytes of WAL that can be written before this slot
+        loses required WAL files.
+        If <structfield>restart_lsn</structfield> is null or
+        <structfield>wal_status</structfield> is <literal>losing</literal>
+        or <literal>lost</literal>, this field is null.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c4d6ed4bbc..17c18386e2 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3753,6 +3753,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots may retain an unlimited amount of WAL files.  If the
+        restart_lsn of a replication slot falls behind the current LSN by more
+        than this size, the standby using the slot may no longer be able to
+        continue replication because required WAL files have been removed.
+        You can see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index b5d32bb720..624e5f94ad 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 977d448f50..bb735c6f68 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -106,6 +106,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -759,7 +760,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)	\
 	(x / ((segsize) / (1024 * 1024)))
 
@@ -896,6 +897,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -3929,9 +3931,10 @@ XLogGetLastRemovedSegno(void)
 	return lastRemovedSegNo;
 }
 
+
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9050,6 +9053,7 @@ CreateCheckPoint(int flags)
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
 	_logSegNo--;
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
 	/*
@@ -9401,6 +9405,7 @@ CreateRestartPoint(int flags)
 	if (RecoveryInProgress())
 		ThisTimeLineID = replayTLI;
 
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, endptr);
 
 	/*
@@ -9451,13 +9456,165 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Report availability of WAL for a replication slot
+ *		restart_lsn and active_pid are straight from the slot info
+ *
+ * *distance is set to the distance in bytes of ... XXX what?
+ *
+ * Returns one of the following enum values.
+ *
+ * WALAVAIL_NORMAL means targetLSN is available because it is in the range of
+ * max_wal_size.  If max_slot_wal_keep_size is smaller than max_wal_size, this
+ * state is not returned.
+ *
+ * WALAVAIL_PRESERVED means it is still available by preserving extra segments
+ * beyond max_wal_size.
+ *
+ * WALAVAIL_BEING_REMOVED means it is being removed or already removed but the
+ * replication stream on the given slot is still live. The state may change to
+ * WALAVAIL_PRESERVED or WALAVAIL_NORMAL state if the walsender advances
+ * restart_lsn.
+ *
+ * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the
+ * slot cannot continue.
+ *
+ * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WalAvailability
+GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid, uint64 *distance)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo currSeg;		/* segid of currpos */
+	XLogSegNo restartSeg;	/* segid of restart_lsn */
+	XLogSegNo oldestSeg;	/* actual oldest segid */
+	XLogSegNo oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
+	XLogSegNo oldestSlotSeg;/* oldest segid kept by slot */
+	uint64	  keepSegs;
+
+	*distance = 0;	/* XXX figure out how to compute this */
+
+	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	if (XLogRecPtrIsInvalid(restart_lsn))
+		return WALAVAIL_INVALID_LSN;
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* calculate oldest segment currently needed by slots */
+	XLByteToSeg(restart_lsn, restartSeg, wal_segment_size);
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	oldestSlotSeg = GetOldestKeepSegment(currpos, slotPtr);
+
+	/*
+	 * Find the oldest extant segment file. We get 1 until a checkpoint removes
+	 * the first WAL segment file since startup, which can make the status
+	 * wrong under certain abnormal conditions, but that does no actual harm.
+	 */
+	oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+	/* calculate oldest segment by max_wal_size */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
+
+	if (currSeg > keepSegs)
+		oldestSegMaxWalSize = currSeg - keepSegs;
+	else
+		oldestSegMaxWalSize = 1;
+
+	/*
+	 * If max_slot_wal_keep_size has changed since the last call, a segment
+	 * that would have been kept under the current setting might already have
+	 * been removed under the previous one. No point in showing normal or
+	 * keeping status values if the restartSeg is known to be lost.
+	 */
+	if (restartSeg >= oldestSeg)
+	{
+		/*
+		 * show "normal" when restartSeg is within max_wal_size. If
+		 * max_slot_wal_keep_size is smaller than max_wal_size, there's no
+		 * point in showing the status.
+		 */
+		if ((max_slot_wal_keep_size_mb <= 0 ||
+			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+			oldestSegMaxWalSize <= restartSeg)
+			return WALAVAIL_NORMAL;
+
+		/* being retained by slots */
+		if (oldestSlotSeg <= restartSeg)
+			return WALAVAIL_PRESERVED;
+	}
+
+	/*
+	 * The segment is already lost or being lost. If the oldest segment is just
+	 * after the restartSeg, running walsender may be reading the just removed
+	 * segment. The walsender may safely move to the oldest existing segment in
+	 * that case.
+	 */
+	if (oldestSeg == restartSeg + 1 && walsender_pid != 0)
+		return WALAVAIL_BEING_REMOVED;
+
+	/* definitely lost. the walsender can no longer restart */
+	return WALAVAIL_REMOVED;
+}
+
+/*
+ * Returns minimum segment number that the next checkpoint must leave
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all active slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+	uint64		keepSegs = 0;	/* # of segments actually kept */
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate how many segments are kept by slots first. The second
+	 * term of the condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limit;
+
+		limit = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Reduce it if slots already reserve too many. */
+		if (limit < keepSegs)
+			keepSegs = limit;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
  * This is calculated by subtracting wal_keep_segments from the given xlog
  * location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots.  For the latter criterion we do consider
+ * the effects of max_slot_wal_keep_size: reserve at most that much space back
+ * from recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
@@ -9467,6 +9624,11 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 
 	XLByteToSeg(recptr, segno, wal_segment_size);
 	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_slot_wal_keep_size_mb > 0 && keep != InvalidXLogRecPtr &&
+		recptr - keep > (uint64) max_slot_wal_keep_size_mb * 1024 * 1024)
+	{
+		keep = recptr - (uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+	}
 
 	/* compute limit for wal_keep_segments first */
 	if (wal_keep_segments > 0)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..e85f40c4c1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d90c7235e9..d32ed7e62f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,53 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+	XLogRecPtr	oldestLSN;
+	List	   *pids = NIL;
+
+	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (s->data.restart_lsn < oldestLSN)
+		{
+			/* mark this slot as invalid */
+			SpinLockAcquire(&s->mutex);
+			s->data.restart_lsn = InvalidXLogRecPtr;
+
+			/* remember PID for killing, if active */
+			if (s->active_pid != 0)
+				pids = lappend_int(pids, s->active_pid);
+			SpinLockRelease(&s->mutex);
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * Signal any active walsenders to terminate.  We do not wait to observe
+	 * them gone.
+	 */
+	for (int i = 0; i < list_length(pids); i++)
+	{
+		/* signal the walsender to terminate */
+		(void) kill(list_nth_int(pids, i), SIGTERM);
+	}
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bc..68746ad3c8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
+		WalAvailability walstate;
+		uint64		distance;
 		int			i;
 
 		if (!slot->in_use)
@@ -355,6 +357,39 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		walstate = GetWalAvailability(restart_lsn, active_pid, &distance);
+
+		switch (walstate)
+		{
+			case WALAVAIL_INVALID_LSN:
+				nulls[i++] = true;
+				break;
+
+			case WALAVAIL_NORMAL:
+				values[i++] = CStringGetTextDatum("normal");
+				break;
+
+			case WALAVAIL_PRESERVED:
+				values[i++] = CStringGetTextDatum("keeping");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("losing");
+				break;
+
+			case WALAVAIL_REMOVED:
+				values[i++] = CStringGetTextDatum("lost");
+				break;
+		}
+
+		if (max_slot_wal_keep_size_mb >= 0 &&
+			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_PRESERVED))
+		{
+			values[i++] = Int64GetDatum(distance);
+		}
+		else
+			nulls[i++] = true;
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 64dc9fbd13..1cfb999748 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2763,6 +2763,19 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+			gettext_noop("Replication slots will be marked as failed, and segments released "
+						 "for deletion or recycling, if this much space is occupied by WAL "
+						 "on disk."),
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e904fa7300..507a72b712 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -288,6 +288,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# in megabytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31cce..d0b2509081 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
@@ -255,6 +256,20 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * WAL segment availability status
+ *
+ * This is used as the return value of GetWalAvailability.
+ */
+typedef enum WalAvailability
+{
+	WALAVAIL_INVALID_LSN,			/* parameter error */
+	WALAVAIL_NORMAL,				/* WAL segment is within max_wal_size */
+	WALAVAIL_PRESERVED,				/* WAL segment is preserved by repslots */
+	WALAVAIL_BEING_REMOVED,			/* WAL segment is no longer preserved */
+	WALAVAIL_REMOVED				/* WAL segment has been removed */
+} WalAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -305,6 +320,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn,
+										  pid_t walsender_pid,
+										  uint64 *distance);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a649e44d08..dd968adf74 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9986,9 +9986,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/018_replslot_limit.pl b/src/test/recovery/t/018_replslot_limit.pl
new file mode 100644
index 0000000000..4ff2ef3b48
--- /dev/null
+++ b/src/test/recovery/t/018_replslot_limit.pl
@@ -0,0 +1,203 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot state and remain should be null before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', "primary_slot_name = 'rep1'");
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done, the slot is the state "normal" now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is in safe state. The remaining bytes should be almost
+# (max_slot_wal_keep_size + 1) times as large as the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that remaining byte is calculated correctly');
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint, reducing remain by 4 MB.
+advance_wal($node_master, 4);
+
+# Slot gets into 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|1024 kB", 'check that the slot state changes to "keeping"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby still can connect to master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "1 replication slot has lost required WAL segments by 1 segments\n".
+			   ".*Slot rep1 lost required segments.",
+			   $logstart),
+   'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby no longer can connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (= $n MB, since wal-segsize is 1MB) on master
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568..3a0c619c1c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
