>
> > After further thought, I wonder whether instead of having an incoming
> > listener initialize its "pos" to QUEUE_TAIL, we couldn't have it
> > initialize to the maximum "pos" among existing listeners (with a floor of
> > QUEUE_TAIL of course). If any other listener has advanced over a given
> > message X, then X was committed (or aborted) earlier and the new listener
> > is not required to return it; so it should be all right to take that
> > listener's "pos" as a starting point.
>
> That's a great idea and I will try it. It does need to be the maximum
> position of existing listeners *with a matching database OID*, since
> asyncQueueReadAllNotifications doesn't check transaction status if the
> database OID doesn't match its own. Another advantage of this approach is
> that a queued notify from an open transaction in one database won't impose
> additional cost on LISTENs coming from other databases, whereas with my
> patch such a situation would prevent firstUncommitted from advancing.
>
The attached patch works well. I added the dboid to the QueueBackendStatus
struct, but that might not be needed if there is an easy and fast way to get
the database OID from a backend PID.
I also skip the max-pos calculation loop if QUEUE_HEAD is on the same page as
QUEUE_TAIL. It is not desirable to increase the time that AsyncQueueLock is
held when the queue is small, because in that case
asyncQueueReadAllNotifications will execute quickly anyway.
I then noticed that we take the same lock twice in a row to read the same
values: first in Exec_ListenPreCommit, then in
asyncQueueReadAllNotifications. Much of the time the latter simply returns,
because pos is already at QUEUE_HEAD. I prepared a second patch that splits
asyncQueueReadAllNotifications, so that Exec_ListenPreCommit calls the worker
version only when needed. This avoids acquiring the lock twice.
Thanks,
Matt Newell
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 91baede..58682dc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -202,6 +202,12 @@ typedef struct QueuePosition
(x).page != (y).page ? (y) : \
(x).offset < (y).offset ? (x) : (y))
+/* choose logically larger QueuePosition */
+#define QUEUE_POS_MAX(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
+ (x).page != (y).page ? (x) : \
+ (x).offset < (y).offset ? (y) : (x))
+
/*
* Struct describing a listening backend's status
*/
@@ -209,6 +215,7 @@ typedef struct QueueBackendStatus
{
int32 pid; /* either a PID or InvalidPid */
QueuePosition pos; /* backend has read queue up to here */
+ Oid dboid; /* backend's database OID */
} QueueBackendStatus;
/*
@@ -248,6 +255,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
/*
* The SLRU buffer area through which we access the notification queue
@@ -461,6 +469,7 @@ AsyncShmemInit(void)
for (i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
+ QUEUE_BACKEND_DBOID(i) = InvalidOid;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
@@ -907,6 +916,9 @@ AtCommit_Notify(void)
static void
Exec_ListenPreCommit(void)
{
+ QueuePosition max;
+ int i;
+
/*
* Nothing to do if we are already listening to something, nor if we
* already ran this routine in this transaction.
@@ -936,7 +948,25 @@ Exec_ListenPreCommit(void)
* doesn't hurt.
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
+ max = QUEUE_TAIL;
+ /*
+ * If there is over a page of notifications queued, then we should find
+ * the maximum position of all backends connected to our database, to
+ * avoid having to process all of the notifications that belong to already
+ * committed transactions that we will disregard anyway. This solves
+ * a significant performance problem with listen when there is a long
+ * running transaction
+ */
+ if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+ {
+ for (i = 1; i <= MaxBackends; i++)
+ {
+ if (QUEUE_BACKEND_PID(i) != InvalidPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ }
+ }
+ QUEUE_BACKEND_POS(MyBackendId) = max;
+ QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
LWLockRelease(AsyncQueueLock);
@@ -1156,6 +1186,7 @@ asyncQueueUnregister(void)
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
+ QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
LWLockRelease(AsyncQueueLock);
/* mark ourselves as no longer listed in the global array */
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 58682dc..e2f942f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -384,6 +384,8 @@ static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void);
static bool SignalBackends(void);
static void asyncQueueReadAllNotifications(void);
+static void asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+ QueuePosition head);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer);
@@ -917,6 +919,7 @@ static void
Exec_ListenPreCommit(void)
{
QueuePosition max;
+ QueuePosition head;
int i;
/*
@@ -949,6 +952,7 @@ Exec_ListenPreCommit(void)
*/
LWLockAcquire(AsyncQueueLock, LW_SHARED);
max = QUEUE_TAIL;
+ head = QUEUE_HEAD;
/*
* If there is over a page of notifications queued, then we should find
* the maximum position of all backends connected to our database, to
@@ -957,7 +961,7 @@ Exec_ListenPreCommit(void)
* a significant performance problem with listen when there is a long
* running transaction
*/
- if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(QUEUE_HEAD) )
+ if( QUEUE_POS_PAGE(max) < QUEUE_POS_PAGE(head) )
{
for (i = 1; i <= MaxBackends; i++)
{
@@ -983,7 +987,10 @@ Exec_ListenPreCommit(void)
*
* This will also advance the global tail pointer if possible.
*/
- asyncQueueReadAllNotifications();
+ if (!QUEUE_POS_EQUAL(max,head))
+ {
+ asyncQueueReadAllNotificationsWorker(max,head);
+ }
}
/*
@@ -1732,23 +1739,14 @@ ProcessNotifyInterrupt(void)
static void
asyncQueueReadAllNotifications(void)
{
- volatile QueuePosition pos;
- QueuePosition oldpos;
+ QueuePosition pos;
QueuePosition head;
- bool advanceTail;
-
- /* page_buffer must be adequately aligned, so use a union */
- union
- {
- char buf[QUEUE_PAGESIZE];
- AsyncQueueEntry align;
- } page_buffer;
/* Fetch current state */
LWLockAcquire(AsyncQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
- pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
+ pos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
LWLockRelease(AsyncQueueLock);
@@ -1757,6 +1755,24 @@ asyncQueueReadAllNotifications(void)
/* Nothing to do, we have read all notifications already. */
return;
}
+ asyncQueueReadAllNotificationsWorker(pos, head);
+}
+
+static void
+asyncQueueReadAllNotificationsWorker(volatile QueuePosition pos,
+ QueuePosition head)
+{
+ QueuePosition oldpos;
+ bool advanceTail;
+
+ /* page_buffer must be adequately aligned, so use a union */
+ union
+ {
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+ } page_buffer;
+
+ oldpos = pos;
/*----------
* Note that we deliver everything that we see in the queue and that
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers