Oops! The previous patch forgot the default case and crashes.

At Wed, 08 Nov 2017 13:14:31 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20171108.131431.170534842.horiguchi.kyot...@lab.ntt.co.jp>
> > I don't think 'distance' is a good metric - that's going to continually
> > change. Why not store the LSN that's available and provide a function
> > that computes this? Or just rely on the lsn - lsn operator?
> 
> It seems reasonable. The 'secured minimum LSN' is common among
> all slots, so showing it in the view may look a bit odd, but I
> couldn't find another suitable place for it.  In the previous patch,
> distance = 0 meant that the slot is alive but insecured, and that
> information is lost by changing 'distance' to 'min_secure_lsn'.
> 
> Thus I changed the 'live' column to 'status', which shows that state
> as text.
> 
> status: secured | insecured | broken
> 
> So this looks like the following (max_slot_wal_keep_size = 8MB,
> which is half of the default segment size)
> 
> -- slot whose required WAL is surely available
> select restart_lsn, status, min_secure_lsn, pg_current_wal_lsn() from 
> pg_replication_slots;
> restart_lsn | status  | min_secure_lsn | pg_current_wal_lsn
> ------------+---------+----------------+--------------------
> 0/1A000060  | secured | 0/1A000000     | 0/1B42BC78
> 
> -- slot whose required WAL is still available but insecured
> restart_lsn | status    | min_secure_lsn | pg_current_wal_lsn
> ------------+-----------+----------------+--------------------
> 0/1A000060  | insecured | 0/1C000000     | 0/1D76C948
> 
> -- slot whose required WAL is lost
> # We should have seen the log 'Some replication slots have lost...'
> 
> restart_lsn | status | min_secure_lsn | pg_current_wal_lsn
> ------------+--------+----------------+--------------------
> 0/1A000060  | broken | 0/1C000000     | 0/1D76C9F0
> 
> 
> I noticed that the routines ignored the fractional-segment part of
> max_slot_wal_keep_size in their calculations. The current patch
> honors the fragment part of max_slot_wal_keep_size.

I changed IsLsnStillAvailable to return meaningful values
regardless of whether max_slot_wal_keep_size is set or not.

# I had forgotten to number the last several versions of the
# patch. I give this one version '4', as the successor of the last
# numbered patch.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From 109f056e257aba70dddc8d466767ed0a1db371e2 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Tue, 28 Feb 2017 11:39:48 +0900
Subject: [PATCH 1/2] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 39 +++++++++++++++++++++++++++
 src/backend/utils/misc/guc.c                  | 11 ++++++++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/access/xlog.h                     |  1 +
 4 files changed, 52 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dd028a1..cfdae39 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -9432,9 +9433,47 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
 	{
 		XLogSegNo	slotSegNo;
+		int			slotlimitsegs;
+		uint64		recptroff;
+		uint64		slotlimitbytes;	/* may exceed INT_MAX */
+		uint64		slotlimitfragment;
+
+		recptroff = XLogSegmentOffset(recptr, wal_segment_size);
+		slotlimitbytes = (uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+		slotlimitfragment = XLogSegmentOffset(slotlimitbytes,
+											  wal_segment_size);
+
+		/* calculate segments to keep by max_slot_wal_keep_size_mb */
+		slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb,
+									   wal_segment_size);
+		/* honor the sub-segment remainder of the limit */
+		if (recptroff < slotlimitfragment)
+			slotlimitsegs++;
 
 		XLByteToSeg(keep, slotSegNo, wal_segment_size);
 
+		/*
+		 * Ignore slots if they would keep too many WAL segments.
+		 * max_slot_wal_keep_size is counted on top of wal_keep_segments.
+		 */
+		if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno)
+		{
+			segno = segno - slotlimitsegs;	/* > slotSegNo: cannot underflow */
+
+			/*
+			 * Warn only if this checkpoint removes segments the slot needs.
+			 * *logSegNo is assumed to be the caller's removal horizon.
+			 */
+			if (slotSegNo < *logSegNo)
+				ereport(WARNING,
+						(errmsg("restart LSN of replication slots is ignored by checkpoint"),
+						 errdetail("Some replication slots have lost required WAL segments to continue by up to %lu segments.",
+								   (unsigned long) (Min(segno, *logSegNo) - slotSegNo))));
+
+			/* emergency vent */
+			slotSegNo = segno;
+		}
+
 		if (slotSegNo <= 0)
 			segno = 1;
 		else if (slotSegNo < segno)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 65372d7..511023a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2368,6 +2368,17 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+			NULL,
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
 			NULL,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 368b280..e76c73a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -234,6 +234,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0	# in megabytes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0f2b8bd..f0c0255 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.9.2

>From 67f73c35b0c1c97bd2fff80139bfd3b7142f6bee Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Thu, 7 Sep 2017 19:13:22 +0900
Subject: [PATCH 2/2] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "status" and "min_secure_lsn" to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose sync.
The two columns show whether the WAL a slot requires is still secured,
still available but insecured, or already lost, and the oldest LSN
that is protected from removal.
---
 src/backend/access/transam/xlog.c    | 128 ++++++++++++++++++++++++++++++++++-
 src/backend/catalog/system_views.sql |   4 +-
 src/backend/replication/slotfuncs.c  |  25 ++++++-
 src/include/access/xlog.h            |   1 +
 src/include/catalog/pg_proc.h        |   2 +-
 src/test/regress/expected/rules.out  |   6 +-
 6 files changed, 160 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index cfdae39..be53e0f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9402,6 +9402,128 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+
+/*
+ * Returns the oldest segment number among WAL files in XLOGDIR (0 if none).
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		   *xldir;
+	struct dirent *xlde;
+	XLogSegNo	segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	if (xldir == NULL)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open write-ahead log directory \"%s\": %m",
+						XLOGDIR)));
+
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID	tli;
+		XLogSegNo	fsegno;
+
+		/* Consider only XLOG segment files, including .partial ones */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+		/* take the minimum segment number, ignoring timeline ID */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	FreeDir(xldir);
+	return segno;
+}
+
+/*
+ * Check if the record on the given restartLSN is present in XLOG files.
+ *
+ * Returns true if it is present. If minSecureLSN is given, it receives the
+ * oldest LSN a slot can require without risking removal of its WAL.
+ */
+bool
+IsLsnStillAvaiable(XLogRecPtr restartLSN, XLogRecPtr *minSecureLSN)
+{
+	XLogRecPtr	currpos;
+	XLogSegNo	currSeg;
+	XLogSegNo	restartSeg;
+	XLogSegNo	tailSeg;
+	XLogSegNo	oldestSeg;
+	uint64		keepSegs;
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * oldestSeg is zero until at least one segment has been removed since
+	 * startup. In that case, use the oldest segno taken from file names.
+	 */
+	if (oldestSeg == 0)
+	{
+		static XLogSegNo oldestFileSeg = 0;
+
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+		/* give it the same meaning as lastRemovedSegNo here */
+		oldestSeg = oldestFileSeg - 1;
+	}
+
+	/* oldest segment is just after the last removed segment */
+	oldestSeg++;
+
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+
+	/* compute the boundary below which slot-required WAL may be removed */
+	if (minSecureLSN)
+	{
+		if (max_slot_wal_keep_size_mb > 0)
+		{
+			uint64 slotlimitbytes = (uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+			uint64 slotlimitfragment = XLogSegmentOffset(slotlimitbytes,
+														 wal_segment_size);
+			uint64 currposoff = XLogSegmentOffset(currpos, wal_segment_size);
+
+			/* Calculate keep segments. Must be in sync with KeepLogSeg. */
+			Assert(wal_keep_segments >= 0);
+			Assert(max_slot_wal_keep_size_mb >= 0);
+
+			keepSegs = wal_keep_segments +
+				ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+			/* honor the sub-segment remainder of the limit */
+			if (currposoff < slotlimitfragment)
+				keepSegs++;
+
+			/*
+			 * Calculate the oldest segment that will be kept by
+			 * wal_keep_segments and max_slot_wal_keep_size_mb.
+			 */
+			if (currSeg < keepSegs)
+				tailSeg = 0;
+			else
+				tailSeg = currSeg - keepSegs;
+		}
+		else
+		{
+			/* no limit: every segment required by slots is retained */
+			XLogRecPtr	keep = XLogGetReplicationSlotMinimumLSN();
+			XLByteToSeg(keep, tailSeg, wal_segment_size);
+		}
+
+		XLogSegNoOffsetToRecPtr(tailSeg, 0, *minSecureLSN, wal_segment_size);
+	}
+
+	return oldestSeg <= restartSeg;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9429,7 +9551,11 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 			segno = segno - wal_keep_segments;
 	}
 
-	/* then check whether slots limit removal further */
+	/*
+	 * Then check whether slots limit removal further.  The slot limit
+	 * calculation below must be kept consistent with the one in
+	 * IsLsnStillAvaiable().
+	 */
 	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
 	{
 		XLogSegNo	slotSegNo;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index dc40cde..6512ac3 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -793,7 +793,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.status,
+            L.min_secure_lsn
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ab776e8..200a478 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -182,7 +182,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -304,6 +304,29 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			nulls[i++] = true;	/* no meaningful min_secure_lsn */
+		}
+		else
+		{
+			XLogRecPtr	min_secure_lsn;
+			const char *status = "broken";	/* required WAL already removed */
+
+			/* If the WAL is still present, check whether it is also safe */
+			if (IsLsnStillAvaiable(restart_lsn, &min_secure_lsn))
+			{
+				if (min_secure_lsn <= restart_lsn)
+					status = "secured";
+				else
+					status = "insecured";
+			}
+
+			values[i++] = CStringGetTextDatum(status);
+			values[i++] = LSNGetDatum(min_secure_lsn);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f0c0255..a316ead 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern bool IsLsnStillAvaiable(XLogRecPtr restartLSN, XLogRecPtr *minSecureLSN);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 93c031a..d03fd6f 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5340,7 +5340,7 @@ DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
 DESCR("create a physical replication slot");
 DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
 DESCR("drop a replication slot");
-DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
+DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,25,3220}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,status,min_secure_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
 DESCR("information about replication slots currently in use");
 DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
 DESCR("set up a logical replication slot");
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f1c1b44..d9d74a3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.status,
+    l.min_secure_lsn
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, status, min_secure_lsn)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to