On Monday, September 28, 2015 07:22:26 PM Tom Lane wrote:
> Matt Newell <newe...@blur.com> writes:
> > 1. When a connection issues its first LISTEN command, in
> > Exec_ListenPreCommit QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
> > this causes the connection to iterate through every notify queued in the
> > slru, even though at that point I believe the connection can safely
> > ignore any notifications from transactions that are already committed,
> > and if I understand correctly notifications aren't put into the slru
> > until precommit, so wouldn't it be safe to do:
> > QUEUE_BACKEND_POS(MyBackendId) = QUEUE_HEAD;
> > inside Async_Listen?
> 
> Per the comment in Exec_ListenPreCommit, the goal here is to see entries
> that have been made and not yet committed.  If you don't do it like this,
> then a new listener has the possibility of receiving notifications from
> one transaction, while not seeing notifications from another one that
> committed later, even though longer-established listeners saw outputs from
> both transactions.  We felt that that sort of application-visible
> inconsistency would be a bad thing.
> 
Right, QUEUE_HEAD can't be used; however...

> > If that's not safe, then could a new member be added to
> > AsyncQueueControl that points to the first uncommitted QueuePosition
> > (wouldn't have to be kept completely up to date).
> 
> Err ... isn't that QUEUE_TAIL already?  I've not studied this code in
> detail recently, though.
> 
QUEUE_TAIL is the oldest notification.  My idea is to keep a somewhat up-to-date
pointer to the oldest uncommitted notification, which can be used as the
starting point when a connection issues its first LISTEN.  This mirrors what
the current code already does for a backend's own pointer: it advances past any
committed notifications when it calls asyncQueueReadAllNotifications in
Exec_ListenPreCommit with no registered listeners.

I came up with a proof of concept and it appears to work as expected.  With
500,000 notifications queued, a LISTEN consistently takes under 0.5 ms, while
my 9.4.4 install takes 70 ms.  I expect the speedup to be much greater on a
busy server, where there is more lock contention.

Attached are the patch and the (ugly) test script.


> > 2. Would it be possible when a backend is signaled to check if it is idle
> > inside an aborted transaction, and if so process the notifications and put
> > any that match what the backend is listening on into a local list.
> 
> Possibly.  sinval catchup notification works like that, so you could maybe
> invent a similar mechanism for notify.  Beware that we've had to fix
> performance issues with sinval catchup; you don't want to release a whole
> bunch of processes to do work at the same time.

I'll have to think about this more, but I don't envision it causing such a
scenario.

The extra processing that would happen at signaling time would have been done
anyway when the aborted transaction is finally rolled back; the only extra
work is copying the relevant notifications to local memory.

Regardless, it might not be worthwhile, since my patch for #1 seems to address
the symptom that bit me.

Matt Newell
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 3b71174..e89ece5 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -196,12 +196,24 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	 ((x).page == (y).page && (x).offset == (y).offset)
 
+/* Returns 1 if x is logically larger than y, 0 if equal, else -1 */
+#define QUEUE_POS_COMPARE(x,y) \
+	(asyncQueuePagePrecedes((y).page, (x).page) ? 1 : \
+	  asyncQueuePagePrecedes((x).page, (y).page) ? -1 : \
+	    ((x).offset > (y).offset ? 1 : ((x).offset == (y).offset ? 0 : -1)))
+
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
 	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
 	 (x).page != (y).page ? (y) : \
 	 (x).offset < (y).offset ? (x) : (y))
 
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+	 (x).page != (y).page ? (x) : \
+	 (x).offset < (y).offset ? (y) : (x))
+
 /*
  * Struct describing a listening backend's status
  */
@@ -217,12 +229,13 @@ typedef struct QueueBackendStatus
  * The AsyncQueueControl structure is protected by the AsyncQueueLock.
  *
  * When holding the lock in SHARED mode, backends may only inspect their own
- * entries as well as the head and tail pointers. Consequently we can allow a
- * backend to update its own record while holding only SHARED lock (since no
- * other backend will inspect it).
+ * entries as well as the head, tail, and firstUncommitted pointers. 
+ * Consequently we can allow a backend to update its own record while holding
+ * only SHARED lock (since no other backend will inspect it).
  *
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
- * of other backends and also change the head and tail pointers.
+ * of other backends and also change the head, tail and firstUncommitted
+ * pointers.
  *
  * In order to avoid deadlocks, whenever we need both locks, we always first
  * get AsyncQueueLock and then AsyncCtlLock.
@@ -230,12 +243,23 @@ typedef struct QueueBackendStatus
  * Each backend uses the backend[] array entry with index equal to its
  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
  * SendProcSignal fast.
+ *
+ * In case a long running transaction is causing the size of the queue to grow
+ * we keep track of the first uncommitted transaction to prevent a
+ * connection from having to process a lot of old notifications when it
+ * issues its first listen.  To facilitate this we allow one process
+ * at a time to advance firstUncommitted by using advancingFirstUncommitted
+ * as a mutex.
  */
 typedef struct AsyncQueueControl
 {
 	QueuePosition head;			/* head points to the next free location */
 	QueuePosition tail;			/* the global tail is equivalent to the tail
 								 * of the "slowest" backend */
+
+	QueuePosition firstUncommitted;
+	int32 advancingFirstUncommitted; /* Backend ID */
+
 	TimestampTz lastQueueFillWarn;		/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 	/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -245,6 +269,8 @@ static AsyncQueueControl *asyncQueueControl;
 
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
+#define QUEUE_FIRST_UNCOMMITTED		(asyncQueueControl->firstUncommitted)
+#define QUEUE_ADVANCING_FIRST_UNCOMMITTED	(asyncQueueControl->advancingFirstUncommitted)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
 
@@ -376,6 +402,7 @@ static void asyncQueueFillWarning(void);
 static bool SignalBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
+							 volatile QueuePosition *firstUncommitted,
 							 QueuePosition stop,
 							 char *page_buffer);
 static void asyncQueueAdvanceTail(void);
@@ -455,6 +482,9 @@ AsyncShmemInit(void)
 
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+		SET_QUEUE_POS(QUEUE_FIRST_UNCOMMITTED, 0, 0);
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+
 		asyncQueueControl->lastQueueFillWarn = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
 		for (i = 0; i <= MaxBackends; i++)
@@ -935,10 +965,12 @@ Exec_ListenPreCommit(void)
 	 * doesn't hurt.
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
-	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+	QUEUE_BACKEND_POS(MyBackendId) = QUEUE_FIRST_UNCOMMITTED;
 	QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
 	LWLockRelease(AsyncQueueLock);
 
+	elog(DEBUG1, "Listener registered, queue position is: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+
 	/* Now we are listed in the global array, so remember we're listening */
 	amRegisteredListener = true;
 
@@ -1703,7 +1735,9 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition oldpos;
 	QueuePosition head;
+	QueuePosition firstUncommitted;
 	bool		advanceTail;
+	bool		advanceFirstUncommitted;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -1718,6 +1752,18 @@ asyncQueueReadAllNotifications(void)
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
 	head = QUEUE_HEAD;
+
+	if (!QUEUE_POS_EQUAL(pos,head) && /* If we aren't bailing out early */
+		(!QUEUE_ADVANCING_FIRST_UNCOMMITTED ||
+		  QUEUE_BACKEND_PID(QUEUE_ADVANCING_FIRST_UNCOMMITTED) == InvalidPid))
+	{
+		advanceFirstUncommitted = true;
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = MyBackendId;
+		firstUncommitted = QUEUE_FIRST_UNCOMMITTED;
+		elog(DEBUG1, "Attempting to advance first uncommitted from: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+	} else
+		advanceFirstUncommitted = false;
+
 	LWLockRelease(AsyncQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
@@ -1812,8 +1858,9 @@ asyncQueueReadAllNotifications(void)
 			 * rewrite pages under us. Especially we don't want to hold a lock
 			 * while sending the notifications to the frontend.
 			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf);
+			reachedStop = asyncQueueProcessPageEntries(&pos,
+				   advanceFirstUncommitted ? &firstUncommitted : NULL,
+				   head, page_buffer.buf);
 		} while (!reachedStop);
 	}
 	PG_CATCH();
@@ -1822,6 +1869,11 @@ asyncQueueReadAllNotifications(void)
 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyBackendId) = pos;
 		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+		if (advanceFirstUncommitted) {
+			QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+			QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+			elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+		}
 		LWLockRelease(AsyncQueueLock);
 
 		/* If we were the laziest backend, try to advance the tail pointer */
@@ -1836,6 +1888,11 @@ asyncQueueReadAllNotifications(void)
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	QUEUE_BACKEND_POS(MyBackendId) = pos;
 	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
+	if (advanceFirstUncommitted) {
+		QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(QUEUE_FIRST_UNCOMMITTED, firstUncommitted);
+		QUEUE_ADVANCING_FIRST_UNCOMMITTED = 0;
+		elog(DEBUG1, "Advanced first uncommitted to: %i:%i", QUEUE_POS_PAGE(QUEUE_FIRST_UNCOMMITTED),QUEUE_POS_OFFSET(QUEUE_FIRST_UNCOMMITTED));
+	}
 	LWLockRelease(AsyncQueueLock);
 
 	/* If we were the laziest backend, try to advance the tail pointer */
@@ -1861,6 +1918,7 @@ asyncQueueReadAllNotifications(void)
  */
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
+							 volatile QueuePosition *firstUncommitted,
 							 QueuePosition stop,
 							 char *page_buffer)
 {
@@ -1928,6 +1986,29 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 */
 			}
 		}
+		else if(firstUncommitted && QUEUE_POS_COMPARE(*firstUncommitted,thisentry) <= 0)
+		{
+			/*
+			 * If we are trying to advance firstUncommitted we need
+			 * to check for live transactions at every queue position.
+			 */
+			if(qe->dboid != InvalidOid && TransactionIdIsInProgress(qe->xid))
+			{
+				/*
+				 * We hit an uncommitted transaction so there is no possibility
+				 * to further advance firstUncommitted
+				 */
+				firstUncommitted = NULL;
+			}
+		}
+
+		/*
+		 * If we got to here with a valid firstUncommitted pointer then
+		 * we know that all transactions up to thisentry are committed.
+		 * If *firstUncommitted matches thisentry then we can advance.
+		 */
+		if (firstUncommitted && QUEUE_POS_EQUAL(*firstUncommitted,thisentry))
+			*firstUncommitted = *current;
 
 		/* Loop back if we're not at end of page */
 	} while (!reachedEndOfPage);
@@ -1960,6 +2041,7 @@ asyncQueueAdvanceTail(void)
 	}
 	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
 	QUEUE_TAIL = min;
+	QUEUE_FIRST_UNCOMMITTED = QUEUE_POS_MAX(min, QUEUE_FIRST_UNCOMMITTED);
 	LWLockRelease(AsyncQueueLock);
 
 	/*

Attachment: notify_dos.sh
Description: application/shellscript

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)