Hi,

I've been measuring the performance of the latch implementation in 9.1. Here are the results of a ping-pong test with two processes signalling and waiting on each other's latch. I tried three variations (Linux 2.6.18, Nehalem processor).

The first is the current implementation.

The second is built on native System V semaphores on Linux. This one cannot
implement WaitLatchOrSocket, because there's no select() involved.

The third is an implementation based on pipe() and poll(). Note: in its current incarnation it's essentially a hack to measure performance and is not usable in postgres, because it assumes all latches are created before any process is forked. We'd need to use mkfifo() or something similar to sort that out if we really want to go this route.

- Current implementation: one ping-pong averages 15 usecs
- Pipe+poll: 9 usecs
- Semaphore: 6 usecs

The test program and the modified unix_latch.c are attached. You can compile them with "gcc -DPIPE -O2 sema.c", "gcc -DLINUX_SEM -O2 sema.c", or "gcc -O2 sema.c".


Thanks,
--Ganesh

#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdlib.h>

#include <unistd.h>
#include <sys/file.h>
#include <sys/ipc.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/stat.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
/* Number of wait/signal round trips each of the two processes performs. */
int nIter = 1000 * 1000;

/* Per-wait WaitLatch timeout, in microseconds (50 seconds). */
#define LATCH_TIMEOUT (50L * 1000L * 1000L)

#include "latch.h"
#include "unix_latch.c"
/* Index (0 or 1) into latchArray of the latch this process waits on ... */
int selfLatch;
/* ... and of the latch belonging to the peer process, which we set. */
int otherLatch;

/* The two latches; main() places them in a System V shared memory segment. */
Latch *latchArray;

#ifdef LINUX_SEM
/* NOTE(review): not referenced in this file; latches keep their own semId. */
int semId = -1;
#endif

void DoWork(int selfLatch);

/*
 * SIGUSR1 handler installed by main(): forwards the signal to the latch
 * module via latch_sigusr1_handler().
 */
void
sigusr1handler(int signo)
{
   (void) signo;

   DEBUG("sigusr handler\n");
   latch_sigusr1_handler();
}


/*
 * Benchmark driver. Places two shared latches in a System V shared memory
 * segment, forks, and runs DoWork() in both processes: the parent waits on
 * latch 1 and the child on latch 0.
 */
int
main(void)
{
   int child;
   int shmid;

   /*
    * IPC_PRIVATE always creates a fresh segment, so there is no need to probe
    * a range of keys; the child created by fork() inherits the attachment.
    */
   shmid = shmget(IPC_PRIVATE, sizeof(Latch) * 2, IPC_CREAT | 0600);
   if (shmid < 0) {
      DEBUG("shmget error %d %d\n", shmid, errno);
      printf("Can't get shm, aborting test\n");
      exit(1);
   }

   /* shmat() reports failure as (void *) -1, not as a negative pointer. */
   latchArray = shmat(shmid, NULL, 0);
   if (latchArray == (void *) -1) {
      printf("shmat error %d\n", errno);
      exit(1);
   }

   /*
    * Mark the segment for removal right away: it is destroyed once the last
    * attached process detaches or exits, so repeated runs do not leak
    * segments.
    */
   if (shmctl(shmid, IPC_RMID, NULL) < 0)
      printf("shmctl(IPC_RMID) error %d\n", errno);

   InitSharedLatch(&latchArray[0]);
   InitSharedLatch(&latchArray[1]);

   /*
    * Install the handler before fork() so both processes have it from the
    * start; an early SIGUSR1 with the default disposition would kill the
    * receiver.
    */
   signal(SIGUSR1, sigusr1handler);

   child = fork();
   if (child < 0) {
      printf("fork error %d %d\n", child, errno);
      exit(1);
   }
   MyProcPid = getpid();
   DoWork(child != 0);

   /* Parent: reap the child so the test is finished when we return. */
   if (child > 0)
      waitpid(child, NULL, 0);
   return 0;
}

/*
 * Wait (up to LATCH_TIMEOUT) for latchArray[latch] to be set, then reset it
 * for the next round. Returns whatever WaitLatch returned.
 */
static int
WaitForOther(int latch)
{
   Latch *mine = latchArray + latch;
   int woke;

   DEBUG("Wait %p\n", mine);
   woke = WaitLatch(mine, LATCH_TIMEOUT);
   ResetLatch(mine);
   return woke;
}

