At Thu, 30 Nov 2017 12:44:16 +0900, Michael Paquier <michael.paqu...@gmail.com> 
wrote in <CAB7nPqS4bhSsDm_47GVjQno=iu6thx13mqvwwxxkbhqwfww...@mail.gmail.com>
> On Thu, Nov 9, 2017 at 5:31 PM, Kyotaro HORIGUCHI
> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> > # I had been forgetting to count the version for the latest several
> > # patches. I give the version '4' - as the next of the last
> > # numbered patch.
> 
> With all the changes that have happened in the documentation lately, I
> suspect that this is going to need a rework.. Moved to next CF per
> lack of reviews, with waiting on author as status.

I refactored this patch so that the almost-identical code no longer
appears twice, and added a recovery TAP test for this feature.

The new function GetMinSecuredSegment() calculates the boundary
segment number considering both wal_keep_segments and
max_slot_wal_keep_size. KeepLogSeg and IsLsnStillAvailable no longer
carry near-duplicate code blocks that had to be kept in sync.
I think the new code is far more understandable than the previous one.
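
For reviewers trying the feature, here is a minimal usage sketch (not
part of the patches) of how the columns added by the second patch can
be inspected; it assumes the column names stay as posted:

  -- "status" and "min_secure_lsn" are the columns added to
  -- pg_replication_slots by the second patch.
  SELECT slot_name, restart_lsn, status, min_secure_lsn
    FROM pg_replication_slots;

A slot reported as "secured" still has all required segments,
"insecured" means a later checkpoint may remove some of them, and
"broken" means they are already gone.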

The new third patch contains a TAP test to check that
max_slot_wal_keep_size and the related columns of the stats view
work as expected.
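
In case someone wants to reproduce the test scenario by hand, the
following SQL is roughly equivalent to what the TAP test does through
append_conf() and reload (the 32MB value is just what the test uses):

  -- max_slot_wal_keep_size is PGC_SIGHUP in the first patch,
  -- so a reload is sufficient.
  ALTER SYSTEM SET max_slot_wal_keep_size = '32MB';
  SELECT pg_reload_conf();

After enough WAL is generated, a CHECKPOINT should log the new warning
"some replication slots have lost required WAL segments" once a slot
falls behind the limit.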

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From b80345f1d600ecb427fec8e0a03bb4ed0f1ec7ba Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/3] Add WAL relief vent for replication slots

Adds the capability to limit the number of WAL segments kept by
replication slots, controlled by a new GUC variable.
---
 src/backend/access/transam/xlog.c             | 114 +++++++++++++++++++++-----
 src/backend/utils/misc/guc.c                  |  11 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 107 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3e9a12d..723a983 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -861,6 +862,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetMinSecuredSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9348,6 +9350,74 @@ CreateRestartPoint(int flags)
 }
 
 /*
+ * Returns the minimum segment number that the next checkpoint must leave, considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ */
+static XLogSegNo
+GetMinSecuredSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr)
+{
+	uint64 keepSegs;
+	XLogSegNo currSeg;
+	XLogSegNo tailSeg;
+	uint64 slotlimitbytes;
+	uint64 slotlimitfragment;
+	uint64 currposoff;
+	XLogRecPtr slotpos = minSlotPtr;
+	XLogSegNo	slotSeg;
+
+	Assert(wal_keep_segments >= 0);
+	Assert(max_slot_wal_keep_size_mb >= 0);
+
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	XLByteToSeg(slotpos, slotSeg, wal_segment_size);
+
+	/*
+	 * If wal_keep_segments keeps more segments than the slot does, slotpos
+	 * is no longer useful.  Compare by addition to avoid unsigned underflow.
+	 */
+	if (slotpos != InvalidXLogRecPtr && currSeg <= slotSeg + wal_keep_segments)
+		slotpos = InvalidXLogRecPtr;
+
+	/* slots aren't useful, consider only wal_keep_segments */
+	if (slotpos == InvalidXLogRecPtr)
+	{
+		/* avoid underflow, don't go below 1 */
+		if (currSeg <= wal_keep_segments)
+			return 1;
+
+		return currSeg - wal_keep_segments;
+	}
+
+	/* if no limit is set, just return slotSeg */
+	if (max_slot_wal_keep_size_mb == 0)
+		return slotSeg;
+
+	/*
+	 * The slot limit is set and the slot determines the oldest segment to
+	 * keep; calculate the oldest segment that must not be removed.
+	 */
+	slotlimitbytes = (uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+	slotlimitfragment = XLogSegmentOffset(slotlimitbytes,
+												 wal_segment_size);
+	currposoff = XLogSegmentOffset(currpos, wal_segment_size);
+	keepSegs = wal_keep_segments +
+		ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+	if (currposoff < slotlimitfragment)
+		keepSegs++;
+
+	/*
+	 * calculate the oldest segment that is kept by wal_keep_segments and
+	 * max_slot_wal_keep_size.
+	 */
+	if (currSeg <= keepSegs)
+		tailSeg = 1;
+	else
+		tailSeg = currSeg - keepSegs;
+
+	return tailSeg;
+}
+
+/*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
@@ -9359,34 +9429,38 @@ static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
 	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;
 
 	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
-	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
-		else
-			segno = segno - wal_keep_segments;
-	}
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
+	/*
+	 * We should keep a certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetMinSecuredSegment(recptr, slotminptr);
 
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
+	/*
+	 * Warn if the checkpoint is going to remove segments still required by
+	 * replication slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
+	{
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
 
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+		if (slotSegNo < minSegNo)
+			ereport(WARNING,
+					(errmsg("some replication slots have lost required WAL segments"),
+					 errdetail("The most affected slot has lost " UINT64_FORMAT " segments.",
+							   minSegNo - slotSegNo)));
 	}
 
+	if (minSegNo < segno)
+		segno = minSegNo;
+
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
 		*logSegNo = segno;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e32901d..97d83f3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2395,6 +2395,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WAL kept by replication slots."),
+			NULL,
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 69f40f0..c7335b6 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -234,6 +234,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0	# in megabytes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dd7d8b5..45eb51a 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.9.2

>From c972f22a2697f54cda71b6b4e7b7f0eac477e9af Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/3] Add monitoring aid for max_replication_slots.

Adds two columns, "status" and "min_secure_lsn", to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose
required WAL segments.  The new columns show whether a slot is still
usable, is about to lose required segments, or has already lost them,
and the LSN back to which the next checkpoint will secure WAL.
---
 src/backend/access/transam/xlog.c    | 93 ++++++++++++++++++++++++++++++++++++
 src/backend/catalog/system_views.sql |  4 +-
 src/backend/replication/slotfuncs.c  | 25 +++++++++-
 src/include/access/xlog.h            |  1 +
 src/include/catalog/pg_proc.h        |  2 +-
 src/test/regress/expected/rules.out  |  6 ++-
 6 files changed, 126 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 723a983..b630224 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9349,6 +9349,99 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+
+/*
+ * Returns the segment number of the oldest file in XLOG directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		*xldir;
+	struct dirent *xlde;
+	XLogSegNo segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	if (xldir == NULL)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open write-ahead log directory \"%s\": %m",
+						XLOGDIR)));
+
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID tli;
+		XLogSegNo fsegno;
+
+		/* Ignore files that are not XLOG segments */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/* get the minimum segment number, ignoring the timeline ID */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	return segno;
+}
+
+/*
+ * Check whether the record at the given restartLSN is still present in XLOG.
+ *
+ * Returns true if it is present.  If minSecureLSN is given, it is set to the
+ * start LSN of the oldest segment that the next checkpoint will keep.
+ */
+bool
+IsLsnStillAvailable(XLogRecPtr restartLSN, XLogRecPtr *minSecureLSN)
+{
+	XLogRecPtr currpos;
+	XLogSegNo restartSeg;
+	XLogSegNo tailSeg;
+	XLogSegNo oldestSeg;
+
+	Assert(!XLogRecPtrIsInvalid(restartLSN));
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * oldestSeg is zero until at least one segment has been removed since
+	 * startup.  Use the oldest segno taken from the file names instead.
+	 */
+	if (oldestSeg == 0)
+	{
+		static XLogSegNo oldestFileSeg = 0;
+
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+		/* give it the same meaning as lastRemovedSegNo here */
+		oldestSeg = oldestFileSeg - 1;
+	}
+
+	/* oldest segment is just after the last removed segment */
+	oldestSeg++;
+
+	XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+
+
+	if (minSecureLSN)
+	{
+		XLogRecPtr slotPtr = XLogGetReplicationSlotMinimumLSN();
+		Assert(!XLogRecPtrIsInvalid(slotPtr));
+
+		tailSeg = GetMinSecuredSegment(currpos, slotPtr);
+
+		XLogSegNoOffsetToRecPtr(tailSeg, 0, *minSecureLSN, wal_segment_size);
+	}
+
+	return oldestSeg <= restartSeg;
+}
+
 /*
  * Returns the minimum segment number that the next checkpoint must leave, considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 394aea8..4167146 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -793,7 +793,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.status,
+            L.min_secure_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ab776e8..3880807 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -182,7 +182,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -304,6 +304,29 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			values[i++] = LSNGetDatum(InvalidXLogRecPtr);
+		}
+		else
+		{
+			XLogRecPtr	min_secure_lsn;
+			char *status = "broken";
+
+			if (IsLsnStillAvailable(restart_lsn,
+									&min_secure_lsn))
+			{
+				if (min_secure_lsn <= restart_lsn)
+					status = "secured";
+				else
+					status = "insecured";
+			}
+
+			values[i++] = CStringGetTextDatum(status);
+			values[i++] = LSNGetDatum(min_secure_lsn);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 45eb51a..542df28 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern bool IsLsnStillAvailable(XLogRecPtr restartLSN, XLogRecPtr *minSecureLSN);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c969375..1157438 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5340,7 +5340,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,25,3220}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,status,min_secure_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f1c1b44..d9d74a3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.status,
+    l.min_secure_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, status, min_secure_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.9.2

>From badd89a8c167cc7887349564c6f8fb3007d158f1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 17:33:53 +0900
Subject: [PATCH 3/3] TAP test for the slot limit feature

---
 src/test/recovery/t/014_replslot_limit.pl | 162 ++++++++++++++++++++++++++++++
 1 file changed, 162 insertions(+)
 create mode 100644 src/test/recovery/t/014_replslot_limit.pl

diff --git a/src/test/recovery/t/014_replslot_limit.pl b/src/test/recovery/t/014_replslot_limit.pl
new file mode 100644
index 0000000..41b828d
--- /dev/null
+++ b/src/test/recovery/t/014_replslot_limit.pl
@@ -0,0 +1,162 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by a replication slot.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 32MB
+max_wal_size = 48MB
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using a replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1, primary_slot_name => 'rep1');
+$node_standby->append_conf('recovery.conf', qq(
+primary_slot_name = 'rep1'
+));
+$node_standby->start;
+
+# Wait until the standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done; at this point the slot must be secured.
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn, status, min_secure_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|secured|$start_lsn", 'check initial state of standby');
+
+# Advance WAL by ten segments (= 160MB) on master
+advance_wal($node_master, 10);
+
+# All segments must still be secured after a checkpoint.
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, status, min_secure_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|secured|$start_lsn", 'check slot is securing all segments');
+
+# The standby can connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+
+# Advance WAL again
+advance_wal($node_master, 10);
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 32;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# Some segments become 'insecured'
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|insecured", 'check some segments became insecured');
+
+# The standby can still connect to the master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check no replication failure is caused by insecure state');
+
+# Advance WAL again
+advance_wal($node_master, 10);
+
+my $logstart = get_log_size($node_master);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "some replication slots have lost required WAL segments",
+			   $logstart),
+   'check the warning is correctly logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, status FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|broken", 'check that segments beyond the limit have been removed');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance the WAL by $n segments on the given node
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (a int); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# Return the size of the log file of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# Find $pat in the log file of $node after the $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
-- 
2.9.2
