Hello, I don't see how ac1d794 will be dealt with, but I tried an
example implementation of a multi-socket version of WaitLatchOrSocket
using callbacks, on top of the current master where ac1d794 has
not been removed yet.

At Thu, 14 Jan 2016 13:46:44 -0500, Robert Haas <robertmh...@gmail.com> wrote 
in <CA+TgmoYBa8TJRGS07JCSLKpqGkrRd5hLpirvwp36s=83chm...@mail.gmail.com>
> On Thu, Jan 14, 2016 at 12:14 PM, Andres Freund <and...@anarazel.de> wrote:
> > On 2016-01-14 12:07:23 -0500, Robert Haas wrote:
> >> > Do we want to provide a backward compatible API for all this? I'm fine
> >> > either way.
> >>
> >> How would that work?
> >
> > I'm thinking of something like;
> >
> > int WaitOnLatchSet(LatchEventSet *set, int wakeEvents, long timeout);
> >
> > int
> > WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,long 
> > timeout)
> > {
> >         LatchEventSet set;
> >
> >         LatchEventSetInit(&set, latch);
> >
> >         if (sock != PGINVALID_SOCKET)
> >            LatchEventSetAddSock(&set, sock);
> >
> >         return WaitOnLatchSet(set, wakeEvents, timeout);
> > }
> >
> > I think we'll need to continue having wakeEvents and timeout parameters
> > for WaitOnLatchSet, we quite frequently want to wait socket
> > readability/writability, not wait on the socket, or have/not have
> > timeouts.
> 
> Well, if we ever wanted to support multiple FDs, we'd need the
> readability/writeability thing to be per-fd, not per-set.
> 
> Overall, if this is what you have in mind for backward compatibility,
> I rate it M for Meh.  Let's just break compatibility and people will
> have to update their code.  That shouldn't be hard, and if we don't
> make people do it when we make the change, then we'll be stuck with
> the backward-compatibility interface for a decade.  I doubt it's worth
> it.

The API is similar to what Robert suggested, but differs because
his version would be a bit too complicated for most cases. So this
example implementation takes an intermediate style between the
current API and Robert's suggestion, and uses callbacks as I proposed.

int WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nobjs, long timeout);

This is implemented only for poll, not for select.

A sample usage is seen in secure_read().

>     pgwaitobject objs[3];
...
>     InitWaitLatch(objs[0], MyLatch);
>     InitWaitPostmasterDeath(objs[1]);
>     InitWaitSocket(objs[2], port->sock, waitfor);
> 
>     w = WaitLatchOrSocketMulti(objs, 3, 0);
> //    w = WaitLatchOrSocket(MyLatch,
> //                WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
> //                port->sock, 0);


The core of the function looks like the following. It runs
callbacks for every fired event.

>    rc = poll(pfds, nfds, (int) cur_timeout);
...
>    if (rc < 0)
...
>    else
>    {
>      for (i = 0 ; i < nfds ; i++)
>      {
>        wobjs[i].retEvents = 0;
>        if (pfds[i].revents && wobjs[i].cb)
>          result |= wobjs[i].cb(&wobjs[i], pfds[i].revents);
>
>        if (result & WL_IMMEDIATELY_BREAK)
>          break;
>      }
>    }

In the above part, poll()'s events are passed to the callbacks, so
the callbacks may have a different implementation for select().

A callback can also be provided for sockets. The initializer could be as
follows.

> InitWaitSocketCB(wait_obj, sock, event, your_callback);


If we want to have the waiting-object array independently from
specific functions, in order to achieve asynchronous handling of
socket events, it could be realised by providing a set of wrapper
functions, exactly as Robert suggested above.

Does this make sense?
Does anyone have any opinion? or thoughts?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From b7cc9939ea61654fae98c4fe958c8c67df9f3758 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 19 Feb 2016 16:34:49 +0900
Subject: [PATCH] [PoC] Add multi-socket version of WaitLatchOrSocket.

---
 src/backend/libpq/be-secure.c |  12 +-
 src/backend/port/unix_latch.c | 268 ++++++++++++++++++++++++++++++++++++++++++
 src/include/storage/latch.h   |  29 +++++
 3 files changed, 306 insertions(+), 3 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index ac709d1..d99a983 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -141,12 +141,18 @@ retry:
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
 		int			w;
+		pgwaitobject objs[3];
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		InitWaitLatch(objs[0], MyLatch);
+		InitWaitPostmasterDeath(objs[1]);
+		InitWaitSocket(objs[2], port->sock, waitfor);
+
+		w = WaitLatchOrSocketMulti(objs, 3, 0);
+//		w = WaitLatchOrSocket(MyLatch,
+//							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
+//							  port->sock, 0);
 
 		/*
 		 * If the postmaster has died, it's not safe to continue running,
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 2ad609c..dacb869 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -504,6 +504,274 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
 	return result;
 }
 
+int
+WAITOBJCB_SOCK_DEF(pgwaitobject *wobj, int revents)
+{
+	int r = 0;
+
+	if (revents & POLLIN)
+	{
+		/* data available in socket, or EOF/error
+		 * condition */
+		wobj->retEvents |= WL_SOCKET_READABLE;
+		r |= WL_SOCKET_READABLE;
+	}
+	if (revents & POLLOUT)
+	{
+		/* socket is writable */
+		wobj->retEvents |= WL_SOCKET_WRITEABLE;
+		r |= WL_SOCKET_WRITEABLE;
+	}
+	if (revents & (POLLHUP | POLLERR | POLLNVAL))
+	{
+		/* EOF/error condition */
+		if (wobj->wakeEvents & WL_SOCKET_READABLE)
+		{
+			wobj->retEvents |= WL_SOCKET_READABLE;
+			r |= WL_SOCKET_READABLE;
+		}
+		if (wobj->wakeEvents & WL_SOCKET_WRITEABLE)
+		{
+			wobj->retEvents |= WL_SOCKET_WRITEABLE;
+			r |= WL_SOCKET_WRITEABLE;
+		}
+	}
+
+	return r;
+}
+
+int
+WAITOBJCB_PMDEATH_DEF(pgwaitobject *wobj, int revents)
+{
+	int r = 0;
+
+	/*
+	 * We expect a POLLHUP when the remote end is closed, but because we don't
+	 * expect the pipe to become readable or to have any errors either, treat
+	 * those cases as postmaster death, too.
+	 */
+	if (revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL))
+	{
+		/*
+		 * According to the select(2) man page on Linux, select(2) may
+		 * spuriously return and report a file descriptor as readable, when
+		 * it's not; and presumably so can poll(2).  It's not clear that the
+		 * relevant cases would ever apply to the postmaster pipe, but since
+		 * the consequences of falsely returning WL_POSTMASTER_DEATH could be
+		 * pretty unpleasant, we take the trouble to positively verify EOF
+		 * with PostmasterIsAlive().
+		 */
+		if (!PostmasterIsAlive())
+		{
+			wobj->retEvents |= WL_POSTMASTER_DEATH;
+			r |= WL_POSTMASTER_DEATH;
+		}
+	}
+
+	return r;
+}
+
+int
+WAITOBJCB_LATCH_DEF(pgwaitobject *wobj, int revents)
+{
+	
+	return 0;
+}
+
+/*
+ * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
+ * conditions.
+ *
+ * When waiting on a socket, EOF and error conditions are reported by
+ * returning the socket as readable/writable or both, depending on
+ * WL_SOCKET_READABLE/WL_SOCKET_WRITEABLE being specified.
+ */
+int
+WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nobjs, long timeout)
+{
+	int			result = 0;
+	int			rc;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout;
+	int			wakeEvents = 0;
+	volatile Latch  *waitlatch = NULL;
+	pgwaitobject *latchobj = NULL;
+	int 		i;
+
+	struct pollfd *pfds;
+	int			nfds;
+
+	/* If timeout > 0, WL_TIMEOUT is implicated */
+	wakeEvents = (timeout > 0 ? WL_TIMEOUT : 0);
+
+	for (i = 0 ; i < nobjs ; i++)
+	{
+		switch (wobjs[i].type)
+		{
+		case WAITOBJ_LATCH:
+			Assert(!waitlatch);
+			latchobj = &wobjs[i];
+			waitlatch = latchobj->latch;
+			if (waitlatch->owner_pid != MyProcPid)
+				elog(ERROR, "cannot wait on a latch owned by another process");
+			wobjs[i].wakeEvents = WL_LATCH_SET;
+			break;
+		case WAITOBJ_POSTMASTER_DEATH:
+			wobjs[i].wakeEvents = WL_POSTMASTER_DEATH;
+			break;
+		case WAITOBJ_SOCK:
+			Assert((wobjs[i].wakeEvents & 
+					~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
+			break;
+		default:
+			elog(ERROR, "Unknown wait object type: %d", wobjs[i].type);
+		}
+		wakeEvents |= wobjs[i].wakeEvents;
+	}
+	Assert(wakeEvents != 0);	/* must have at least one wake event */
+
+	/*
+	 * Initialize timeout if requested.  We must record the current time so
+	 * that we can determine the remaining timeout if the poll() or select()
+	 * is interrupted.  (On some platforms, select() will update the contents
+	 * of "tv" for us, but unfortunately we can't rely on that.)
+	 */
+	if (wakeEvents & WL_TIMEOUT)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+	else
+	{
+		cur_timeout = -1;
+
+	}
+
+	pfds = palloc(sizeof(struct pollfd) * nobjs);
+
+	for (nfds = 0 ; nfds < nobjs ; nfds++)
+	{
+		switch (wobjs[nfds].type)
+		{
+		case WAITOBJ_SOCK:
+			pfds[nfds].fd = wobjs[nfds].sock;
+			pfds[nfds].events = 0;
+			if (wobjs[nfds].wakeEvents & WL_SOCKET_READABLE)
+				pfds[nfds].events |= POLLIN;
+			if (wobjs[nfds].wakeEvents & WL_SOCKET_WRITEABLE)
+				pfds[nfds].events |= POLLOUT;
+			break;
+
+		case WAITOBJ_LATCH:
+			pfds[nfds].fd = selfpipe_readfd;
+			pfds[nfds].events = POLLIN;
+			break;
+
+		case WAITOBJ_POSTMASTER_DEATH:
+			/* postmaster fd, if used, is always in pfds[nfds - 1] */
+			pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
+			pfds[nfds].events = POLLIN;
+		}
+	}
+
+	waiting = true;
+	do
+	{
+		/*
+		 * Clear the pipe, then check if the latch is set already. If someone
+		 * sets the latch between this and the poll()/select() below, the
+		 * setter will write a byte to the pipe (or signal us and the signal
+		 * handler will do that), and the poll()/select() will return
+		 * immediately.
+		 *
+		 * Note: we assume that the kernel calls involved in drainSelfPipe()
+		 * and SetLatch() will provide adequate synchronization on machines
+		 * with weak memory ordering, so that we cannot miss seeing is_set if
+		 * the signal byte is already in the pipe when we drain it.
+		 */
+		drainSelfPipe();
+
+		if ((wakeEvents & WL_LATCH_SET) && waitlatch->is_set)
+		{
+			/*
+			 * Leave loop immediately, avoid blocking again. We don't attempt
+			 * to report any other events that might also be satisfied.
+			 */
+			result |= latchobj->cb(latchobj, 0);
+
+			/* If callback is provided, follow its order */
+			if (result & WL_IMMEDIATELY_BREAK)
+				break;
+		}
+
+		/*
+		 * Must wait ... we use poll(2) if available, otherwise select(2).
+		 *
+		 * On at least older linux kernels select(), in violation of POSIX,
+		 * doesn't reliably return a socket as writable if closed - but we
+		 * rely on that. So far all the known cases of this problem are on
+		 * platforms that also provide a poll() implementation without that
+		 * bug.  If we find one where that's not the case, we'll need to add a
+		 * workaround.
+		 */
+		for (i = 0 ; i < nfds ; i++)
+			pfds[i].revents = 0;
+
+		/* Sleep */
+		rc = poll(pfds, nfds, (int) cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+		}
+		else if (rc == 0)
+		{
+			/* timeout exceeded */
+			if (wakeEvents & WL_TIMEOUT)
+				result |= WL_TIMEOUT;
+		}
+		else
+		{
+			for (i = 0 ; i < nfds ; i++)
+			{
+				wobjs[i].retEvents = 0;
+				if (pfds[i].revents && wobjs[i].cb)
+					result |= wobjs[i].cb(&wobjs[i], pfds[i].revents);
+
+				if (result & WL_IMMEDIATELY_BREAK)
+					break;
+			}
+		}
+
+		/* If we're not done, update cur_timeout for next iteration */
+		if (result == 0 && (wakeEvents & WL_TIMEOUT))
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+			{
+				/* Timeout has expired, no need to continue looping */
+				result |= WL_TIMEOUT;
+			}
+		}
+	} while (result == 0);
+	waiting = false;
+
+	return result;
+}
+
+
 /*
  * Sets a latch and wakes up anyone waiting on it.
  *
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index e77491e..4fe87ed 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -95,12 +95,37 @@ typedef struct Latch
 #endif
 } Latch;
 
+struct pgwaitobject;
+typedef int (*WaitSockCallback)(struct pgwaitobject *wobjs,	int revents);
+typedef enum pgwaitobjtype
+{
+	WAITOBJ_LATCH,
+	WAITOBJ_POSTMASTER_DEATH,
+	WAITOBJ_SOCK
+} pgwaitobjtype;
+
+typedef struct pgwaitobject
+{
+	pgwaitobjtype type;
+	pgsocket sock;
+	Latch *latch;
+	int wakeEvents;
+	int retEvents;
+	WaitSockCallback cb;
+	void *param;
+} pgwaitobject;
+
+#define InitWaitLatch(o, l) ((o).type=WAITOBJ_LATCH,(o).latch=(l),(o).cb=WAITOBJCB_LATCH_DEF)
+#define InitWaitPostmasterDeath(o) ((o).type=WAITOBJ_POSTMASTER_DEATH,(o).cb=WAITOBJCB_PMDEATH_DEF)
+#define InitWaitSocket(o, s, e) ((o).type=WAITOBJ_SOCK,(o).sock=(s),(o).wakeEvents=(e),(o).cb=WAITOBJCB_SOCK_DEF)
+
 /* Bitmasks for events that may wake-up WaitLatch() clients */
 #define WL_LATCH_SET		 (1 << 0)
 #define WL_SOCKET_READABLE	 (1 << 1)
 #define WL_SOCKET_WRITEABLE  (1 << 2)
 #define WL_TIMEOUT			 (1 << 3)
 #define WL_POSTMASTER_DEATH  (1 << 4)
+#define WL_IMMEDIATELY_BREAK (1 << 5)
 
 /*
  * prototypes for functions in latch.c
@@ -113,6 +138,10 @@ extern void DisownLatch(volatile Latch *latch);
 extern int	WaitLatch(volatile Latch *latch, int wakeEvents, long timeout);
 extern int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents,
 				  pgsocket sock, long timeout);
+extern int WAITOBJCB_LATCH_DEF(pgwaitobject *wobj, int revents);
+extern int WAITOBJCB_SOCK_DEF(pgwaitobject *wobj, int revents);
+extern int WAITOBJCB_PMDEATH_DEF(pgwaitobject *wobj, int revents);
+extern int WaitLatchOrSocketMulti(pgwaitobject *wobjs, int nelem, long timeout);
 extern void SetLatch(volatile Latch *latch);
 extern void ResetLatch(volatile Latch *latch);
 
-- 
1.8.3.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to