/* Set latchArray[latch], waking the process that owns it. */
static void
SignalOther(int latch)
{
   Latch *target = latchArray + latch;

   DEBUG("Signal %p\n", target);
   SetLatch(target);
}

/*
 * Ping-pong loop run by both processes.
 *
 * l: index of the latch this process owns and waits on (0 or 1); the peer
 *    owns l ^ 1. Process 0 kicks things off by setting the peer's latch
 *    once; each iteration then waits on our latch and sets the peer's.
 *    Prints elapsed wall-clock time for nIter iterations.
 */
void
DoWork(int l)
{
   struct timeval tv1, tv2;
   double diff;
   Latch *self;
   int i;

   selfLatch = l;
   otherLatch = selfLatch ^ 1;
   self = &latchArray[selfLatch];
   OwnLatch(self);
   sleep(2); /* Cheat: pseudo-barrier so both processes own their latch. */
   printf("Start Test:");
#if defined(LINUX_SEM)
   printf("Using semaphores\n");
#elif defined(PIPE)
   printf("Using pipe\n");
#else
   printf("Using signal\n");
#endif
   /* Set one of the latches in the beginning. */
   if (selfLatch == 0) {
      SignalOther(otherLatch);
   }

   /* POSIX: the timezone argument should be NULL. */
   gettimeofday(&tv1, NULL);
   for (i = 0; i < nIter; i++) {
      if (WaitForOther(selfLatch) != 1) {
         printf("BUG BUG BUG\n");
         exit(1);
      }
      SignalOther(otherLatch);
   }
   gettimeofday(&tv2, NULL);

   /* Compute in double: avoids time_t overflow and float rounding. */
   diff = (double) (tv2.tv_sec - tv1.tv_sec) * 1000000.0
      + (double) (tv2.tv_usec - tv1.tv_usec);
   /* nIter is an int, so %d (the old %ld was undefined behavior). */
   printf("%d iterations took %.2f sec (%.2f usec/iter)\n",
          nIter, diff / 1000000.0, diff / nIter);
}
/*-------------------------------------------------------------------------
 *
 * latch.h
 *       Routines for interprocess latches
 *
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */

#ifndef _LATCH_H_
#define _LATCH_H_

#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

/*
 * Fake postgres: just enough of the backend environment (MyProcPid, bool,
 * pgsocket, elog, Assert, DEBUG) for unix_latch.c to build inside the test.
 */

int MyProcPid;

#define false 0
#define true 1

typedef unsigned char bool;

typedef int pgsocket;

/* Parenthesized so the macro is safe inside any expression. */
#define PGINVALID_SOCKET (-1)

/*
 * elog: print the message and terminate. Every call site passes ERROR or
 * FATAL, which never return in the real backend; code after several of them
 * (e.g. the EOF check in drainSelfPipe) would otherwise spin forever. The
 * level argument is deliberately not expanded.
 */
#define elog(_x, ...) \
	do { printf(__VA_ARGS__); putchar('\n'); exit(1); } while (0)

/*
 * Assert: abort with the failing line number. Kept on continuation lines
 * (the original was split without a backslash) and with no trailing
 * semicolon, so "if (c) Assert(x); else ..." parses.
 */
#define Assert(_x) \
	do { \
		if ((_x) == 0) { \
			printf("Assertion failure @%d\n", __LINE__); \
			exit(1); \
		} \
	} while (0)

//#define DEBUG(...) printf(__VA_ARGS__)
#define DEBUG(...) do { } while (0)

/*
 * Argument union for semctl(). Per semctl(2) the calling program must define
 * it itself; this one follows the man page layout. Used with SETVAL in
 * OwnLatch and SetLatch.
 * NOTE(review): "__buf" is a reserved identifier, kept from the man page.
 */
union semun {
   int              val;    /* Value for SETVAL */
   struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
   unsigned short  *array;  /* Array for GETALL, SETALL */
   struct seminfo  *__buf;  /* Buffer for IPC_INFO
                               (Linux specific) */
}; 




/*
 * Latch structure should be treated as opaque and only accessed through
 * the public functions. It is defined here to allow embedding Latches as
 * part of bigger structs.
 *
 * The wake-up mechanism depends on the build: LINUX_SEM keeps a System V
 * semaphore id, PIPE a per-latch pipe, and the default build an is_set flag
 * paired with the owning process's self-pipe.
 */
typedef struct
{
#if defined(LINUX_SEM)
       /* Semaphore id; -1 until OwnLatch allocates it (shared latches). */
       int semId;
#elif defined(PIPE)
       /* [0] read end, [1] write end; the latch counts as set while the pipe holds bytes. */
       int pipefds[2];
#else
       /* Set by SetLatch, cleared by ResetLatch. */
       sig_atomic_t    is_set;
#endif
       /* true if initialized with InitSharedLatch. */
       bool                    is_shared;
#ifndef WIN32
       /* pid of the owning (waiting) process, 0 if unowned. */
       int                             owner_pid;
#else
       HANDLE                  event;
#endif
} Latch;

/*
 * prototypes for functions in unix_latch.c
 */
extern void InitLatch(volatile Latch *latch);
extern void InitSharedLatch(volatile Latch *latch);
extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int     WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
                                 long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);

/*
 * TestLatch reads is_set, a field that exists only in the default
 * (signal + self-pipe) build; using it with LINUX_SEM or PIPE is a compile
 * error. The argument is parenthesized so any pointer expression works.
 */
#define TestLatch(latch) (((volatile Latch *) (latch))->is_set)

/*
 * Must be called from the process's SIGUSR1 handler. unix_latch.c defines
 * it in every build (the PIPE build asserts if it is ever reached), so it
 * is declared unconditionally.
 */
extern void latch_sigusr1_handler(void);

#endif

/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *       Routines for inter-process latches
 *
 * A latch is a boolean variable, with operations that let you sleep
 * until it is set. A latch can be set from another process, or a signal
 * handler within the same process.
 *
 * The latch interface is a reliable replacement for the common pattern of
 * using pg_usleep() or select() to wait until a signal arrives, where the
 * signal handler sets a global variable. Because on some platforms, an
 * incoming signal doesn't interrupt sleep, and even on platforms where it
 * does there is a race condition if the signal arrives just before
 * entering the sleep, the common pattern must periodically wake up and
 * poll the global variable. The pselect() system call was invented to solve
 * the problem, but it is not portable enough. Latches are designed to
 * overcome these limitations, allowing you to sleep without polling and
 * ensuring a quick response to signals from other processes.
 *
 * There are two kinds of latches: local and shared. A local latch is
 * initialized by InitLatch, and can only be set from the same process.
 * A local latch can be used to wait for a signal to arrive, by calling
 * SetLatch in the signal handler. A shared latch resides in shared memory,
 * and must be initialized at postmaster startup by InitSharedLatch. Before
 * a shared latch can be waited on, it must be associated with a process
 * with OwnLatch. Only the process owning the latch can wait on it, but any
 * process can set it.
 *
 * There are three basic operations on a latch:
 *
 * SetLatch            - Sets the latch
 * ResetLatch  - Clears the latch, allowing it to be set again
 * WaitLatch   - Waits for the latch to become set
 *
 * The correct pattern to wait for an event is:
 *
 * for (;;)
 * {
 *     ResetLatch();
 *     if (work to do)
 *         Do Stuff();
 *
 *     WaitLatch();
 * }
 *
 * It's important to reset the latch *before* checking if there's work to
 * do. Otherwise, if someone sets the latch between the check and the
 * ResetLatch call, you will miss it and Wait will block.
 *
 * To wake up the waiter, you must first set a global flag or something
 * else that the main loop tests in the "if (work to do)" part, and call
 * SetLatch *after* that. SetLatch is designed to return quickly if the
 * latch is already set.
 *
 *
 * Implementation
 * --------------
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe
 * however reliably interrupts the sleep, and makes select() return
 * immediately if the signal arrives just before select() begins.
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *       $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/sem.h>
#include <poll.h>

#include "latch.h"

/*
 * True while this process is inside the select() loop of WaitLatchOrSocket;
 * latch_sigusr1_handler only pokes the self-pipe when it is set.
 */
static volatile sig_atomic_t waiting = false;

/* The self-pipe's two ends; -1 until initSelfPipe() creates it. */
static int selfpipe_readfd = -1;
static int selfpipe_writefd = -1;

/* private helpers, defined at the end of this file */
static void initSelfPipe(void);
static void sendSelfPipeByte(void);
static void drainSelfPipe(void);
#ifdef PIPE
static void drainLatchPipe(volatile Latch *latch);
#endif

/*
 * Initialize a backend-local latch, owned by the current process from the
 * start.
 *
 * Each build gives the latch the resource WaitLatch needs:
 *  - LINUX_SEM: a private semaphore with value 0;
 *  - PIPE: the latch's own non-blocking pipe;
 *  - default: the process-wide self-pipe, plus is_set = false.
 * (The LINUX_SEM and PIPE builds used to leave semId / pipefds
 * uninitialized, so waiting on a local latch used garbage ids.)
 */
void
InitLatch(volatile Latch *latch)
{
#if defined(LINUX_SEM)
       union semun semun;
       int s = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);

       if (s < 0)
               elog(FATAL, "semget() failed: %m");
       semun.val = 0;
       if (semctl(s, 0, SETVAL, semun) < 0)
               elog(FATAL, "semctl(SETVAL) failed: %m");
       latch->semId = s;
#elif defined(PIPE)
       int pipefd[2];

       /* Non-blocking on both ends, matching InitSharedLatch. */
       if (pipe(pipefd) < 0)
               elog(FATAL, "pipe() failed: %m");
       if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
               elog(FATAL, "fcntl() failed on read-end of latch pipe: %m");
       if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
               elog(FATAL, "fcntl() failed on write-end of latch pipe: %m");
       latch->pipefds[0] = pipefd[0];
       latch->pipefds[1] = pipefd[1];
#else
       /* Initialize the self pipe if this is our first latch in the process */
       if (selfpipe_readfd == -1)
               initSelfPipe();
       latch->is_set = false;
#endif

       DEBUG("InitLatch %p\n", latch);
       latch->owner_pid = MyProcPid;
       latch->is_shared = false;
}

/*
 * Initialize a latch that lives in shared memory and may be set from any
 * process. It starts out unowned (owner_pid 0); the process that will wait
 * on it must call OwnLatch first.
 *
 * NB: When you introduce a new shared latch, you must increase the shared
 * latch count in NumSharedLatches in win32_latch.c!
 */
void
InitSharedLatch(volatile Latch *latch)
{
       latch->is_shared = true;
       latch->owner_pid = 0;

#if defined(LINUX_SEM)
       /* No semaphore yet: OwnLatch allocates one for the first owner. */
       latch->semId = -1;
#elif defined(PIPE)
       /*
        * Give the latch its own pipe, non-blocking on both ends. Only
        * processes forked after this point inherit its descriptors.
        */
       {
               int rc;

               rc = pipe((int *) latch->pipefds);
               Assert(rc == 0);
               if (fcntl(latch->pipefds[0], F_SETFL, O_NONBLOCK) < 0)
                       elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
               if (fcntl(latch->pipefds[1], F_SETFL, O_NONBLOCK) < 0)
                       elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");
       }
#else
       latch->is_set = false;
#endif
}

/*
 * Associate a shared latch with the current process, allowing it to
 * wait on it.
 *
 * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal
 * handler, as shared latches use SIGUSR1 for inter-process communication.
 *
 * LINUX_SEM: the first owner allocates the latch's semaphore (value 0) and
 * stores its id in the shared Latch, where setters in other processes read
 * it. NOTE(review): nothing in this file removes the semaphore (IPC_RMID),
 * so it outlives the processes; clean up with ipcrm if needed.
 */
void
OwnLatch(volatile Latch *latch)
{
       Assert(latch->is_shared);

       /* Refuse a second owner before allocating anything on its behalf. */
       if (latch->owner_pid != 0)
               elog(ERROR, "latch already owned");

#if defined(LINUX_SEM)
       if (latch->semId == -1)
       {
               union semun semun;
               int                     s;
               int                     ret;

               /*
                * IPC_PRIVATE always yields a new semaphore set. The previous
                * IPC_EXCL probe over fixed keys 7500..8499 ran out once
                * semaphores left over from earlier runs occupied them.
                */
               s = semget(IPC_PRIVATE, 1, IPC_CREAT | 0600);
               if (s < 0)
               {
                       DEBUG("semget error %d %d\n", s, errno);
                       printf("Can't get sem, aborting\n");
                       exit(1);
               }
               semun.val = 0;
               ret = semctl(s, 0, SETVAL, semun);
               Assert(!ret);
               latch->semId = s;
       }
       DEBUG("latch %p latch->semId %d\n", latch, latch->semId);
#elif defined(PIPE)
       /* Nothing to do: InitSharedLatch already created the latch's pipe. */
#else
       /* Initialize the self pipe if this is our first latch in the process */
       if (selfpipe_readfd == -1)
               initSelfPipe();
#endif
       latch->owner_pid = MyProcPid;
}

/*
 * Give up ownership of a shared latch held by the current process, so that
 * another process may OwnLatch() it.
 */
void
DisownLatch(volatile Latch *latch)
{
       /* Only a shared latch has an owner to give up, and only its owner may. */
       Assert(latch->is_shared);
       Assert(latch->owner_pid == MyProcPid);

       latch->owner_pid = 0;    /* 0 means "unowned", as in InitSharedLatch */
}

/*
 * Wait for given latch to be set or until timeout is exceeded.
 * If the latch is already set, the function returns immediately.
 *
 * The 'timeout' is given in microseconds, and -1 (any negative value) means
 * wait forever. On some platforms, signals cause the timeout to be
 * restarted, so beware that the function can sleep for several times longer
 * than the specified timeout.
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
 * Returns 'true' if the latch was set, or 'false' if timeout was reached.
 */
bool
WaitLatch(volatile Latch *latch, long timeout)
{
#if defined(LINUX_SEM)
       struct timespec ts;
       struct timespec *tsp = NULL;     /* NULL: block indefinitely */
       struct sembuf sops;

       /*
        * Convert microseconds to a timespec. This used to compute
        * tv_sec = timeout * 1000000, i.e. a timeout 10^12 times too long,
        * and produced a negative (invalid) timespec for -1.
        */
       if (timeout >= 0)
       {
               ts.tv_sec = timeout / 1000000L;
               ts.tv_nsec = (timeout % 1000000L) * 1000L;
               tsp = &ts;
       }

       /* Decrement by one: consumes the value 1 stored by SetLatch. */
       sops.sem_num = 0;
       sops.sem_op = -1;
       sops.sem_flg = 0;
       DEBUG("wait on latch %p latch->semId %d\n", latch, latch->semId);
       for (;;)
       {
               if (semtimedop(latch->semId, &sops, 1, tsp) == 0)
                       return true;
               Assert(errno == EAGAIN || errno == EINTR);
               if (errno == EAGAIN)
                       return false;    /* timeout expired */
               /* EINTR: interrupted by a signal; retry with the full timeout */
       }
#else
       return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
#endif
}

/*
 * Like WaitLatch, but will also return when there's data available in
 * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
 * was set, or 2 if the socket became readable.
 *
 * 'timeout' is in microseconds; a negative value means wait forever.
 */
int
WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
{
#if defined(LINUX_SEM)
       /*
        * A SysV semaphore has no file descriptor to wait on together with a
        * socket, so only the latch-only case is supported. (This branch used
        * to fall off the end of the function, returning an indeterminate
        * value.)
        */
       if (sock != PGINVALID_SOCKET)
       {
               elog(ERROR, "WaitLatchOrSocket() on a socket is not supported with semaphore latches");
               return 0;
       }
       return WaitLatch(latch, timeout) ? 1 : 0;
#elif defined(PIPE)
       struct pollfd pfd[2];
       int                     nfds = 1;
       int                     poll_timeout;
       int                     rc;

       if (latch->owner_pid != MyProcPid)
               elog(ERROR, "cannot wait on a latch owned by another process");

       /*
        * poll() wants milliseconds as an int, 'timeout' is microseconds. The
        * old code multiplied by 1000, which overflowed int for LATCH_TIMEOUT.
        * Round up so a sub-millisecond timeout still sleeps, and clamp.
        */
       if (timeout < 0)
               poll_timeout = -1;
       else if (timeout / 1000L >= INT_MAX)
               poll_timeout = INT_MAX;
       else
               poll_timeout = (int) (timeout / 1000L + (timeout % 1000L != 0));

       pfd[0].fd = latch->pipefds[0];
       pfd[0].events = POLLIN;
       pfd[0].revents = 0;
       if (sock != PGINVALID_SOCKET)
       {
               pfd[1].fd = sock;
               pfd[1].events = POLLIN;
               pfd[1].revents = 0;
               nfds = 2;
       }

       for (;;)
       {
               rc = poll(pfd, nfds, poll_timeout);
               if (rc < 0)
               {
                       if (errno == EINTR)
                               continue;
                       elog(ERROR, "poll() failed: %m");
                       return 0;
               }
               if (rc == 0)
                       return 0;        /* timeout exceeded */
               if (sock != PGINVALID_SOCKET && (pfd[1].revents & POLLIN) != 0)
                       return 2;        /* data available in socket */
               /* The latch is set while its pipe holds bytes. */
               Assert((pfd[0].revents & POLLIN) != 0);
               return 1;
       }
#else
       struct timeval tv, *tvp = NULL;
       fd_set          input_mask;
       int                     rc;
       int                     result = 0;

       if (latch->owner_pid != MyProcPid)
               elog(ERROR, "cannot wait on a latch owned by another process");

       /* Initialize timeout */
       if (timeout >= 0)
       {
               tv.tv_sec = timeout / 1000000L;
               tv.tv_usec = timeout % 1000000L;
               tvp = &tv;
       }

       waiting = true;
       for (;;)
       {
               int                     hifd;

               /*
                * Clear the pipe, and check if the latch is set already. If someone
                * sets the latch between this and the select() below, the setter
                * will write a byte to the pipe (or signal us and the signal handler
                * will do that), and the select() will return immediately.
                */
               drainSelfPipe();
               if (latch->is_set)
               {
                       result = 1;
                       break;
               }

               FD_ZERO(&input_mask);
               FD_SET(selfpipe_readfd, &input_mask);
               hifd = selfpipe_readfd;
               if (sock != PGINVALID_SOCKET)
               {
                       FD_SET(sock, &input_mask);
                       if (sock > hifd)
                               hifd = sock;
               }

               rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
               if (rc < 0)
               {
                       if (errno == EINTR)
                               continue;
                       elog(ERROR, "select() failed: %m");
                       /* Don't inspect input_mask after a failed select(). */
                       result = 0;
                       break;
               }
               if (rc == 0)
               {
                       /* timeout exceeded */
                       result = 0;
                       break;
               }
               if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
               {
                       result = 2;
                       break;          /* data available in socket */
               }
       }
       waiting = false;

       return result;
#endif
}

