Hello. I don't see how ac1d794 will be dealt with, but I tried an example implementation of a multi-socket version of WaitLatchOrSocket using callbacks, on top of the current master where ac1d794 has not been removed yet.
At Thu, 14 Jan 2016 13:46:44 -0500, Robert Haas <robertmh...@gmail.com> wrote in <CA+TgmoYBa8TJRGS07JCSLKpqGkrRd5hLpirvwp36s=83chm...@mail.gmail.com>
> On Thu, Jan 14, 2016 at 12:14 PM, Andres Freund <and...@anarazel.de> wrote:
> > On 2016-01-14 12:07:23 -0500, Robert Haas wrote:
> >> > Do we want to provide a backward compatible API for all this? I'm fine
> >> > either way.
> >>
> >> How would that work?
> >
> > I'm thinking of something like;
> >
> > int WaitOnLatchSet(LatchEventSet *set, int wakeEvents, long timeout);
> >
> > int
> > WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout)
> > {
> >     LatchEventSet set;
> >
> >     LatchEventSetInit(&set, latch);
> >
> >     if (sock != PGINVALID_SOCKET)
> >         LatchEventSetAddSock(&set, sock);
> >
> >     return WaitOnLatchSet(set, wakeEvents, timeout);
> > }
> >
> > I think we'll need to continue having wakeEvents and timeout parameters
> > for WaitOnLatchSet, we quite frequently want to wait socket
> > readability/writability, not wait on the socket, or have/not have
> > timeouts.
>
> Well, if we ever wanted to support multiple FDs, we'd need the
> readability/writeability thing to be per-fd, not per-set.
>
> Overall, if this is what you have in mind for backward compatibility,
> I rate it M for Meh.  Let's just break compatibility and people will
> have to update their code.  That shouldn't be hard, and if we don't
> make people do it when we make the change, then we'll be stuck with
> the backward-compatibility interface for a decade.  I doubt it's worth
> it.

The API is similar to what Robert suggested, but differs because his version would be a bit too complicated for the most common cases. So this example implementation takes an intermediate style between the current API and Robert's suggestion, using callbacks as I proposed.

int WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nobjs, long timeout);

This is implemented only for poll, not for select. A sample usage is seen in secure_read().
> pgwaitobject objs[3];
...
> InitWaitLatch(objs[0], MyLatch);
> InitWaitPostmasterDeath(objs[1]);
> InitWaitSocket(objs[2], port->sock, waitfor);
>
> w = WaitLatchOrSocketMulti(objs, 3, 0);
> // w = WaitLatchOrSocket(MyLatch,
> //                       WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
> //                       port->sock, 0);

The core of the function looks like the following. It runs the callbacks for every fired event.

> rc = poll(pfds, nfds, (int) cur_timeout);
...
> if (rc < 0)
...
> else
> {
>     for (i = 0 ; i < nfds ; i++)
>     {
>         wobjs[i].retEvents = 0;
>         if (pfds[i].revents && wobjs[i].cb)
>             result |= wobjs[i].cb(&wobjs[i], pfds[i].revents);
>
>         if (result & WL_IMMEDIATELY_BREAK)
>             break;
>     }
> }

In the above part, poll()'s events are passed to the callbacks, so the callbacks may need a different implementation for select(). For a socket with a custom callback, the initializer could look like the following.

> InitWaitSocketCB(wait_obj, sock, event, your_callback);

If we want to have the waiting-object array independent of specific functions, to achieve asynchronous handling of socket events, that could be realised by providing a set of wrapper functions, exactly as Robert said above.

Does this make sense? Does anyone have any opinions or thoughts?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From b7cc9939ea61654fae98c4fe958c8c67df9f3758 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 19 Feb 2016 16:34:49 +0900
Subject: [PATCH] [PoC] Add multi-socket version of WaitLatchOrSocket.

---
 src/backend/libpq/be-secure.c |  12 +-
 src/backend/port/unix_latch.c | 268 ++++++++++++++++++++++++++++++++++++++++++
 src/include/storage/latch.h   |  29 +++++
 3 files changed, 306 insertions(+), 3 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index ac709d1..d99a983 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -141,12 +141,18 @@ retry:
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
 		int			w;
+		pgwaitobject objs[3];
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		InitWaitLatch(objs[0], MyLatch);
+		InitWaitPostmasterDeath(objs[1]);
+		InitWaitSocket(objs[2], port->sock, waitfor);
+
+		w = WaitLatchOrSocketMulti(objs, 3, 0);
+//		w = WaitLatchOrSocket(MyLatch,
+//							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
+//							  port->sock, 0);
 
 		/*
 		 * If the postmaster has died, it's not safe to continue running,
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 2ad609c..dacb869 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -504,6 +504,274 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	return result;
 }
 
+int
+WAITOBJCB_SOCK_DEF(pgwaitobject *wobj, int revents)
+{
+	int			r = 0;
+
+	if (revents & POLLIN)
+	{
+		/* data available in socket, or EOF/error condition */
+		wobj->retEvents |= WL_SOCKET_READABLE;
+		r |= WL_SOCKET_READABLE;
+	}
+	if (revents & POLLOUT)
+	{
+		/* socket is writable */
+		wobj->retEvents |= WL_SOCKET_WRITEABLE;
+		r |= WL_SOCKET_WRITEABLE;
+	}
+	if (revents & (POLLHUP | POLLERR | POLLNVAL))
+	{
+		/* EOF/error condition */
+		if (wobj->wakeEvents & WL_SOCKET_READABLE)
+		{
+			wobj->retEvents |= WL_SOCKET_READABLE;
+			r |= WL_SOCKET_READABLE;
+		}
+		if (wobj->wakeEvents & WL_SOCKET_WRITEABLE)
+		{
+			wobj->retEvents |= WL_SOCKET_WRITEABLE;
+			r |= WL_SOCKET_WRITEABLE;
+		}
+	}
+
+	return r;
+}
+
+int
+WAITOBJCB_PMDEATH_DEF(pgwaitobject *wobj, int revents)
+{
+	int			r = 0;
+
+	/*
+	 * We expect a POLLHUP when the remote end is closed, but because we don't
+	 * expect the pipe to become readable or to have any errors either, treat
+	 * those cases as postmaster death, too.
+	 */
+	if (revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL))
+	{
+		/*
+		 * According to the select(2) man page on Linux, select(2) may
+		 * spuriously return and report a file descriptor as readable, when
+		 * it's not; and presumably so can poll(2). It's not clear that the
+		 * relevant cases would ever apply to the postmaster pipe, but since
+		 * the consequences of falsely returning WL_POSTMASTER_DEATH could be
+		 * pretty unpleasant, we take the trouble to positively verify EOF
+		 * with PostmasterIsAlive().
+		 */
+		if (!PostmasterIsAlive())
+		{
+			wobj->retEvents |= WL_POSTMASTER_DEATH;
+			r |= WL_POSTMASTER_DEATH;
+		}
+	}
+
+	return r;
+}
+
+int
+WAITOBJCB_LATCH_DEF(pgwaitobject *wobj, int revents)
+{
+	return 0;
+}
+
+/*
+ * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
+ * conditions.
+ *
+ * When waiting on a socket, EOF and error conditions are reported by
+ * returning the socket as readable/writable or both, depending on
+ * WL_SOCKET_READABLE/WL_SOCKET_WRITEABLE being specified.
+ */
+int
+WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nobjs, long timeout)
+{
+	int			result = 0;
+	int			rc;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout;
+	int			wakeEvents = 0;
+	volatile Latch *waitlatch = NULL;
+	pgwaitobject *latchobj = NULL;
+	int			i;
+
+	struct pollfd *pfds;
+	int			nfds;
+
+	/* If timeout > 0, WL_TIMEOUT is implicated */
+	wakeEvents = (timeout > 0 ? WL_TIMEOUT : 0);
+
+	for (i = 0 ; i < nobjs ; i++)
+	{
+		switch (wobjs[i].type)
+		{
+			case WAITOBJ_LATCH:
+				Assert(!waitlatch);
+				latchobj = &wobjs[i];
+				waitlatch = latchobj->latch;
+				if (waitlatch->owner_pid != MyProcPid)
+					elog(ERROR, "cannot wait on a latch owned by another process");
+				wobjs[i].wakeEvents = WL_LATCH_SET;
+				break;
+			case WAITOBJ_POSTMASTER_DEATH:
+				wobjs[i].wakeEvents = WL_POSTMASTER_DEATH;
+				break;
+			case WAITOBJ_SOCK:
+				Assert((wobjs[i].wakeEvents &
+						~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
+				break;
+			default:
+				elog(ERROR, "Unknown wait object type: %d", wobjs[i].type);
+		}
+		wakeEvents |= wobjs[i].wakeEvents;
+	}
+	Assert(wakeEvents != 0);	/* must have at least one wake event */
+
+	/*
+	 * Initialize timeout if requested. We must record the current time so
+	 * that we can determine the remaining timeout if the poll() or select()
+	 * is interrupted. (On some platforms, select() will update the contents
+	 * of "tv" for us, but unfortunately we can't rely on that.)
+	 */
+	if (wakeEvents & WL_TIMEOUT)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+	else
+	{
+		cur_timeout = -1;
+	}
+
+	pfds = palloc(sizeof(struct pollfd) * nobjs);
+
+	for (nfds = 0 ; nfds < nobjs ; nfds++)
+	{
+		switch (wobjs[nfds].type)
+		{
+			case WAITOBJ_SOCK:
+				pfds[nfds].fd = wobjs[nfds].sock;
+				pfds[nfds].events = 0;
+				if (wobjs[nfds].wakeEvents & WL_SOCKET_READABLE)
+					pfds[nfds].events |= POLLIN;
+				if (wobjs[nfds].wakeEvents & WL_SOCKET_WRITEABLE)
+					pfds[nfds].events |= POLLOUT;
+				break;
+
+			case WAITOBJ_LATCH:
+				pfds[nfds].fd = selfpipe_readfd;
+				pfds[nfds].events = POLLIN;
+				break;
+
+			case WAITOBJ_POSTMASTER_DEATH:
+				/* postmaster fd, if used, is always in pfds[nfds - 1] */
+				pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
+				pfds[nfds].events = POLLIN;
+		}
+	}
+
+	waiting = true;
+	do
+	{
+		/*
+		 * Clear the pipe, then check if the latch is set already. If someone
+		 * sets the latch between this and the poll()/select() below, the
+		 * setter will write a byte to the pipe (or signal us and the signal
+		 * handler will do that), and the poll()/select() will return
+		 * immediately.
+		 *
+		 * Note: we assume that the kernel calls involved in drainSelfPipe()
+		 * and SetLatch() will provide adequate synchronization on machines
+		 * with weak memory ordering, so that we cannot miss seeing is_set if
+		 * the signal byte is already in the pipe when we drain it.
+		 */
+		drainSelfPipe();
+
+		if ((wakeEvents & WL_LATCH_SET) && waitlatch->is_set)
+		{
+			/*
+			 * Leave loop immediately, avoid blocking again. We don't attempt
+			 * to report any other events that might also be satisfied.
+			 */
+			result |= latchobj->cb(latchobj, 0);
+
+			/* If callback is provided, follow its order */
+			if (result & WL_IMMEDIATELY_BREAK)
+				break;
+		}
+
+		/*
+		 * Must wait ... we use poll(2) if available, otherwise select(2).
+		 *
+		 * On at least older linux kernels select(), in violation of POSIX,
+		 * doesn't reliably return a socket as writable if closed - but we
+		 * rely on that. So far all the known cases of this problem are on
+		 * platforms that also provide a poll() implementation without that
+		 * bug. If we find one where that's not the case, we'll need to add a
+		 * workaround.
+		 */
+		for (i = 0 ; i < nfds ; i++)
+			pfds[i].revents = 0;
+
+		/* Sleep */
+		rc = poll(pfds, nfds, (int) cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+		}
+		else if (rc == 0)
+		{
+			/* timeout exceeded */
+			if (wakeEvents & WL_TIMEOUT)
+				result |= WL_TIMEOUT;
+		}
+		else
+		{
+			for (i = 0 ; i < nfds ; i++)
+			{
+				wobjs[i].retEvents = 0;
+				if (pfds[i].revents && wobjs[i].cb)
+					result |= wobjs[i].cb(&wobjs[i], pfds[i].revents);
+
+				if (result & WL_IMMEDIATELY_BREAK)
+					break;
+			}
+		}
+
+		/* If we're not done, update cur_timeout for next iteration */
+		if (result == 0 && (wakeEvents & WL_TIMEOUT))
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+			{
+				/* Timeout has expired, no need to continue looping */
+				result |= WL_TIMEOUT;
+			}
+		}
+	} while (result == 0);
+	waiting = false;
+
+	return result;
+}
+
+
 /*
  * Sets a latch and wakes up anyone waiting on it.
  *
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index e77491e..4fe87ed 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -95,12 +95,37 @@ typedef struct Latch
 #endif
 } Latch;
 
+struct pgwaitobject;
+typedef int (*WaitSockCallback)(struct pgwaitobject *wobjs, int revents);
+typedef enum pgwaitobjtype
+{
+	WAITOBJ_LATCH,
+	WAITOBJ_POSTMASTER_DEATH,
+	WAITOBJ_SOCK
+} pgwaitobjtype;
+
+typedef struct pgwaitobject
+{
+	pgwaitobjtype	type;
+	pgsocket		sock;
+	Latch		   *latch;
+	int				wakeEvents;
+	int				retEvents;
+	WaitSockCallback cb;
+	void		   *param;
+} pgwaitobject;
+
+#define InitWaitLatch(o, l) ((o).type=WAITOBJ_LATCH,(o).latch=(l),(o).cb=WAITOBJCB_LATCH_DEF)
+#define InitWaitPostmasterDeath(o) ((o).type=WAITOBJ_POSTMASTER_DEATH,(o).cb=WAITOBJCB_PMDEATH_DEF)
+#define InitWaitSocket(o, s, e) ((o).type=WAITOBJ_SOCK,(o).sock=(s),(o).wakeEvents=(e),(o).cb=WAITOBJCB_SOCK_DEF)
+
 /* Bitmasks for events that may wake-up WaitLatch() clients */
 #define WL_LATCH_SET		(1 << 0)
 #define WL_SOCKET_READABLE	(1 << 1)
 #define WL_SOCKET_WRITEABLE	(1 << 2)
 #define WL_TIMEOUT			(1 << 3)
 #define WL_POSTMASTER_DEATH (1 << 4)
+#define WL_IMMEDIATELY_BREAK (1 << 5)
 
 /*
  * prototypes for functions in latch.c
@@ -113,6 +138,10 @@ extern void DisownLatch(volatile Latch *latch);
 extern int	WaitLatch(volatile Latch *latch, int wakeEvents, long timeout);
 extern int	WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
 				  pgsocket sock, long timeout);
+extern int	WAITOBJCB_LATCH_DEF(pgwaitobject *wobj, int revents);
+extern int	WAITOBJCB_SOCK_DEF(pgwaitobject *wobj, int revents);
+extern int	WAITOBJCB_PMDEATH_DEF(pgwaitobject *wobj, int revents);
+extern int	WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nelem, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
-- 
1.8.3.1
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers