Now that we have the wonderful latch facility, let's use it to reduce the delay between receiving a piece of WAL and applying it in the standby. Currently, the startup process polls every 100 ms to see if new WAL has arrived, which adds an average delay of 50 ms between a transaction commit in the master and it appearing as committed in a hot standby server. The latch patch already eliminated a similar polling delay in walsender; the attached patch does the same for walreceiver.

After this patch, there are no unnecessary delays in the streaming replication code path. Note that this is all still asynchronous, just with reduced latency.

This is pretty straightforward, but any comments?

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ddf7d79..40e1718 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -46,6 +46,7 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -9139,6 +9140,13 @@ startupproc_quickdie(SIGNAL_ARGS)
 }
 
 
+/* SIGUSR1: hand the signal over to the latch facility (latch_sigusr1_handler) */
+static void
+StartupProcSigUsr1Handler(SIGNAL_ARGS)
+{
+	latch_sigusr1_handler();
+}
+
 /* SIGHUP: set flag to re-read config file at next convenient time */
 static void
 StartupProcSigHupHandler(SIGNAL_ARGS)
@@ -9213,7 +9221,7 @@ StartupProcessMain(void)
 	else
 		pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, SIG_IGN);
+	pqsignal(SIGUSR1, StartupProcSigUsr1Handler);
 	pqsignal(SIGUSR2, SIG_IGN);
 
 	/*
@@ -9397,16 +9405,13 @@ retry:
 					}
 
 					/*
-					 * Data not here yet, so check for trigger then sleep.
+					 * Data not here yet, so check for trigger then sleep for
+					 * five seconds like in the WAL file polling case below.
 					 */
 					if (CheckForStandbyTrigger())
 						goto triggered;
 
-					/*
-					 * When streaming is active, we want to react quickly when
-					 * the next WAL record arrives, so sleep only a bit.
-					 */
-					pg_usleep(100000L); /* 100ms */
+					WaitForWalArrival(5000000L);
 				}
 				else
 				{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index b868707..e12f1f5 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -529,6 +529,9 @@ XLogWalRcvFlush(void)
 		walrcv->receivedUpto = LogstreamResult.Flush;
 		SpinLockRelease(&walrcv->mutex);
 
+		/* Signal the startup process that new WAL has arrived */
+		SetLatch(&walrcv->receivedLatch);
+
 		/* Report XLOG streaming progress in PS display */
 		if (update_process_title)
 		{
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index b206885..8182160 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,7 @@ WalRcvShmemInit(void)
 		MemSet(WalRcv, 0, WalRcvShmemSize());
 		WalRcv->walRcvState = WALRCV_STOPPED;
 		SpinLockInit(&WalRcv->mutex);
+		InitSharedLatch(&WalRcv->receivedLatch);
 	}
 }
 
@@ -163,6 +164,9 @@ ShutdownWalRcv(void)
 
 		pg_usleep(100000);		/* 100ms */
 	}
+
+	/* We don't need the latch anymore */
+	DisownLatch(&walrcv->receivedLatch);
 }
 
 /*
@@ -187,6 +191,9 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 	if (recptr.xrecoff % XLogSegSize != 0)
 		recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
 
+	/*
+	 * Update shared memory status with information needed by walreceiver
+	 */
 	SpinLockAcquire(&walrcv->mutex);
 
 	/* It better be stopped before we try to restart it */
@@ -204,6 +211,10 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 
 	SpinLockRelease(&walrcv->mutex);
 
+	/* Take ownership of the latch so that we can wait on it */
+	OwnLatch(&walrcv->receivedLatch);
+
+	/* Request postmaster to start the walreceiver process */
 	SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 }
 
@@ -229,3 +240,20 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
 
 	return recptr;
 }
+
+/*
+ * Wait on WalRcv->receivedLatch until it is set or 'timeout' (in
+ * microseconds) elapses.  Nothing is returned, so the caller has to
+ * re-check for new WAL itself after this function returns.
+ */
+void
+WaitForWalArrival(int timeout)
+{
+	if (WaitLatch(&WalRcv->receivedLatch, timeout))
+	{
+		/*
+		 * Clear the latch so the next WaitForWalArrival() call sleeps again.
+		 */
+		ResetLatch(&WalRcv->receivedLatch);
+	}
+}
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 2ea881e..66a8229 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -13,6 +13,7 @@
 #define _WALRECEIVER_H
 
 #include "access/xlogdefs.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -72,6 +73,13 @@ typedef struct
 	char		conninfo[MAXCONNINFO];
 
 	slock_t		mutex;			/* locks shared variables shown above */
+
+	/*
+	 * Walreceiver sets this latch every time new WAL has been received and
+	 * fsync'd to disk, allowing startup process to wait for new WAL to
+	 * arrive.
+	 */
+	Latch		receivedLatch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
@@ -92,8 +100,8 @@ extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern void ShutdownWalRcv(void);
 extern bool WalRcvInProgress(void);
-extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
 extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
 extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern void WaitForWalArrival(int timeout);
 
 #endif   /* _WALRECEIVER_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to