/*
 * Sets a latch and wakes up anyone waiting on it. Returns quickly if the
 * latch is already set.
 */
void
SetLatch(volatile Latch *latch)
{
#if defined(LINUX_SEM)
       union semun semun;
       int                     ret;

       /*
        * SETVAL (rather than a semop of +1) leaves the value at 1 however many
        * times the latch is set before the owner's WaitLatch consumes it.
        */
       semun.val = 1;
       DEBUG("SetLatch %p\n", latch);
       ret = semctl(latch->semId, 0, SETVAL, semun);
       Assert(!ret);
#elif defined(PIPE)
       char            c = 0;   /* content is irrelevant; avoid writing garbage */
       int                     ret;

       DEBUG("SetLatch %p\n", latch);
       do
               ret = write(latch->pipefds[1], &c, 1);
       while (ret < 0 && errno == EINTR);

       /*
        * EAGAIN/EWOULDBLOCK: the non-blocking pipe is full because the latch
        * was set many times without a reset. The queued bytes already keep it
        * set, so this is not an error (it used to trip the Assert).
        */
       if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
               return;
       Assert(ret == 1);
#else
       pid_t           owner_pid;

       /* Quick exit if already set */
       if (latch->is_set)
               return;

       DEBUG("SetLatch %p\n", latch);

       latch->is_set = true;

       /*
        * See if anyone's waiting for the latch. It can be the current process
        * if we're in a signal handler. We use the self-pipe to wake up the
        * select() in that case. If it's another process, send a signal.
        *
        * Fetch owner_pid only once, in case the owner simultaneously disowns
        * the latch and clears owner_pid. XXX: This assumes that pid_t is
        * atomic, which isn't guaranteed to be true! In practice, the effective
        * range of pid_t fits in a 32 bit integer, and so should be atomic. In
        * the worst case, we might end up signaling wrong process if the right
        * one disowns the latch just as we fetch owner_pid. Even then, you're
        * very unlucky if a process with that bogus pid exists.
        */
       owner_pid = latch->owner_pid;
       if (owner_pid == 0)
               return;
       else if (owner_pid == MyProcPid)
               sendSelfPipeByte();
       else
               kill(owner_pid, SIGUSR1);
#endif
}

/*
 * Clear the latch, so that a following WaitLatch sleeps unless someone sets
 * the latch again in between.
 */
void
ResetLatch(volatile Latch *latch)
{
#if defined(LINUX_SEM)
       /* Nothing here: WaitLatch's sem_op of -1 is what consumes a set. */
#elif defined(PIPE)
       /* The latch counts as set while its pipe holds bytes; empty it. */
       drainLatchPipe(latch);
#else
       /* Only the owner should reset the latch */
       Assert(latch->owner_pid == MyProcPid);
       latch->is_set = false;
#endif
}


/*
 * Called from the SIGUSR1 handler. SetLatch sends SIGUSR1 to the owner of a
 * latch; if the owner is currently blocked in WaitLatch, poke its self-pipe
 * so the select() there returns.
 */
void
latch_sigusr1_handler(void)
{
#ifdef PIPE
       /* The PIPE build's SetLatch writes to the pipe and never signals. */
       Assert(false);
#endif
       if (!waiting)
               return;
       sendSelfPipeByte();
}

/*
 * Create this process's self-pipe, which lets a signal handler (or SetLatch
 * in the same process) wake up the select() in WaitLatch.
 *
 * Both ends are non-blocking: writes never stall when many SetLatch calls
 * have filled the kernel buffer, and the reader can drain the pipe simply
 * by reading until EAGAIN or EWOULDBLOCK.
 */
static void
initSelfPipe(void)
{
       int                     fds[2];

       if (pipe(fds) < 0)
               elog(FATAL, "pipe() failed: %m");
       if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0)
               elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
       if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0)
               elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

       DEBUG("InitSelfPipe %d\n", getpid());

       /* Publish the descriptors only once the pipe is fully set up. */
       selfpipe_readfd = fds[0];
       selfpipe_writefd = fds[1];
}

/*
 * Write one byte to the self-pipe, to wake up WaitLatch. Safe to call from
 * a signal handler: it only calls write() and never reports errors.
 */
static void
sendSelfPipeByte(void)
{
       const char      token = 0;

       for (;;)
       {
               if (write(selfpipe_writefd, &token, 1) >= 0)
                       return;

               /* Interrupted by a signal: try again. */
               if (errno == EINTR)
                       continue;

               /*
                * EAGAIN/EWOULDBLOCK means the pipe is full, and the bytes already
                * in it are enough to wake WaitLatch. Any other failure cannot be
                * reported, because we may be in a signal handler where elog() is
                * unsafe, so it is silently ignored too.
                */
               return;
       }
}

/*
 * Read all available data from the self-pipe.
 *
 * There shouldn't normally be more than one byte in the pipe, or maybe a
 * few more if multiple processes run SetLatch at the same instant.
 */
static void
drainSelfPipe(void)
{
       char            buf[16];
       int                     rc;

       for (;;)
       {
               rc = read(selfpipe_readfd, buf, sizeof(buf));
               if (rc < 0)
               {
                       if (errno == EAGAIN || errno == EWOULDBLOCK)
                               break;          /* the pipe is empty */
                       if (errno == EINTR)
                               continue;       /* retry */
                       elog(ERROR, "read() on self-pipe failed: %m");
                       break;                  /* don't spin if elog returns */
               }
               if (rc == 0)
               {
                       /* Previously this looped forever, since EOF repeats. */
                       elog(ERROR, "unexpected EOF on self-pipe");
                       break;
               }
               if (rc < (int) sizeof(buf))
                       break;          /* short read: pipe drained, skip the extra read() */
               /* buffer was filled; there may be more, so read again */
       }
}

#ifdef PIPE
/*
 * Read all available data from the latch's own pipe, clearing the latch
 * (in the PIPE build a latch is set while its pipe holds bytes).
 *
 * There shouldn't normally be more than one byte in the pipe, or maybe a
 * few more if SetLatch is called several times before the reset.
 */
static void
drainLatchPipe(volatile Latch *latch)
{
       char            buf[16];
       int                     rc;

       for (;;)
       {
               rc = read(latch->pipefds[0], buf, sizeof(buf));
               if (rc < 0)
               {
                       if (errno == EAGAIN || errno == EWOULDBLOCK)
                               break;          /* the pipe is empty */
                       if (errno == EINTR)
                               continue;       /* retry */
                       elog(ERROR, "read() on latch-pipe failed: %m");
                       break;                  /* don't spin if elog returns */
               }
               if (rc == 0)
               {
                       /* Previously this looped forever, since EOF repeats. */
                       elog(ERROR, "unexpected EOF on latch-pipe");
                       break;
               }
               if (rc < (int) sizeof(buf))
                       break;          /* short read: pipe drained, skip the extra read() */
               /* buffer was filled; there may be more, so read again */
       }
}
#endif
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to