At Thu, 07 Sep 2017 21:59:56 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170907.215956.110216588.horiguchi.kyot...@lab.ntt.co.jp>
> Hello,
> 
> At Thu, 07 Sep 2017 14:12:12 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
> <horiguchi.kyot...@lab.ntt.co.jp> wrote in 
> <20170907.141212.227032666.horiguchi.kyot...@lab.ntt.co.jp>
> > > I would like a flag in pg_replication_slots, and possibly also a
> > > numerical column that indicates how far away from the critical point
> > > each slot is.  That would be great for a monitoring system.
> > 
> > Great! I'll do that right now.
> 
> Done.

The CF status of this patch has turned into "Waiting on Author".
This is because the second patch was posted separately from the
first patch. I am reposting them together after rebasing onto the
current master.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 105,110 **** int			wal_level = WAL_LEVEL_MINIMAL;
--- 105,111 ----
  int			CommitDelay = 0;	/* precommit delay in microseconds */
  int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
  int			wal_retrieve_retry_interval = 5000;
+ int			max_slot_wal_keep_size_mb = 0;
  
  #ifdef WAL_DEBUG
  bool		XLOG_DEBUG = false;
***************
*** 9365,9373 **** KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
--- 9366,9397 ----
  	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
  	{
  		XLogSegNo	slotSegNo;
+ 		int			slotlimitsegs = ConvertToXSegs(max_slot_wal_keep_size_mb);
  
  		XLByteToSeg(keep, slotSegNo);
  
+ 		/*
+ 		 * ignore slots if too many wal segments are kept.
+ 		 * max_slot_wal_keep_size is just accumulated on wal_keep_segments.
+ 		 */
+ 		if (max_slot_wal_keep_size_mb > 0 && slotSegNo + slotlimitsegs < segno)
+ 		{
+ 			segno = segno - slotlimitsegs; /* must be positive */
+ 
+ 			/*
+ 			 * warn only if the checkpoint flushes the required segment.
+ 			 * we assume here that *logSegNo is calculated keep location.
+ 			 */
+ 			if (slotSegNo < *logSegNo)
+ 				ereport(WARNING,
+ 					(errmsg ("restart LSN of replication slots is ignored by checkpoint"),
+ 					 errdetail("Some replication slots have lost required WAL segnents to continue by up to %ld segments.",
+ 					   (segno < *logSegNo ? segno : *logSegNo) - slotSegNo)));
+ 
+ 			/* emergency vent */
+ 			slotSegNo = segno;
+ 		}
+ 
  		if (slotSegNo <= 0)
  			segno = 1;
  		else if (slotSegNo < segno)
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2371,2376 **** static struct config_int ConfigureNamesInt[] =
--- 2371,2387 ----
  	},
  
  	{
+ 		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+ 			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+ 		 NULL,
+ 		 GUC_UNIT_MB
+ 		},
+ 		&max_slot_wal_keep_size_mb,
+ 		0, 0, INT_MAX,
+ 		NULL, NULL, NULL
+ 	},
+ 
+ 	{
  		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
  			gettext_noop("Sets the maximum time to wait for WAL replication."),
  			NULL,
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 235,240 ****
--- 235,241 ----
  #max_wal_senders = 10		# max number of walsender processes
  				# (change requires restart)
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
+ #max_slot_wal_keep_size = 0	# in megabytes; 0 disables
  #wal_sender_timeout = 60s	# in milliseconds; 0 disables
  
  #max_replication_slots = 10	# max number of replication slots
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 97,102 **** extern bool reachedConsistency;
--- 97,103 ----
  extern int	min_wal_size_mb;
  extern int	max_wal_size_mb;
  extern int	wal_keep_segments;
+ extern int	max_slot_wal_keep_size_mb;
  extern int	XLOGbuffers;
  extern int	XLogArchiveTimeout;
  extern int	wal_retrieve_retry_interval;
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 9336,9341 **** CreateRestartPoint(int flags)
--- 9336,9420 ----
  }
  
  /*
+  * Check if the record on the given lsn will be preserved at the next
+  * checkpoint.
+  *
+  * Returns true if it will be preserved. If distance is not NULL, *distance is
+  * set to a byte count relative to the first segment expected to be kept at
+  * the next checkpoint: the remaining margin when this function returns true,
+  * or the amount of WAL the slot has already lost when it returns false.
+  *
+  * This function should return a result consistent with KeepLogSeg.
+  */
+ bool
+ GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance)
+ {
+ 	XLogRecPtr currpos;
+ 	XLogRecPtr tailpos;
+ 	uint64 currSeg;
+ 	uint64 restByteInSeg;
+ 	uint64 restartSeg;
+ 	uint64 tailSeg;
+ 	uint64 keepSegs;
+ 
+ 	currpos = GetXLogWriteRecPtr();
+ 
+ 	LWLockAcquire(ControlFileLock, LW_SHARED);
+ 	tailpos = ControlFile->checkPointCopy.redo;
+ 	LWLockRelease(ControlFileLock);
+ 
+ 	/* Convert the positions to segment numbers */
+ 	XLByteToSeg(currpos, currSeg);
+ 	XLByteToSeg(restartLSN, restartSeg);
+ 	XLByteToSeg(tailpos, tailSeg);
+ 	restByteInSeg = 0;
+ 
+ 	Assert(wal_keep_segments >= 0);
+ 	Assert(max_slot_wal_keep_size_mb >= 0);
+ 
+ 	/*
+ 	 * WAL is removed in units of whole segments.
+ 	 */
+ 	keepSegs = wal_keep_segments + ConvertToXSegs(max_slot_wal_keep_size_mb);
+ 
+ 	/*
+ 	 * If the latest checkpoint's redo point is older than the current head
+ 	 * minus keep segments, the next checkpoint keeps the redo point's
+ 	 * segment. Otherwise use current head minus number of segments to keep.
+ 	 */
+ 	if (currSeg < tailSeg + keepSegs)
+ 	{
+ 		if (currSeg < keepSegs)
+ 			tailSeg = 0;
+ 		else
+ 			tailSeg = currSeg - keepSegs;
+ 
+ 		/* In this case the margin also counts the bytes to the next segment */
+ 		restByteInSeg = XLogSegSize - (currpos % XLogSegSize);
+ 	}
+ 
+ 	/* Required segments will be removed at the next checkpoint */
+ 	if (restartSeg < tailSeg)
+ 	{
+ 		/* Calculate how many bytes the slot has lost */
+ 		if (distance)
+ 		{
+ 			uint64 restbytes = (restartSeg + 1) * XLogSegSize - restartLSN;
+ 			*distance =
+ 				(tailSeg - restartSeg - 1) * XLogSegSize
+ 				+ restbytes;
+ 		}
+ 		return false;
+ 	}
+ 
+ 	/* Margin at the next checkpoint before the slot loses sync */
+ 	if (distance)
+ 		*distance = (restartSeg - tailSeg) * XLogSegSize + restByteInSeg;
+ 
+ 	return true;
+ }
+ 
+ /*
   * Retreat *logSegNo to the last segment that we need to retain because of
   * either wal_keep_segments or replication slots.
   *
*** a/src/backend/catalog/system_views.sql
--- b/src/backend/catalog/system_views.sql
***************
*** 793,799 **** CREATE VIEW pg_replication_slots AS
              L.xmin,
              L.catalog_xmin,
              L.restart_lsn,
!             L.confirmed_flush_lsn
      FROM pg_get_replication_slots() AS L
              LEFT JOIN pg_database D ON (L.datoid = D.oid);
  
--- 793,801 ----
              L.xmin,
              L.catalog_xmin,
              L.restart_lsn,
!             L.confirmed_flush_lsn,
! 			L.live,
! 			L.distance
      FROM pg_get_replication_slots() AS L
              LEFT JOIN pg_database D ON (L.datoid = D.oid);
  
*** a/src/backend/replication/slotfuncs.c
--- b/src/backend/replication/slotfuncs.c
***************
*** 182,188 **** pg_drop_replication_slot(PG_FUNCTION_ARGS)
  Datum
  pg_get_replication_slots(PG_FUNCTION_ARGS)
  {
! #define PG_GET_REPLICATION_SLOTS_COLS 11
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc	tupdesc;
  	Tuplestorestate *tupstore;
--- 182,188 ----
  Datum
  pg_get_replication_slots(PG_FUNCTION_ARGS)
  {
! #define PG_GET_REPLICATION_SLOTS_COLS 13
  	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
  	TupleDesc	tupdesc;
  	Tuplestorestate *tupstore;
***************
*** 304,309 **** pg_get_replication_slots(PG_FUNCTION_ARGS)
--- 304,323 ----
  		else
  			nulls[i++] = true;
  
+ 		if (max_slot_wal_keep_size_mb > 0 && restart_lsn != InvalidXLogRecPtr)
+ 		{
+ 			uint64 distance;
+ 
+ 			values[i++] = BoolGetDatum(GetMarginToSlotSegmentLimit(restart_lsn,
+ 																   &distance));
+ 			values[i++] = Int64GetDatum(distance);
+ 		}
+ 		else
+ 		{
+ 			values[i++] = BoolGetDatum(true);
+ 			nulls[i++] = true;
+ 		}
+ 
  		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
  	}
  	LWLockRelease(ReplicationSlotControlLock);
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 267,272 **** extern void ShutdownXLOG(int code, Datum arg);
--- 267,273 ----
  extern void InitXLOGAccess(void);
  extern void CreateCheckPoint(int flags);
  extern bool CreateRestartPoint(int flags);
+ extern bool GetMarginToSlotSegmentLimit(XLogRecPtr restartLSN, uint64 *distance);
  extern void XLogPutNextOid(Oid nextOid);
  extern XLogRecPtr XLogRestorePoint(const char *rpName);
  extern void UpdateFullPageWrites(void);
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 5347,5353 **** DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0
  DESCR("create a physical replication slot");
  DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
  DESCR("drop a replication slot");
! DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220}" "{o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
  DESCR("information about replication slots currently in use");
  DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
  DESCR("set up a logical replication slot");
--- 5347,5353 ----
  DESCR("create a physical replication slot");
  DATA(insert OID = 3780 (  pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 1 0 2278 "19" _null_ _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
  DESCR("drop a replication slot");
! DATA(insert OID = 3781 (  pg_get_replication_slots	PGNSP PGUID 12 1 10 0 0 f f f f f t s s 0 0 2249 "" "{19,19,25,26,16,16,23,28,28,3220,3220,16,3220}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,live,distance}" _null_ _null_ pg_get_replication_slots _null_ _null_ _null_ ));
  DESCR("information about replication slots currently in use");
  DATA(insert OID = 3786 (  pg_create_logical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 19 16" "{19,19,16,25,3220}" "{i,i,i,o,o}" "{slot_name,plugin,temporary,slot_name,lsn}" _null_ _null_ pg_create_logical_replication_slot _null_ _null_ _null_ ));
  DESCR("set up a logical replication slot");
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to