On 06/09/10 17:18, Tom Lane wrote:
Heikki Linnakangas<heikki.linnakan...@enterprisedb.com>  writes:
I think we have just a terminology issue. What you're describing is
exactly how it works now, if you just s/InitLatch/AcquireLatch.

No, it isn't.  What I'm suggesting requires breaking InitLatch into two
operations.

We also need to define the semantics of SetLatch
on an unowned latch --- does this set a signal condition that will be
available to the next owner?

At the moment, no. Perhaps that would be useful; separating the Init and
Acquire operations is needed to make that sane.

Exactly.  I'm not totally sure either if it would be useful, but the
current design makes it impossible to allow that.

Ok, I've split the Init and Acquire steps into two.

BTW, on reflection the AcquireLatch/ReleaseLatch terminology seems a bit
ill chosen: ReleaseLatch sounds way too much like something that would
just unlock or clear the latch.  Perhaps OwnLatch/DisownLatch, or
something along that line.

Yeah, I see what you mean. Maybe it's just me, but Own/Disown looks ugly. Does anyone have a better suggestion?

Here's an updated patch, with all the issues reported so far fixed, except for that naming issue, and Fujii's suggestion to use poll() instead of select() where available. I've also polished it quite a bit, improving comments etc. Magnus, can you take a look at the Windows implementation to check that it's sane? At least it seems to work.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/configure b/configure
index bd9b347..432cd58 100755
--- a/configure
+++ b/configure
@@ -27773,6 +27773,13 @@ _ACEOF
   SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
 fi
 
+# Select latch implementation type.
+if test "$PORTNAME" != "win32"; then
+  LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
+else
+  LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
+fi
+
 # If not set in template file, set bytes to use libc memset()
 if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
   MEMSET_LOOP_LIMIT=1024
@@ -29098,7 +29105,7 @@ fi
 ac_config_files="$ac_config_files GNUmakefile src/Makefile.global"
 
 
-ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}"
+ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}"
 
 
 if test "$PORTNAME" = "win32"; then
@@ -29722,6 +29729,7 @@ do
     "src/backend/port/dynloader.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c" ;;
     "src/backend/port/pg_sema.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}" ;;
     "src/backend/port/pg_shmem.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}" ;;
+    "src/backend/port/pg_latch.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}" ;;
     "src/include/dynloader.h") CONFIG_LINKS="$CONFIG_LINKS src/include/dynloader.h:src/backend/port/dynloader/${template}.h" ;;
     "src/include/pg_config_os.h") CONFIG_LINKS="$CONFIG_LINKS src/include/pg_config_os.h:src/include/port/${template}.h" ;;
     "src/Makefile.port") CONFIG_LINKS="$CONFIG_LINKS src/Makefile.port:src/makefiles/Makefile.${template}" ;;
diff --git a/configure.in b/configure.in
index 7b09986..7f84cea 100644
--- a/configure.in
+++ b/configure.in
@@ -1700,6 +1700,13 @@ else
   SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
 fi
 
+# Select latch implementation type.
+if test "$PORTNAME" != "win32"; then
+  LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
+else
+  LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
+fi
+
 # If not set in template file, set bytes to use libc memset()
 if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
   MEMSET_LOOP_LIMIT=1024
@@ -1841,6 +1848,7 @@ AC_CONFIG_LINKS([
   src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c
   src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}
   src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}
+  src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}
   src/include/dynloader.h:src/backend/port/dynloader/${template}.h
   src/include/pg_config_os.h:src/include/port/${template}.h
   src/Makefile.port:src/makefiles/Makefile.${template}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 615a7fa..094d0c9 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -55,6 +55,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
 #include "storage/sinvaladt.h"
@@ -1025,6 +1026,13 @@ EndPrepare(GlobalTransaction gxact)
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
 
+	/*
+	 * Wake up all walsenders to send WAL up to the PREPARE record
+	 * immediately if replication is enabled
+	 */
+	if (max_wal_senders > 0)
+		WalSndWakeup();
+
 	/* write correct CRC and close file */
 	if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
 	{
@@ -2005,6 +2013,13 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	/* Flush XLOG to disk */
 	XLogFlush(recptr);
 
+	/*
+	 * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
+	 * immediately if replication is enabled
+	 */
+	if (max_wal_senders > 0)
+		WalSndWakeup();
+
 	/* Mark the transaction committed in pg_clog */
 	TransactionIdCommitTree(xid, nchildren, children);
 
@@ -2078,6 +2093,13 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	XLogFlush(recptr);
 
 	/*
+	 * Wake up all walsenders to send WAL up to the ABORT PREPARED record
+	 * immediately if replication is enabled
+	 */
+	if (max_wal_senders > 0)
+		WalSndWakeup();
+
+	/*
 	 * Mark the transaction aborted in clog.  This is not absolutely necessary
 	 * but we may as well do it while we are here.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6bcc55c..942d5c2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,6 +36,7 @@
 #include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
@@ -1068,6 +1069,13 @@ RecordTransactionCommit(void)
 		XLogFlush(XactLastRecEnd);
 
 		/*
+		 * Wake up all walsenders to send WAL up to the COMMIT record
+		 * immediately if replication is enabled
+		 */
+		if (max_wal_senders > 0)
+			WalSndWakeup();
+
+		/*
 		 * Now we may update the CLOG, if we wrote a COMMIT record above
 		 */
 		if (markXidCommitted)
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index db0c2af..f50cff8 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = dynloader.o pg_sema.o pg_shmem.o pg_latch.o $(TAS)
 
 ifeq ($(PORTNAME), darwin)
 SUBDIRS += darwin
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
new file mode 100644
index 0000000..cc8c8b4
--- /dev/null
+++ b/src/backend/port/unix_latch.c
@@ -0,0 +1,431 @@
+/*-------------------------------------------------------------------------
+ *
+ * unix_latch.c
+ *	  Routines for inter-process latches
+ *
+ * A latch is a simple boolean variable, with operations that let you
+ * sleep until it is set. A latch can be set from another process, or a
+ * signal handler within the same process.
+ *
+ * The latch interface is a reliable replacement for the common pattern of
+ * using pg_usleep() or select() to wait until a signal arrives, where the
+ * signal handler sets a global variable. Because on some platforms, an
+ * incoming signal doesn't interrupt sleep, and even on platforms where it
+ * does there is a race condition if the signal arrives just before
+ * entering the sleep, the common pattern must periodically wake up and
+ * poll the global variable. The pselect() system call was invented to solve
+ * the problem, but it is not portable enough. Latches are designed to
+ * overcome these limitations, allowing you to sleep without polling and
+ * ensuring a quick response to signals from other processes.
+ *
+ * There are two kinds of latches: local and shared. A local latch is
+ * initialized by InitLatch, and can only be set from the same process.
+ * A local latch can be used to wait for a signal to arrive, by calling
+ * SetLatch in the signal handler. A shared latch resides in shared memory,
+ * and must be initialized at postmaster startup by InitSharedLatch. Before
+ * a shared latch can be waited on, it must be associated with a process
+ * with AcquireLatch. Only the process owning the latch can wait on it, but
+ * any process can set it.
+ *
+ * There are three basic operations on a latch:
+ *
+ * SetLatch		- Sets the latch
+ * ResetLatch	- Clears the latch, allowing it to be set again
+ * WaitLatch	- Waits for the latch to become set
+ *
+ * The correct pattern to wait for an event is:
+ *
+ * for (;;)
+ * {
+ *     ResetLatch();
+ *     if (work to do)
+ *         Do Stuff();
+ *
+ *     WaitLatch();
+ * }
+ *
+ * It's important to reset the latch *before* checking if there's work to
+ * do. Otherwise, if someone sets the latch between the check and the
+ * ResetLatch call, you will miss it and WaitLatch will block.
+ *
+ * To wake up the waiter, you must first set a global flag or something
+ * else that the main loop tests in the "if (work to do)" part, and call
+ * SetLatch *after* that. SetLatch is designed to return quickly if the
+ * latch is already set.
+ *
+ *
+ * Implementation
+ * --------------
+ *
+ * The Unix implementation uses the so-called self-pipe trick to overcome
+ * the race condition involved with select() and setting a global flag
+ * in the signal handler. When a latch is set and the current process
+ * is waiting for it, the signal handler wakes up the select() in
+ * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
+ * interrupt select() on all platforms, and even on platforms where it
+ * does, a signal that arrives just before the select() call does not
+ * prevent the select() from entering sleep. An incoming byte on a pipe
+ * however reliably interrupts the sleep, and makes select() return
+ * immediately if the signal arrives just before select() begins.
+ *
+ * When SetLatch is called from the same process that owns the latch,
+ * SetLatch writes the byte directly to the pipe. If it's owned by another
+ * process, SIGUSR1 is sent and the signal handler in the waiting process
+ * writes the byte to the pipe on behalf of the signaling process.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+
+/* Are we currently in WaitLatch? The signal handler would like to know. */
+static volatile sig_atomic_t waiting = false;
+
+/* Read and write end of the self-pipe */
+static int selfpipe_readfd = -1;
+static int selfpipe_writefd = -1;
+
+/* private function prototypes */
+static void initSelfPipe(void);
+static void drainSelfPipe(void);
+static void sendSelfPipeByte(void);
+
+
+/*
+ * Initialize a backend-local latch.
+ */
+void
+InitLatch(volatile Latch *latch)
+{
+	/* Initialize the self pipe if this is our first latch in the process */
+	if (selfpipe_readfd == -1)
+		initSelfPipe();
+
+	latch->is_set = false;
+	latch->owner_pid = MyProcPid;
+	latch->is_shared = false;
+}
+
+/*
+ * Initialize a shared latch that can be set from other processes. The latch
+ * is initially owned by no-one, use AcquireLatch to associate it with the
+ * current process.
+ *
+ * NB: When you introduce a new shared latch, you must increase the shared
+ * latch count in NumSharedLatches in win32_latch.c!
+ */
+void
+InitSharedLatch(volatile Latch *latch)
+{
+	latch->is_set = false;
+	latch->owner_pid = 0;
+	latch->is_shared = true;
+}
+
+/*
+ * Associate a shared latch with the current process, allowing it to
+ * wait on it.
+ *
+ * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal
+ * handler, as shared latches use SIGUSR1 for inter-process communication.
+ */
+void
+AcquireLatch(volatile Latch *latch)
+{
+	Assert(latch->is_shared);
+
+	/* Initialize the self pipe if this is our first latch in the process */
+	if (selfpipe_readfd == -1)
+		initSelfPipe();
+
+	if (latch->owner_pid != 0)
+		elog(ERROR, "latch already owned");
+	latch->owner_pid = MyProcPid;
+}
+
+/*
+ * Release a latch previously acquired with AcquireLatch.
+ */
+void
+ReleaseLatch(volatile Latch *latch)
+{
+	Assert(latch->is_shared);
+	Assert(latch->owner_pid == MyProcPid);
+	latch->owner_pid = 0;
+}
+
+/*
+ * Wait for given latch to be set or until timeout is exceeded.
+ * If the latch is already set, the function returns immediately.
+ *
+ * The 'timeout' is given in microseconds, and -1 means wait forever.
+ * On some platforms, signals cause the timeout to be restarted, so beware
+ * that the function can sleep for several times longer than the specified
+ * timeout.
+ *
+ * The latch must be owned by the current process, ie. it must be a
+ * backend-local latch initialized with InitLatch, or a shared latch
+ * previously acquired with AcquireLatch.
+ *
+ * Returns 'true' if the latch was set, or 'false' if timeout was reached.
+ */
+bool
+WaitLatch(volatile Latch *latch, long timeout)
+{
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+}
+
+/*
+ * Like WaitLatch, but will also return when there's data available in
+ * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
+ * was set, or 2 if the socket became readable.
+ */
+int
+WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
+{
+	struct timeval tv, *tvp = NULL;
+	fd_set		input_mask;
+	int			rc;
+	int			result = 0;
+
+	if (latch->owner_pid != MyProcPid)
+		elog(ERROR, "cannot wait on a latch owned by another process");
+
+	/* Initialize timeout */
+	if (timeout >= 0)
+	{
+		tv.tv_sec = timeout / 1000000L;
+		tv.tv_usec = timeout % 1000000L;
+		tvp = &tv;
+	}
+
+	waiting = true;
+	for (;;)
+	{
+		int hifd;
+
+		/*
+		 * Clear the pipe, and check if the latch is set already. If someone
+		 * sets the latch between this and the select() below, the setter
+		 * will write a byte to the pipe (or signal us and the signal handler
+		 * will do that), and the select() will return immediately.
+		 */
+		drainSelfPipe();
+		if (latch->is_set)
+		{
+			result = 1;
+			break;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_SET(selfpipe_readfd, &input_mask);
+		hifd = selfpipe_readfd;
+		if (sock != PGINVALID_SOCKET)
+		{
+			FD_SET(sock, &input_mask);
+			if (sock > hifd)
+				hifd = sock;
+		}
+
+		rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
+		if (rc < 0)
+		{
+			if (errno == EINTR)
+				continue;
+			ereport(ERROR,
+					(errcode_for_socket_access(),
+					 errmsg("select() failed: %m")));
+		}
+		if (rc == 0)
+		{
+			/* timeout exceeded */
+			result = 0;
+			break;
+		}
+		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+		{
+			result = 2;
+			break;		/* data available in socket */
+		}
+	}
+	waiting = false;
+
+	return result;
+}
+
+/*
+ * Sets a latch and wakes up anyone waiting on it. Returns quickly if the
+ * latch is already set.
+ */
+void
+SetLatch(volatile Latch *latch)
+{
+	pid_t owner_pid;
+
+	/* Quick exit if already set */
+	if (latch->is_set)
+		return;
+
+	latch->is_set = true;
+
+	/*
+	 * See if anyone's waiting for the latch. It can be the current process
+	 * if we're in a signal handler. We use the self-pipe to wake up the
+	 * select() in that case. If it's another process, send a signal.
+	 *
+	 * Fetch owner_pid only once, in case the owner simultaneously releases
+	 * the latch and clears owner_pid. XXX: This assumes that pid_t is
+	 * atomic, which isn't guaranteed to be true! In practice, the effective
+	 * range of pid_t fits in a 32 bit integer, and so should be atomic. In
+	 * the worst case, we might end up signaling wrong process if the right
+	 * one releases the latch just as we fetch owner_pid. Even then, you're
+	 * very unlucky if a process with that bogus pid exists.
+	 */
+	owner_pid = latch->owner_pid;
+	if (owner_pid == 0)
+		return;
+	else if (owner_pid == MyProcPid)
+		sendSelfPipeByte();
+	else
+		kill(owner_pid, SIGUSR1);
+}
+
+/*
+ * Clear the latch. Calling WaitLatch after this will sleep, unless
+ * the latch is set again before the WaitLatch call.
+ */
+void
+ResetLatch(volatile Latch *latch)
+{
+	/* Only the owner should reset the latch */
+	Assert(latch->owner_pid == MyProcPid);
+
+	latch->is_set = false;
+}
+
+/*
+ * LatchShmemSize
+ *		Compute space needed for latch's shared memory
+ *
+ * Not needed for Unix implementation.
+ */
+Size
+LatchShmemSize(void)
+{
+	return 0;
+}
+
+/*
+ * LatchShmemInit
+ *		Allocate and initialize shared memory needed for latches
+ *
+ * Not needed for Unix implementation.
+ */
+void
+LatchShmemInit(void)
+{
+}
+
+/*
+ * SetLatch uses SIGUSR1 to wake up the process waiting on the latch. Wake
+ * up WaitLatch if we are currently waiting in it.
+ */
+void
+latch_sigusr1_handler(void)
+{
+	if (waiting)
+		sendSelfPipeByte();
+}
+
+/* initialize the self-pipe */
+static void
+initSelfPipe(void)
+{
+	int pipefd[2];
+
+	/*
+	 * Set up the self-pipe that allows a signal handler to wake up the
+	 * select() in WaitLatch. Make the write-end non-blocking, so that
+	 * SetLatch won't block if the event has already been set many times
+	 * filling the kernel buffer. Make the read-end non-blocking too, so
+	 * that we can easily clear the pipe by reading until EAGAIN or
+	 * EWOULDBLOCK.
+	 */
+	if (pipe(pipefd) < 0)
+		elog(FATAL, "pipe() failed: %m");
+	if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
+		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
+	if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
+		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");
+
+	selfpipe_readfd = pipefd[0];
+	selfpipe_writefd = pipefd[1];
+}
+
+/* Send one byte to the self-pipe, to wake up WaitLatch */
+static void
+sendSelfPipeByte(void)
+{
+	int rc;
+	char dummy = 0;
+
+retry:
+	rc = write(selfpipe_writefd, &dummy, 1);
+	if (rc < 0)
+	{
+		/* If interrupted by signal, just retry */
+		if (errno == EINTR)
+			goto retry;
+
+		/*
+		 * If the pipe is full, we don't need to retry, the data that's
+		 * there already is enough to wake up WaitLatch.
+		 */
+		if (errno == EAGAIN || errno == EWOULDBLOCK)
+			return;
+
+		/*
+		 * Oops, the write() failed for some other reason. We might be in
+		 * a signal handler, so it's not safe to elog(). We have no choice
+		 * but to silently ignore the error.
+		 */
+		return;
+	}
+}
+
+/* Read all available data from the self-pipe */
+static void
+drainSelfPipe(void)
+{
+	int rc;
+	char buf;
+
+	for (;;)
+	{
+		rc = read(selfpipe_readfd, &buf, 1);
+		if (rc < 0)
+		{
+			if (errno == EAGAIN || errno == EWOULDBLOCK)
+				break;		/* the pipe is empty */
+			else if (errno == EINTR)
+				continue;	/* retry */
+			else
+				elog(ERROR, "read() on self-pipe failed: %m");
+		}
+		else if (rc == 0)
+			elog(ERROR, "unexpected EOF on self-pipe");
+	}
+}
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
new file mode 100644
index 0000000..3e3a30d
--- /dev/null
+++ b/src/backend/port/win32_latch.c
@@ -0,0 +1,284 @@
+/*-------------------------------------------------------------------------
+ *
+ * win32_latch.c
+ *	  Windows implementation of latches.
+ *
+ * The Windows implementation uses Windows events. See unix_latch.c for
+ * information on usage.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+/*
+ * Shared latches are implemented with Windows events that are shared by
+ * all postmaster child processes. At postmaster startup we create enough
+ * Event objects, and mark them as inheritable so that they are accessible
+ * in child processes. The handles are stored in sharedHandles.
+ */
+typedef struct
+{
+	slock_t		mutex;			/* protects all the other fields */
+
+	int			maxhandles;		/* number of shared handles created */
+	int			nfreehandles;	/* number of free handles in array */
+	HANDLE		handles[1];		/* free handles, variable length */
+} SharedEventHandles;
+
+static SharedEventHandles *sharedHandles;
+
+void
+InitLatch(volatile Latch *latch)
+{
+	latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
+	latch->is_shared = false;
+	latch->is_set = false;
+}
+
+void
+InitSharedLatch(volatile Latch *latch)
+{
+	latch->is_shared = true;
+	latch->is_set = false;
+	latch->event = NULL;
+}
+
+void
+AcquireLatch(volatile Latch *latch)
+{
+	HANDLE event;
+
+	/* Sanity checks */
+	Assert(latch->is_shared);
+	if (latch->event != 0)
+		elog(ERROR, "latch already owned");
+
+	/* Reserve an event handle from the shared handles array */
+	SpinLockAcquire(&sharedHandles->mutex);
+	if (sharedHandles->nfreehandles <= 0)
+	{
+		SpinLockRelease(&sharedHandles->mutex);
+		elog(ERROR, "out of shared event objects");
+	}
+	sharedHandles->nfreehandles--;
+	event = sharedHandles->handles[sharedHandles->nfreehandles];
+	SpinLockRelease(&sharedHandles->mutex);
+
+	latch->event = event;
+}
+
+void
+ReleaseLatch(volatile Latch *latch)
+{
+	Assert(latch->is_shared);
+	Assert(latch->event != NULL);
+
+	/* Put the event handle back to the pool */
+	SpinLockAcquire(&sharedHandles->mutex);
+	if (sharedHandles->nfreehandles >= sharedHandles->maxhandles)
+	{
+		SpinLockRelease(&sharedHandles->mutex);
+		elog(PANIC, "too many free event handles");
+	}
+	sharedHandles->handles[sharedHandles->nfreehandles] = latch->event;
+	sharedHandles->nfreehandles++;
+	SpinLockRelease(&sharedHandles->mutex);
+
+	latch->event = NULL;
+}
+
+bool
+WaitLatch(volatile Latch *latch, long timeout)
+{
+	return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
+}
+
+int
+WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
+{
+	DWORD		rc;
+	HANDLE		events[3];
+	HANDLE		latchevent;
+	HANDLE		sockevent;
+	int			numevents;
+	int			result = 0;
+
+	latchevent = latch->event;
+
+	events[0] = latchevent;
+	events[1] = pgwin32_signal_event;
+	numevents = 2;
+	if (sock != PGINVALID_SOCKET)
+	{
+		sockevent = WSACreateEvent();
+		WSAEventSelect(sock, sockevent, FD_READ);
+		events[numevents++] = sockevent;
+	}
+
+	for (;;)
+	{
+		/*
+		 * Reset the event, and check if the latch is set already. If someone
+		 * sets the latch between this and the WaitForMultipleObjects() call
+		 * below, the setter will set the event and WaitForMultipleObjects()
+		 * will return immediately.
+		 */
+		if (!ResetEvent(latchevent))
+			elog(ERROR, "ResetEvent failed: error code %d", (int) GetLastError());
+		if (latch->is_set)
+		{
+			result = 1;
+			break;
+		}
+
+		rc = WaitForMultipleObjects(numevents, events, FALSE,
+								(timeout >= 0) ? (timeout / 1000) : INFINITE);
+		if (rc == WAIT_FAILED)
+			elog(ERROR, "WaitForMultipleObjects() failed: error code %d", (int) GetLastError());
+		else if (rc == WAIT_TIMEOUT)
+		{
+			result = 0;
+			break;
+		}
+		else if (rc == WAIT_OBJECT_0 + 1)
+			pgwin32_dispatch_queued_signals();
+		else if (rc == WAIT_OBJECT_0 + 2)
+		{
+			Assert(sock != PGINVALID_SOCKET);
+			result = 2;
+			break;
+		}
+		else if (rc != WAIT_OBJECT_0)
+			elog(ERROR, "unexpected return code from WaitForMultipleObjects(): %d", rc);
+	}
+
+	/* Clean up the handle we created for the socket */
+		if (sock != PGINVALID_SOCKET)
+	{
+		WSAEventSelect(sock, sockevent, 0);
+		WSACloseEvent(sockevent);
+	}
+
+	return result;
+}
+
+void
+SetLatch(volatile Latch *latch)
+{
+	HANDLE handle;
+
+	/* Quick exit if already set */
+	if (latch->is_set)
+		return;
+
+	latch->is_set = true;
+
+	/*
+	 * See if anyone's waiting for the latch. It can be the current process
+	 * if we're in a signal handler. Use a local variable here in case the
+	 * latch is just released between the test and the SetEvent call, and
+	 * the event field is set to NULL.
+	 *
+	 * Fetch the event field only once, in case the owner simultaneously
+	 * releases the latch and clears it. This assumes that HANDLE is
+	 * atomic, which isn't guaranteed to be true! In practice, it should be,
+	 * and in the worst case we end up calling SetEvent with a bogus handle,
+	 * and SetEvent will return an error with no harm done.
+	 */
+	handle = latch->event;
+	if (handle)
+	{
+		SetEvent(handle);
+		/*
+		 * Note that we silently ignore any errors. We might be in a signal
+		 * handler or other critical path where it's not safe to call elog().
+		 */
+	}
+}
+
+void
+ResetLatch(volatile Latch *latch)
+{
+	latch->is_set = false;
+}
+
+/*
+ * Number of shared latches, used to allocate the right number of shared
+ * Event handles at postmaster startup. You must update this if you
+ * introduce a new shared latch!
+ */
+static int
+NumSharedLatches(void)
+{
+	int numLatches = 0;
+
+	/* Each walsender needs one latch */
+	numLatches += max_wal_senders;
+
+	return numLatches;
+}
+
+/*
+ * LatchShmemSize
+ *		Compute space needed for latch's shared memory
+ */
+Size
+LatchShmemSize(void)
+{
+	return offsetof(SharedEventHandles, handles) +
+		NumSharedLatches() * sizeof(HANDLE);
+}
+
+/*
+ * LatchShmemInit
+ *		Allocate and initialize shared memory needed for latches
+ */
+void
+LatchShmemInit(void)
+{
+	Size		size = LatchShmemSize();
+	bool		found;
+
+	sharedHandles = ShmemInitStruct("SharedEventHandles", size, &found);
+
+	/* If we're first, initialize the struct and allocate handles */
+	if (!found)
+	{
+		int i;
+		SECURITY_ATTRIBUTES sa;
+
+		/*
+		 * Set up security attributes to specify that the events are
+		 * inherited.
+		 */
+		ZeroMemory(&sa, sizeof(sa));
+		sa.nLength = sizeof(sa);
+		sa.bInheritHandle = TRUE;
+
+		SpinLockInit(&sharedHandles->mutex);
+		sharedHandles->maxhandles = NumSharedLatches();
+		sharedHandles->nfreehandles = sharedHandles->maxhandles;
+		for (i = 0; i < sharedHandles->maxhandles; i++)
+		{
+			sharedHandles->handles[i] = CreateEvent(&sa, TRUE, FALSE, NULL);
+			if (sharedHandles->handles[i] == NULL)
+				elog(ERROR, "CreateEvent failed: error code %d", (int) GetLastError());
+		}
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 53c2581..3ceec24 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -34,6 +34,7 @@
  */
 #include "postgres.h"
 
+#include <signal.h>
 #include <unistd.h>
 
 #include "access/xlog_internal.h"
@@ -66,9 +67,6 @@ bool		am_walsender = false;		/* Am I a walsender process ? */
 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
 int			WalSndDelay = 200;	/* max sleep time between some actions */
 
-#define NAPTIME_PER_CYCLE 100000L		/* max sleep time between cycles
-										 * (100ms) */
-
 /*
  * These variables are used similarly to openLogFile/Id/Seg/Off,
  * but for walsender to read the XLOG.
@@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false;
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndShutdownHandler(SIGNAL_ARGS);
 static void WalSndQuickDieHandler(SIGNAL_ARGS);
+static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
@@ -144,6 +143,16 @@ WalSenderMain(void)
 	/* Handle handshake messages before streaming */
 	WalSndHandshake();
 
+	/* Initialize shared memory status */
+	{
+		/* use volatile pointer to prevent code rearrangement */
+		volatile WalSnd *walsnd = MyWalSnd;
+
+		SpinLockAcquire(&walsnd->mutex);
+		walsnd->sentPtr = sentPtr;
+		SpinLockRelease(&walsnd->mutex);
+	}
+
 	/* Main loop of walsender */
 	return WalSndLoop();
 }
@@ -380,8 +389,6 @@ WalSndLoop(void)
 	/* Loop forever, unless we get an error */
 	for (;;)
 	{
-		long		remain;		/* remaining time (us) */
-
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
 		 * necessity for manual cleanup of all postmaster children.
@@ -421,32 +428,42 @@ WalSndLoop(void)
 		/*
 		 * If we had sent all accumulated WAL in last round, nap for the
 		 * configured time before retrying.
-		 *
-		 * On some platforms, signals won't interrupt the sleep.  To ensure we
-		 * respond reasonably promptly when someone signals us, break down the
-		 * sleep into NAPTIME_PER_CYCLE increments, and check for interrupts
-		 * after each nap.
 		 */
 		if (caughtup)
 		{
-			remain = WalSndDelay * 1000L;
-			while (remain > 0)
-			{
-				/* Check for interrupts */
-				if (got_SIGHUP || shutdown_requested || ready_to_stop)
-					break;
+			/*
+			 * Even if we wrote all the WAL that was available when we started
+			 * sending, more might have arrived while we were sending this
+			 * batch. We had the latch set while sending, so we have not
+			 * received any signals from that time. Let's arm the latch
+			 * again, and after that check that we're still up-to-date.
+			 */
+			ResetLatch(&MyWalSnd->latch);
 
-				/* Sleep and check that the connection is still alive */
-				pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
-				CheckClosedConnection();
+			if (!XLogSend(output_message, &caughtup))
+				break;
+			if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+			{
+				/*
+				 * XXX: We don't really need the periodic wakeups anymore,
+				 * WaitLatchOrSocket should reliably wake up as soon as
+				 * something interesting happens.
+				 */
 
-				remain -= NAPTIME_PER_CYCLE;
+				/* Sleep */
+				WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, -1);
+				elog(LOG, "walsender woke up");
 			}
-		}
 
-		/* Attempt to send the log once every loop */
-		if (!XLogSend(output_message, &caughtup))
-			break;
+			/* Check if the connection was closed */
+			CheckClosedConnection();
+		}
+		else
+		{
+			/* Attempt to send the log once every loop */
+			if (!XLogSend(output_message, &caughtup))
+				break;
+		}
 	}
 
 	/*
@@ -493,10 +510,15 @@ InitWalSnd(void)
 		}
 		else
 		{
-			/* found */
-			MyWalSnd = (WalSnd *) walsnd;
+			/*
+			 * Found a free slot. Take ownership of the latch and initialize
+			 * the other fields.
+			 */
+			AcquireLatch((Latch *) &walsnd->latch);
 			walsnd->pid = MyProcPid;
-			MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+			MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+			/* Set MyWalSnd only after it's fully initialized. */
+			MyWalSnd = (WalSnd *) walsnd;
 			SpinLockRelease(&walsnd->mutex);
 			break;
 		}
@@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg)
 	 * for this.
 	 */
 	MyWalSnd->pid = 0;
+	ReleaseLatch(&MyWalSnd->latch);
 
 	/* WalSnd struct isn't mine anymore */
 	MyWalSnd = NULL;
@@ -787,6 +810,14 @@ static void
 WalSndSigHupHandler(SIGNAL_ARGS)
 {
+	/* Preserve errno for the code this handler interrupted. */
+	int			save_errno = errno;
+
 	got_SIGHUP = true;
+	/* Wake the main loop in case it is sleeping on the latch. */
+	if (MyWalSnd)
+		SetLatch(&MyWalSnd->latch);
+
+	errno = save_errno;
 }
 
 /* SIGTERM: set flag to shut down */
@@ -794,6 +819,14 @@ static void
 WalSndShutdownHandler(SIGNAL_ARGS)
 {
+	/* Preserve errno for the code this handler interrupted. */
+	int			save_errno = errno;
+
 	shutdown_requested = true;
+	/* Wake the main loop in case it is sleeping on the latch. */
+	if (MyWalSnd)
+		SetLatch(&MyWalSnd->latch);
+
+	errno = save_errno;
 }
 
 /*
@@ -828,11 +855,31 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
 	exit(2);
 }
 
+/* SIGUSR1: latch wakeup from another process; pass it to the latch code */
+static void
+WalSndXLogSendHandler(SIGNAL_ARGS)
+{
+	/* Preserve errno, as procsignal_sigusr1_handler does around this call. */
+	int			save_errno = errno;
+
+	latch_sigusr1_handler();
+
+	errno = save_errno;
+}
+
 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
 static void
 WalSndLastCycleHandler(SIGNAL_ARGS)
 {
+	/* Preserve errno for the code this handler interrupted. */
+	int			save_errno = errno;
+
 	ready_to_stop = true;
+	/* Wake the main loop in case it is sleeping on the latch. */
+	if (MyWalSnd)
+		SetLatch(&MyWalSnd->latch);
+
+	errno = save_errno;
 }
 
 /* Set up signal handlers */
@@ -847,7 +883,7 @@ WalSndSignals(void)
 	pqsignal(SIGQUIT, WalSndQuickDieHandler);	/* hard crash time */
 	pqsignal(SIGALRM, SIG_IGN);
 	pqsignal(SIGPIPE, SIG_IGN);
-	pqsignal(SIGUSR1, SIG_IGN); /* not used */
+	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
 												 * shutdown */
 
@@ -891,10 +927,21 @@ WalSndShmemInit(void)
 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
 			SpinLockInit(&walsnd->mutex);
+			InitSharedLatch(&walsnd->latch);
 		}
 	}
 }
 
+/* Wake up all walsenders: sets the latch in every WalSnd slot, used or not */
+void
+WalSndWakeup(void)
+{
+	int		i;
+
+	for (i = 0; i < max_wal_senders; i++)
+		SetLatch(&WalSndCtl->walsnds[i].latch);
+}
+
 /*
  * This isn't currently used for anything. Monitoring tools might be
  * interested in the future, and we'll need something like this in the
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 492ac9a..0083513 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -30,6 +30,7 @@
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/pg_shmem.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
@@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, SInvalShmemSize());
 		size = add_size(size, PMSignalShmemSize());
 		size = add_size(size, ProcSignalShmemSize());
+		size = add_size(size, LatchShmemSize());
 		size = add_size(size, BgWriterShmemSize());
 		size = add_size(size, AutoVacuumShmemSize());
 		size = add_size(size, WalSndShmemSize());
@@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	 */
 	PMSignalShmemInit();
 	ProcSignalShmemInit();
+	LatchShmemInit();
 	BgWriterShmemInit();
 	AutoVacuumShmemInit();
 	WalSndShmemInit();
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index e892e2d..3dd25f1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -21,6 +21,7 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
 #include "storage/procsignal.h"
 #include "storage/shmem.h"
 #include "storage/sinval.h"
@@ -278,5 +279,7 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	latch_sigusr1_handler();
+
 	errno = save_errno;
 }
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 874959e..73c5904 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include "access/xlog.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 
 /*
@@ -24,6 +25,12 @@ typedef struct WalSnd
 	XLogRecPtr	sentPtr;		/* WAL has been sent up to this point */
 
 	slock_t		mutex;			/* locks shared variables shown above */
+
+	/*
+	 * Latch used by backends to wake up this walsender when it has work
+	 * to do.
+	 */
+	Latch		latch;
 } WalSnd;
 
 /* There is one WalSndCtl struct for the whole database cluster */
@@ -45,5 +52,6 @@ extern int	WalSenderMain(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
+extern void WalSndWakeup(void);
 
 #endif   /* _WALSENDER_H */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
new file mode 100644
index 0000000..b972586
--- /dev/null
+++ b/src/include/storage/latch.h
@@ -0,0 +1,62 @@
+/*-------------------------------------------------------------------------
+ *
+ * latch.h
+ *	  Routines for interprocess latches
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LATCH_H
+#define LATCH_H
+
+#include <signal.h>
+
+/*
+ * Latch structure should be treated as opaque and only accessed through
+ * the public functions. It is defined here to allow embedding Latches as
+ * part of bigger structs.
+ */
+typedef struct
+{
+	sig_atomic_t	is_set;		/* the flag that TestLatch() reads */
+	bool			is_shared;	/* presumably true for InitSharedLatch() latches; confirm */
+#ifndef WIN32
+	int				owner_pid;	/* non-Windows only; presumably PID of the acquirer; confirm */
+#else
+	HANDLE			event;		/* Windows only */
+#endif
+} Latch;
+
+/*
+ * prototypes for functions in unix_latch.c / win32_latch.c
+ */
+extern void InitLatch(volatile Latch *latch);
+extern void InitSharedLatch(volatile Latch *latch);
+extern void AcquireLatch(volatile Latch *latch);
+extern void ReleaseLatch(volatile Latch *latch);
+extern bool WaitLatch(volatile Latch *latch, long timeout);
+extern int	WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
+				  long timeout);
+extern void SetLatch(volatile Latch *latch);
+extern void ResetLatch(volatile Latch *latch);
+#define TestLatch(latch) (((volatile Latch *) (latch))->is_set)	/* reads is_set; arg parenthesized so any pointer expression works */
+
+extern Size LatchShmemSize(void);
+extern void LatchShmemInit(void);
+
+/*
+ * Unix implementation uses SIGUSR1 for inter-process signaling, Win32 doesn't
+ * need this.
+ */
+#ifndef WIN32
+extern void latch_sigusr1_handler(void);
+#else
+#define latch_sigusr1_handler()
+#endif
+
+#endif   /* LATCH_H */
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index cf7c3ee..bf0e904 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -64,6 +64,7 @@ sub mkvcbuild
     $postgres->ReplaceFile('src\backend\port\dynloader.c','src\backend\port\dynloader\win32.c');
     $postgres->ReplaceFile('src\backend\port\pg_sema.c','src\backend\port\win32_sema.c');
     $postgres->ReplaceFile('src\backend\port\pg_shmem.c','src\backend\port\win32_shmem.c');
+    $postgres->ReplaceFile('src\backend\port\pg_latch.c','src\backend\port\win32_latch.c');
     $postgres->AddFiles('src\port',@pgportfiles);
     $postgres->AddDir('src\timezone');
     $postgres->AddFiles('src\backend\parser','scan.l','gram.y');
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to