On 02/09/10 06:46, Fujii Masao wrote:
On Wed, Sep 1, 2010 at 4:11 PM, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com> wrote:
The obvious next question is how to wait for multiple sockets and a latch at
the same time? Perhaps we should have a select()-like interface where you
can pass multiple file descriptors. Then again, looking at the current
callers of select() in the backend, apart from postmaster they all wait for
only one fd.
Currently backends have not waited for multiple sockets, so I don't think that
interface is required for now. Similarly, we don't need to wait for the socket
to be ready to *write* because there is no use case for now.
OK, here's an updated patch with WaitLatchOrSocket, which lets you do that.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
diff --git a/configure b/configure
index bd9b347..432cd58 100755
--- a/configure
+++ b/configure
@@ -27773,6 +27773,13 @@ _ACEOF
SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
fi
+# Select latch implementation type.
+if test "$PORTNAME" != "win32"; then
+ LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
+else
+ LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
+fi
+
# If not set in template file, set bytes to use libc memset()
if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
MEMSET_LOOP_LIMIT=1024
@@ -29098,7 +29105,7 @@ fi
ac_config_files="$ac_config_files GNUmakefile src/Makefile.global"
-ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}"
+ac_config_links="$ac_config_links src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION} src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION} src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION} src/include/dynloader.h:src/backend/port/dynloader/${template}.h src/include/pg_config_os.h:src/include/port/${template}.h src/Makefile.port:src/makefiles/Makefile.${template}"
if test "$PORTNAME" = "win32"; then
@@ -29722,6 +29729,7 @@ do
"src/backend/port/dynloader.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c" ;;
"src/backend/port/pg_sema.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}" ;;
"src/backend/port/pg_shmem.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}" ;;
+ "src/backend/port/pg_latch.c") CONFIG_LINKS="$CONFIG_LINKS src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}" ;;
"src/include/dynloader.h") CONFIG_LINKS="$CONFIG_LINKS src/include/dynloader.h:src/backend/port/dynloader/${template}.h" ;;
"src/include/pg_config_os.h") CONFIG_LINKS="$CONFIG_LINKS src/include/pg_config_os.h:src/include/port/${template}.h" ;;
"src/Makefile.port") CONFIG_LINKS="$CONFIG_LINKS src/Makefile.port:src/makefiles/Makefile.${template}" ;;
diff --git a/configure.in b/configure.in
index 7b09986..7f84cea 100644
--- a/configure.in
+++ b/configure.in
@@ -1700,6 +1700,13 @@ else
SHMEM_IMPLEMENTATION="src/backend/port/win32_shmem.c"
fi
+# Select latch implementation type.
+if test "$PORTNAME" != "win32"; then
+ LATCH_IMPLEMENTATION="src/backend/port/unix_latch.c"
+else
+ LATCH_IMPLEMENTATION="src/backend/port/win32_latch.c"
+fi
+
# If not set in template file, set bytes to use libc memset()
if test x"$MEMSET_LOOP_LIMIT" = x"" ; then
MEMSET_LOOP_LIMIT=1024
@@ -1841,6 +1848,7 @@ AC_CONFIG_LINKS([
src/backend/port/dynloader.c:src/backend/port/dynloader/${template}.c
src/backend/port/pg_sema.c:${SEMA_IMPLEMENTATION}
src/backend/port/pg_shmem.c:${SHMEM_IMPLEMENTATION}
+ src/backend/port/pg_latch.c:${LATCH_IMPLEMENTATION}
src/include/dynloader.h:src/backend/port/dynloader/${template}.h
src/include/pg_config_os.h:src/include/port/${template}.h
src/Makefile.port:src/makefiles/Makefile.${template}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 615a7fa..094d0c9 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -55,6 +55,7 @@
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
+#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
@@ -1025,6 +1026,13 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */
+ /*
+ * Wake up all walsenders to send WAL up to the PREPARE record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
@@ -2005,6 +2013,13 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Flush XLOG to disk */
XLogFlush(recptr);
+ /*
+ * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
/* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children);
@@ -2078,6 +2093,13 @@ RecordTransactionAbortPrepared(TransactionId xid,
XLogFlush(recptr);
/*
+ * Wake up all walsenders to send WAL up to the ABORT PREPARED record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here.
*/
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6bcc55c..942d5c2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,6 +36,7 @@
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
@@ -1068,6 +1069,13 @@ RecordTransactionCommit(void)
XLogFlush(XactLastRecEnd);
/*
+ * Wake up all walsenders to send WAL up to the COMMIT record
+ * immediately if replication is enabled
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+
+ /*
* Now we may update the CLOG, if we wrote a COMMIT record above
*/
if (markXidCommitted)
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index db0c2af..f50cff8 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -21,7 +21,7 @@ subdir = src/backend/port
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = dynloader.o pg_sema.o pg_shmem.o $(TAS)
+OBJS = dynloader.o pg_sema.o pg_shmem.o pg_latch.o $(TAS)
ifeq ($(PORTNAME), darwin)
SUBDIRS += darwin
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
new file mode 100644
index 0000000..1632529
--- /dev/null
+++ b/src/backend/port/unix_latch.c
@@ -0,0 +1,336 @@
+/*-------------------------------------------------------------------------
+ *
+ * unix_latch.c
+ * Routines for interprocess latches
+ *
+ * A latch allows you to wait until another process, or a signal handler
+ * within the same process, wakes you up. There are three basic operations
+ * on a latch:
+ *
+ * SetLatch - Sets the latch
+ * ResetLatch - Clears the latch, allowing it to be set again
+ * WaitLatch - waits for the latch to become set
+ *
+ * These can be used to wait for an event, without the race conditions
+ * involved with plain Unix signals and select(). pselect() was invented
+ * to solve the same problem, but it is not portable enough. Also,
+ * select() is not interrupted by signals on some platforms.
+ *
+ * The implementation is such that setting a latch that's already set
+ * is quick.
+ *
+ * The correct pattern to wait for an event is:
+ *
+ * for (;;)
+ * {
+ * ResetLatch();
+ * if (work to do)
+ * Do Stuff();
+ *
+ * WaitLatch();
+ * }
+ *
+ * It's important to reset the latch *before* checking if there's work to
+ * do. Otherwise, if someone sets the latch between the check and the
+ * ResetLatch call, you will miss it and WaitLatch will block.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+
+/*
+ * Are we currently in WaitLatch()?  latch_sigusr1_handler() only writes to
+ * the self-pipe while this is true; at other times a SIGUSR1 is ignored by
+ * the latch code.
+ */
+static volatile sig_atomic_t waiting = false;
+
+/*
+ * Read and write ends of the self-pipe, or -1 until initSelfPipe() has run.
+ * A byte written to the pipe wakes up the select() in WaitLatchOrSocket().
+ */
+static int selfpipe_readfd = -1;
+static int selfpipe_writefd = -1;
+
+/* private function prototypes */
+static void initSelfPipe(void);
+static void drainSelfPipe(void);
+static void sendSelfPipeByte(void);
+
+/*
+ * InitLatch
+ *		Prepare a latch that only the current process sets and waits on.
+ *
+ * The latch starts out unset and owned by this process.  The process's
+ * self-pipe is created lazily, when the first latch is initialized.
+ */
+void
+InitLatch(Latch *latch)
+{
+	Assert(latch->owner_pid == 0);
+
+	/* First latch in this process?  Then the self-pipe doesn't exist yet. */
+	if (selfpipe_readfd < 0)
+		initSelfPipe();
+
+	latch->owner_pid = MyProcPid;
+	latch->is_set = false;
+}
+
+/*
+ * InitSharedLatch
+ *		Prepare a latch that other processes are allowed to set.
+ *
+ * Only the owning process may wait on or reset it.  A process that waits on
+ * an inter-process latch must make sure latch_sigusr1_handler() is called
+ * from its SIGUSR1 handler, because SIGUSR1 is how SetLatch() wakes up an
+ * owner living in another process.
+ *
+ * NB: if you introduce a new shared latch, you must also increase the count
+ * returned by NumSharedLatches() in win32_latch.c!
+ */
+void
+InitSharedLatch(Latch *latch)
+{
+	/*
+	 * On Unix a shared latch needs no setup beyond a local one; the Windows
+	 * implementation differs.
+	 */
+	InitLatch(latch);
+}
+
+/*
+ * ReleaseLatch
+ *		Give up ownership of a latch set up by InitLatch() or
+ *		InitSharedLatch().
+ *
+ * Only the current owner may do this.  Afterwards, SetLatch() on the latch
+ * merely marks it set without waking anybody.
+ */
+void
+ReleaseLatch(Latch *latch)
+{
+	Assert(MyProcPid == latch->owner_pid);
+	latch->owner_pid = 0;
+}
+
+/*
+ * Wait for the given latch to be set, or until 'timeout' microseconds have
+ * passed.  A 'timeout' of zero or less means wait forever.  If the latch is
+ * already set, returns immediately.
+ *
+ * The latch must have been previously initialized by the current process.
+ */
+void
+WaitLatch(Latch *latch, long timeout)
+{
+	WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout);
+}
+
+/*
+ * Like WaitLatch, but also return when 'sock' has data available for
+ * reading.  Pass PGINVALID_SOCKET to wait on the latch alone.
+ *
+ * 'timeout' is in microseconds; zero or negative means no timeout.
+ *
+ * Returns on the first of: the latch is set, the timeout expires, or (if
+ * 'sock' is valid) the socket becomes readable.  A wake-up of the self-pipe
+ * that doesn't correspond to the latch being set, or a signal interrupting
+ * select(), just starts another iteration.  Note that each new iteration
+ * starts the full timeout over again.
+ *
+ * The latch must be owned by the current process.
+ */
+void
+WaitLatchOrSocket(Latch *latch, pgsocket sock, long timeout)
+{
+	struct timeval tv;
+	fd_set		input_mask;
+	int			rc;
+
+	if (latch->owner_pid != MyProcPid)
+		elog(ERROR, "cannot wait on a latch owned by another process");
+
+	waiting = true;
+	for (;;)
+	{
+		int			hifd;
+
+		/*
+		 * Clear the pipe, and check if the latch is set already. If someone
+		 * sets the latch between this and the select() below, the setter
+		 * will write a byte to the pipe (or signal us and the signal handler
+		 * will do that), and the select() will return immediately.
+		 */
+		drainSelfPipe();
+		if (latch->is_set)
+			break;
+
+		/* select() may modify tv, so recompute it on every iteration */
+		if (timeout > 0)
+		{
+			tv.tv_sec = timeout / 1000000L;
+			tv.tv_usec = timeout % 1000000L;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_SET(selfpipe_readfd, &input_mask);
+		hifd = selfpipe_readfd;
+		if (sock != PGINVALID_SOCKET)
+		{
+			FD_SET(sock, &input_mask);
+			if (sock > hifd)
+				hifd = sock;
+		}
+
+		rc = select(hifd + 1, &input_mask, NULL, NULL,
+					(timeout > 0) ? &tv : NULL);
+		if (rc < 0)
+		{
+			/*
+			 * On failure select() leaves the input set unmodified (or, on
+			 * some platforms, undefined), so the socket's bit may still be
+			 * set.  Testing FD_ISSET() now would report a readable socket
+			 * that isn't; just go around again instead.
+			 */
+			if (errno == EINTR)
+				continue;
+
+			waiting = false;
+			ereport(ERROR,
+					(errcode_for_socket_access(),
+					 errmsg("select() failed: %m")));
+		}
+		if (rc == 0)
+			break;				/* timeout exceeded */
+		if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
+			break;				/* data available in socket */
+	}
+	waiting = false;
+}
+
+/*
+ * SetLatch
+ *		Mark a latch as set and wake up its owner, if it has one.
+ *
+ * Setting an already-set latch returns right away.  May be called from a
+ * signal handler.
+ */
+void
+SetLatch(Latch *latch)
+{
+	pid_t		owner_pid;
+
+	if (latch->is_set)
+		return;					/* nothing to do */
+
+	latch->is_set = true;
+
+	/*
+	 * Wake up the owner, if any.  When the owner is this very process
+	 * (typically we're in a signal handler), poke the self-pipe so the
+	 * select() returns; for another process, send SIGUSR1 and let its
+	 * handler do that.
+	 */
+	owner_pid = latch->owner_pid;
+	if (owner_pid != 0)
+	{
+		if (owner_pid == MyProcPid)
+			sendSelfPipeByte();
+		else
+			kill(owner_pid, SIGUSR1);
+	}
+}
+
+/*
+ * ResetLatch
+ *		Clear the latch.  A later WaitLatch() will sleep unless the latch
+ *		gets set again before that call.
+ *
+ * Only the owning process may reset a latch.
+ */
+void
+ResetLatch(Latch *latch)
+{
+	Assert(MyProcPid == latch->owner_pid);
+
+	if (latch->is_set)
+		latch->is_set = false;
+}
+
+/*
+ * LatchShmemSize
+ *		Report how much shared memory the latch code needs.
+ *
+ * The Unix implementation keeps no shared state of its own: wake-ups go
+ * through signals and a per-process self-pipe.  Hence always zero.
+ */
+Size
+LatchShmemSize(void)
+{
+	return 0;
+}
+
+/*
+ * LatchShmemInit
+ *		Set up latch-related shared memory.
+ *
+ * Nothing to do on Unix; see LatchShmemSize().
+ */
+void
+LatchShmemInit(void)
+{
+	/* intentionally empty */
+}
+
+/*
+ * latch_sigusr1_handler
+ *		Must be called from the SIGUSR1 handler of any process that waits
+ *		on an inter-process latch.
+ *
+ * SetLatch() sends SIGUSR1 to wake up a latch's owner when it runs in
+ * another process.  If we're currently inside WaitLatch(), write a byte to
+ * the self-pipe so that its select() returns.
+ *
+ * errno is saved and restored: the write() to the self-pipe may clobber it,
+ * and the interrupted code could be about to look at errno.  Not every
+ * caller's signal handler preserves errno on its own.
+ */
+void
+latch_sigusr1_handler(void)
+{
+	int			save_errno = errno;
+
+	if (waiting)
+		sendSelfPipeByte();
+
+	errno = save_errno;
+}
+
+/*
+ * initSelfPipe
+ *		Create this process's self-pipe.
+ *
+ * The pipe lets a signal handler wake up the select() in WaitLatch().  The
+ * write end is non-blocking so that SetLatch() never blocks, even if many
+ * pending wake-ups have filled the kernel buffer.  The read end is
+ * non-blocking so the pipe can be emptied by reading until EAGAIN or
+ * EWOULDBLOCK.  Any failure here is FATAL.
+ */
+static void
+initSelfPipe(void)
+{
+	int			fds[2];
+
+	if (pipe(fds) < 0)
+		elog(FATAL, "pipe() failed: %m");
+	if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0)
+		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
+	if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0)
+		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");
+
+	selfpipe_readfd = fds[0];
+	selfpipe_writefd = fds[1];
+}
+
+/*
+ * sendSelfPipeByte
+ *		Write one byte to the self-pipe, to wake up WaitLatch().
+ *
+ * This runs inside signal handlers, so it must not elog()/ereport(): that
+ * could longjmp out of arbitrary interrupted code.  Write errors are
+ * handled as follows:
+ *	- EINTR: retry the write.
+ *	- EAGAIN/EWOULDBLOCK: the pipe is full, so a wake-up is already pending
+ *	  and nothing more is needed.
+ *	- anything else: give up silently; there is no safe way to report it.
+ */
+static void
+sendSelfPipeByte(void)
+{
+	int			rc;
+	char		dummy = 0;
+
+retry:
+	rc = write(selfpipe_writefd, &dummy, 1);
+	if (rc < 0)
+	{
+		/* If interrupted by a signal, just retry */
+		if (errno == EINTR)
+			goto retry;
+
+		/*
+		 * If the pipe is full, there's no need to retry: the data already
+		 * in it is enough to wake up WaitLatch().
+		 */
+		if (errno == EAGAIN || errno == EWOULDBLOCK)
+			return;
+
+		/*
+		 * The write() failed for some other reason.  We might be in a
+		 * signal handler, so it's not safe to elog(); all we can do is
+		 * silently ignore the error.
+		 */
+		return;
+	}
+}
+
+/*
+ * drainSelfPipe
+ *		Read and discard everything currently in the self-pipe.
+ *
+ * The read end is non-blocking, so reading until EAGAIN/EWOULDBLOCK empties
+ * it.  Data is read in small chunks rather than a byte at a time.  A short
+ * read means the pipe has just been emptied, which saves the extra read()
+ * that would otherwise only return EAGAIN.
+ */
+static void
+drainSelfPipe(void)
+{
+	char		buf[16];
+	int			rc;
+
+	for (;;)
+	{
+		rc = read(selfpipe_readfd, buf, sizeof(buf));
+		if (rc < 0)
+		{
+			if (errno == EINTR)
+				continue;		/* retry */
+			if (errno == EAGAIN || errno == EWOULDBLOCK)
+				break;			/* the pipe is empty */
+
+			elog(ERROR, "read() on self-pipe failed: %m");
+		}
+		else if (rc == 0)
+			elog(ERROR, "unexpected EOF on self-pipe");
+		else if (rc < (int) sizeof(buf))
+			break;				/* short read: the pipe is empty now */
+	}
+}
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
new file mode 100644
index 0000000..219af0f
--- /dev/null
+++ b/src/backend/port/win32_latch.c
@@ -0,0 +1,302 @@
+/*-------------------------------------------------------------------------
+ *
+ * win32_latch.c
+ * Routines for interprocess latches
+ *
+ * Windows implementation of latches, using Windows events. See
+ * unix_latch.c for information on usage.
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include "miscadmin.h"
+#include "replication/walsender.h"
+#include "storage/latch.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+
+/*
+ * Pool of inheritable Event handles, created in LatchShmemInit() and handed
+ * out to shared latches by InitSharedLatch().  handles[0 .. nfreehandles-1]
+ * are the currently free ones; the array has room for maxhandles entries.
+ * 'mutex' protects nfreehandles and the array contents.
+ */
+typedef struct
+{
+	volatile slock_t mutex;
+	int			nfreehandles;
+	int			maxhandles;
+	HANDLE		handles[1];		/* variable length */
+} SharedEventHandles;
+
+/* Points into shared memory; set by LatchShmemInit() */
+static SharedEventHandles *sharedHandles;
+
+/*
+ * Event of the latch we're currently waiting on in WaitLatchOrSocket(), or
+ * NULL when not waiting.  latch_sigusr1_handler() uses it.
+ */
+static volatile HANDLE waitingEvent = NULL;
+
+/*
+ * Initialize a backend-local latch, backed by a private manual-reset Event
+ * that starts out non-signaled.  Raises ERROR if the Event can't be created.
+ */
+void
+InitLatch(Latch *latch)
+{
+	Assert(latch->event == NULL);
+
+	latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (latch->event == NULL)
+		elog(ERROR, "CreateEvent failed: error code %d", (int) GetLastError());
+
+	latch->isshared = false;
+	latch->is_set = false;
+}
+
+/*
+ * InitSharedLatch
+ *		Initialize an inter-process latch, taking its Event from the shared
+ *		pool.
+ *
+ * Like InitLatch(), but the latch can be set from another process.  A
+ * process that waits on an inter-process latch must also make sure that
+ * latch_sigusr1_handler() is called from its SIGUSR1 handler.  Raises ERROR
+ * if the pool (sized by NumSharedLatches()) has no free handle left.
+ */
+void
+InitSharedLatch(Latch *latch)
+{
+	Assert(latch->event == NULL);
+
+	SpinLockAcquire(&sharedHandles->mutex);
+	if (sharedHandles->nfreehandles <= 0)
+	{
+		SpinLockRelease(&sharedHandles->mutex);
+		elog(ERROR, "out of shared event objects");
+	}
+	/* pop the last free handle off the pool */
+	latch->event = sharedHandles->handles[--sharedHandles->nfreehandles];
+	SpinLockRelease(&sharedHandles->mutex);
+
+	latch->isshared = true;
+	latch->is_set = false;
+	ResetEvent(latch->event);
+}
+
+/*
+ * ReleaseLatch
+ *		Release a latch set up by InitLatch() or InitSharedLatch().
+ *
+ * A local latch's Event is closed.  A shared latch's Event goes back to the
+ * shared pool; finding the pool already full is a PANIC.
+ */
+void
+ReleaseLatch(Latch *latch)
+{
+	Assert(latch->event);
+
+	if (!latch->isshared)
+	{
+		/*
+		 * Clear latch->event before closing the handle, so SetLatch() is not
+		 * handed a closed handle.  XXX: is this safe from compiler
+		 * rearrangement?
+		 */
+		HANDLE		handle = latch->event;
+
+		latch->event = NULL;
+		CloseHandle(handle);
+		return;
+	}
+
+	/* Shared latch: put the event handle back in the pool */
+	SpinLockAcquire(&sharedHandles->mutex);
+	if (sharedHandles->nfreehandles >= sharedHandles->maxhandles)
+	{
+		SpinLockRelease(&sharedHandles->mutex);
+		elog(PANIC, "too many free event handles");
+	}
+	sharedHandles->handles[sharedHandles->nfreehandles++] = latch->event;
+	SpinLockRelease(&sharedHandles->mutex);
+	latch->event = NULL;
+}
+
+/*
+ * Wait for the given latch to be set, or until 'timeout' microseconds have
+ * passed.  A 'timeout' of zero or less means wait forever.  If the latch is
+ * already set, returns immediately.
+ *
+ * The latch must have been previously initialized by the current process.
+ */
+void
+WaitLatch(Latch *latch, long timeout)
+{
+	WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout);
+}
+
+/*
+ * finishWait
+ *		Clear waitingEvent and dispose of the per-wait socket event.  Used
+ *		on every exit path of WaitLatchOrSocket(), including errors.
+ */
+static void
+finishWait(SOCKET sock, HANDLE sockevent)
+{
+	waitingEvent = NULL;
+	if (sock != PGINVALID_SOCKET && sockevent != WSA_INVALID_EVENT)
+	{
+		WSAEventSelect(sock, sockevent, 0);
+		WSACloseEvent(sockevent);
+	}
+}
+
+/*
+ * Like WaitLatch, but also return when 'sock' has data available for
+ * reading.  Pass PGINVALID_SOCKET to wait on the latch alone.
+ *
+ * 'timeout' is in microseconds; zero or negative means wait forever.  It
+ * is rounded up to whole milliseconds for WaitForMultipleObjects(), so a
+ * short positive timeout does not degenerate into a zero-length poll.
+ *
+ * Queued signals are dispatched while waiting.  Returns when the latch is
+ * set, the timeout expires, or the socket becomes readable.
+ */
+void
+WaitLatchOrSocket(Latch *latch, SOCKET sock, long timeout)
+{
+	DWORD		rc;
+	DWORD		timeout_ms;
+	HANDLE		events[3];
+	HANDLE		sockevent = WSA_INVALID_EVENT;
+	int			numevents;
+
+	if (timeout > 0)
+		timeout_ms = (DWORD) (timeout / 1000 + ((timeout % 1000) != 0));
+	else
+		timeout_ms = INFINITE;
+
+	events[0] = latch->event;
+	events[1] = pgwin32_signal_event;
+	numevents = 2;
+	if (sock != PGINVALID_SOCKET)
+	{
+		sockevent = WSACreateEvent();
+		if (sockevent == WSA_INVALID_EVENT)
+			elog(ERROR, "failed to create event for socket: error code %d",
+				 (int) WSAGetLastError());
+		if (WSAEventSelect(sock, sockevent, FD_READ) != 0)
+		{
+			int			err = (int) WSAGetLastError();
+
+			WSACloseEvent(sockevent);
+			elog(ERROR, "failed to set up event for socket: error code %d",
+				 err);
+		}
+		events[numevents++] = sockevent;
+	}
+
+	waitingEvent = latch->event;
+	for (;;)
+	{
+		/*
+		 * Reset the event, and check if the latch is set already. If someone
+		 * sets the latch between this and the WaitForMultipleObjects() call
+		 * below, the setter will set the event and WaitForMultipleObjects()
+		 * will return immediately.
+		 */
+		if (!ResetEvent(latch->event))
+		{
+			/* capture the error before cleanup can overwrite it */
+			int			err = (int) GetLastError();
+
+			finishWait(sock, sockevent);
+			elog(ERROR, "ResetEvent failed: error code %d", err);
+		}
+		if (latch->is_set)
+			break;
+
+		rc = WaitForMultipleObjects(numevents, events, FALSE, timeout_ms);
+		if (rc == WAIT_FAILED)
+		{
+			int			err = (int) GetLastError();
+
+			finishWait(sock, sockevent);
+			ereport(ERROR,
+					(errcode_for_socket_access(),
+					 errmsg("WaitForMultipleObjects() failed: error code %d", err)));
+		}
+		if (rc == WAIT_TIMEOUT)
+			break;				/* timeout exceeded */
+		if (rc == WAIT_OBJECT_0 + 1)
+			pgwin32_dispatch_queued_signals();
+		if (rc == WAIT_OBJECT_0 + 2)
+		{
+			/* only registered when a socket was given */
+			Assert(sock != PGINVALID_SOCKET);
+			break;				/* data available in socket */
+		}
+	}
+
+	finishWait(sock, sockevent);
+}
+
+/*
+ * SetLatch
+ *		Set a latch and wake up whoever is waiting on it.
+ *
+ * Returns quickly if the latch is already set.  Can be called from a signal
+ * handler in the owning process.  A failure to signal the Event is only
+ * logged.
+ */
+void
+SetLatch(Latch *latch)
+{
+	HANDLE		handle;
+
+	if (latch->is_set)
+		return;					/* already set */
+	latch->is_set = true;
+
+	/*
+	 * Copy the handle into a local first: the latch might be released
+	 * between our NULL test and the SetEvent() call.
+	 */
+	handle = latch->event;
+	if (handle != NULL && !SetEvent(handle))
+		elog(LOG, "SetEvent failed: error code %d", (int) GetLastError());
+}
+
+/*
+ * ResetLatch
+ *		Clear the latch.  A later WaitLatch() will sleep unless the latch
+ *		gets set again before that call.
+ */
+void
+ResetLatch(Latch *latch)
+{
+	if (latch->is_set)
+		latch->is_set = false;
+}
+
+/*
+ * NumSharedLatches
+ *		Number of shared latches, i.e. how many inheritable Event handles to
+ *		preallocate at postmaster startup.
+ *
+ * Each InitSharedLatch() call consumes one handle, so you must update this
+ * if you introduce a new shared latch!
+ */
+static int
+NumSharedLatches(void)
+{
+	/* one latch per walsender */
+	return max_wal_senders;
+}
+
+/*
+ * LatchShmemSize
+ *		Compute space needed for latch's shared memory: the fixed part of
+ *		SharedEventHandles plus one HANDLE per shared latch.
+ *
+ * Uses the overflow-checked add_size()/mul_size() helpers, as the callers
+ * in ipci.c do for their size sums.
+ */
+Size
+LatchShmemSize(void)
+{
+	return add_size(offsetof(SharedEventHandles, handles),
+					mul_size(NumSharedLatches(), sizeof(HANDLE)));
+}
+
+/*
+ * LatchShmemInit
+ *		Allocate the shared pool of Event handles, or attach to it if it
+ *		already exists.
+ *
+ * The first caller creates NumSharedLatches() inheritable, manual-reset,
+ * initially non-signaled Events and marks them all free.  Raises ERROR if
+ * an Event can't be created.
+ */
+void
+LatchShmemInit(void)
+{
+	Size		size = LatchShmemSize();
+	bool		found;
+	SECURITY_ATTRIBUTES sa;
+	int			i;
+
+	sharedHandles = ShmemInitStruct("SharedEventHandles", size, &found);
+
+	/* Already initialized by someone else: nothing more to do */
+	if (found)
+		return;
+
+	/* The events must be inheritable */
+	ZeroMemory(&sa, sizeof(sa));
+	sa.nLength = sizeof(sa);
+	sa.bInheritHandle = TRUE;
+
+	SpinLockInit(&sharedHandles->mutex);
+	sharedHandles->maxhandles = NumSharedLatches();
+	sharedHandles->nfreehandles = sharedHandles->maxhandles;
+	for (i = 0; i < sharedHandles->maxhandles; i++)
+	{
+		sharedHandles->handles[i] = CreateEvent(&sa, TRUE, FALSE, NULL);
+		if (sharedHandles->handles[i] == NULL)
+			elog(ERROR, "CreateEvent failed: error code %d", (int) GetLastError());
+	}
+}
+
+/*
+ * latch_sigusr1_handler
+ *		SIGUSR1 hook: if we're blocked in WaitLatchOrSocket(), signal the
+ *		Event being waited on so the wait returns.
+ *
+ * XXX: it's not clear this is needed on Windows, where SetLatch() signals
+ * the Event directly.
+ */
+void
+latch_sigusr1_handler(void)
+{
+	HANDLE		event = waitingEvent;
+
+	if (event != NULL)
+		SetEvent(event);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 53c2581..cda191d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -34,6 +34,7 @@
*/
#include "postgres.h"
+#include <signal.h>
#include <unistd.h>
#include "access/xlog_internal.h"
@@ -66,9 +67,6 @@ bool am_walsender = false; /* Am I a walsender process ? */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 200; /* max sleep time between some actions */
-#define NAPTIME_PER_CYCLE 100000L /* max sleep time between cycles
- * (100ms) */
-
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
@@ -93,6 +91,7 @@ static volatile sig_atomic_t ready_to_stop = false;
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
static void WalSndQuickDieHandler(SIGNAL_ARGS);
+static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
@@ -144,6 +143,16 @@ WalSenderMain(void)
/* Handle handshake messages before streaming */
WalSndHandshake();
+ /* Initialize shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
/* Main loop of walsender */
return WalSndLoop();
}
@@ -380,8 +389,6 @@ WalSndLoop(void)
/* Loop forever, unless we get an error */
for (;;)
{
- long remain; /* remaining time (us) */
-
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
@@ -421,32 +428,42 @@ WalSndLoop(void)
/*
* If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
- *
- * On some platforms, signals won't interrupt the sleep. To ensure we
- * respond reasonably promptly when someone signals us, break down the
- * sleep into NAPTIME_PER_CYCLE increments, and check for interrupts
- * after each nap.
*/
if (caughtup)
{
- remain = WalSndDelay * 1000L;
- while (remain > 0)
- {
- /* Check for interrupts */
- if (got_SIGHUP || shutdown_requested || ready_to_stop)
- break;
+ /*
+ * Even if we wrote all the WAL that was available when we started
+ * sending, more might have arrived while we were sending this
+ * batch. We had the latch set while sending, so we have not
+ * received any signals from that time. Let's arm the latch
+ * again, and after that check that we're still up-to-date.
+ */
+ ResetLatch(&MyWalSnd->latch);
- /* Sleep and check that the connection is still alive */
- pg_usleep(remain > NAPTIME_PER_CYCLE ? NAPTIME_PER_CYCLE : remain);
- CheckClosedConnection();
+ if (!XLogSend(output_message, &caughtup))
+ break;
+ if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+ {
+ /*
+ * XXX: We don't really need the periodic wakeups anymore,
+ * WaitLatchOrSocket should reliably wake up as soon as
+ * something interesting happens.
+ */
- remain -= NAPTIME_PER_CYCLE;
+ /* Sleep */
+ WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, 0);
+ elog(LOG, "woke up");
}
- }
- /* Attempt to send the log once every loop */
- if (!XLogSend(output_message, &caughtup))
- break;
+ /* Check if the connection was closed */
+ CheckClosedConnection();
+ }
+ else
+ {
+ /* Attempt to send the log once every loop */
+ if (!XLogSend(output_message, &caughtup))
+ break;
+ }
}
/*
@@ -493,10 +510,15 @@ InitWalSnd(void)
}
else
{
- /* found */
- MyWalSnd = (WalSnd *) walsnd;
+ /*
+ * Found a free slot. Take ownership of the latch and initialize
+ * the other fields.
+ */
+ InitSharedLatch((Latch *) &walsnd->latch);
walsnd->pid = MyProcPid;
- MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr));
+ MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ /* Set MyWalSnd only after it's fully initialized. */
+ MyWalSnd = (WalSnd *) walsnd;
SpinLockRelease(&walsnd->mutex);
break;
}
@@ -523,6 +545,7 @@ WalSndKill(int code, Datum arg)
* for this.
*/
MyWalSnd->pid = 0;
+ ReleaseLatch(&MyWalSnd->latch);
/* WalSnd struct isn't mine anymore */
MyWalSnd = NULL;
@@ -787,6 +810,8 @@ static void
WalSndSigHupHandler(SIGNAL_ARGS)
{
got_SIGHUP = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* SIGTERM: set flag to shut down */
@@ -794,6 +819,8 @@ static void
WalSndShutdownHandler(SIGNAL_ARGS)
{
shutdown_requested = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/*
@@ -828,11 +855,20 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
exit(2);
}
+/*
+ * SIGUSR1: another process set our latch (e.g. via WalSndWakeup()) to ask
+ * us to send WAL.  Hand the signal to the latch code so WaitLatch() wakes.
+ *
+ * errno is saved and restored because the latch code may write() to its
+ * self-pipe (Unix implementation), clobbering errno in the interrupted code.
+ */
+static void
+WalSndXLogSendHandler(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	latch_sigusr1_handler();
+
+	errno = save_errno;
+}
+
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void
WalSndLastCycleHandler(SIGNAL_ARGS)
{
ready_to_stop = true;
+ if (MyWalSnd)
+ SetLatch(&MyWalSnd->latch);
}
/* Set up signal handlers */
@@ -847,7 +883,7 @@ WalSndSignals(void)
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN); /* not used */
+ pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
* shutdown */
@@ -895,6 +931,16 @@ WalSndShmemInit(void)
}
}
+/*
+ * WalSndWakeup
+ *		Set the latch of every walsender slot, so that running walsenders
+ *		wake up and send newly written WAL.
+ *
+ * Setting the latch of an unused slot only marks it set; InitWalSnd()
+ * re-initializes the latch when a walsender takes over the slot.
+ */
+void
+WalSndWakeup(void)
+{
+	int			slot;
+
+	for (slot = 0; slot < max_wal_senders; slot++)
+	{
+		WalSnd	   *walsnd = &WalSndCtl->walsnds[slot];
+
+		SetLatch(&walsnd->latch);
+	}
+}
+
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 492ac9a..0083513 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -30,6 +30,7 @@
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
+#include "storage/latch.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
@@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SInvalShmemSize());
size = add_size(size, PMSignalShmemSize());
size = add_size(size, ProcSignalShmemSize());
+ size = add_size(size, LatchShmemSize());
size = add_size(size, BgWriterShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, WalSndShmemSize());
@@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
*/
PMSignalShmemInit();
ProcSignalShmemInit();
+ LatchShmemInit();
BgWriterShmemInit();
AutoVacuumShmemInit();
WalSndShmemInit();
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index e892e2d..3dd25f1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -21,6 +21,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "storage/ipc.h"
+#include "storage/latch.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinval.h"
@@ -278,5 +279,7 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
+ latch_sigusr1_handler();
+
errno = save_errno;
}
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 874959e..73c5904 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
#define _WALSENDER_H
#include "access/xlog.h"
+#include "storage/latch.h"
#include "storage/spin.h"
/*
@@ -24,6 +25,12 @@ typedef struct WalSnd
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * Latch used by backends to wake up this walsender when it has work
+ * to do.
+ */
+ Latch latch;
} WalSnd;
/* There is one WalSndCtl struct for the whole database cluster */
@@ -45,5 +52,6 @@ extern int WalSenderMain(void);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
+extern void WalSndWakeup(void);
#endif /* _WALSENDER_H */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
new file mode 100644
index 0000000..677e723
--- /dev/null
+++ b/src/include/storage/latch.h
@@ -0,0 +1,50 @@
+/*-------------------------------------------------------------------------
+ *
+ * latch.h
+ * Routines for interprocess latches
+ *
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LATCH_H
+#define LATCH_H
+
+#include <signal.h>
+
+/*
+ * Latch structure.  Its layout depends on the platform-specific
+ * implementation; manipulate latches only through the functions below.
+ */
+#ifndef WIN32
+typedef struct
+{
+	volatile sig_atomic_t is_set;	/* has the latch been set? */
+	volatile sig_atomic_t owner_pid;	/* PID of owning process, or 0 */
+} Latch;
+#else
+typedef struct
+{
+	volatile sig_atomic_t is_set;	/* has the latch been set? */
+	bool		isshared;		/* was 'event' taken from the shared pool? */
+	HANDLE		event;			/* Event object used for wake-ups */
+} Latch;
+#endif
+
+/*
+ * Prototypes for functions in the platform-specific latch implementation
+ * (src/backend/port/unix_latch.c or win32_latch.c, linked as pg_latch.c).
+ *
+ * Timeouts for WaitLatch() and WaitLatchOrSocket() are in microseconds;
+ * zero or negative means wait forever.
+ */
+extern void InitLatch(Latch *latch);
+extern void InitSharedLatch(Latch *latch);
+extern void ReleaseLatch(Latch *latch);
+extern void WaitLatch(Latch *latch, long timeout);
+extern void WaitLatchOrSocket(Latch *latch, pgsocket sock, long timeout);
+extern void SetLatch(Latch *latch);
+extern void ResetLatch(Latch *latch);
+
+extern Size LatchShmemSize(void);
+extern void LatchShmemInit(void);
+
+extern void latch_sigusr1_handler(void);
+
+#endif   /* LATCH_H */
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index cf7c3ee..bf0e904 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -64,6 +64,7 @@ sub mkvcbuild
$postgres->ReplaceFile('src\backend\port\dynloader.c','src\backend\port\dynloader\win32.c');
$postgres->ReplaceFile('src\backend\port\pg_sema.c','src\backend\port\win32_sema.c');
$postgres->ReplaceFile('src\backend\port\pg_shmem.c','src\backend\port\win32_shmem.c');
+ $postgres->ReplaceFile('src\backend\port\pg_latch.c','src\backend\port\win32_latch.c');
$postgres->AddFiles('src\port',@pgportfiles);
$postgres->AddDir('src\timezone');
$postgres->AddFiles('src\backend\parser','scan.l','gram.y');
diff --git a/src/tools/msvc/build.pl b/src/tools/msvc/build.pl
index 24db58c..bf8f356 100644
--- a/src/tools/msvc/build.pl
+++ b/src/tools/msvc/build.pl
@@ -56,7 +56,7 @@ if ($buildwhat)
}
else
{
- system("msbuild pgsql.sln /verbosity:detailed /p:Configuration=$bconf");
+ system("msbuild pgsql.sln /verbosity:normal /p:Configuration=$bconf");
}
# report status
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers