Module Name: src
Committed By: pooka
Date: Fri Nov 19 15:25:50 UTC 2010
Modified Files:
src/lib/librumpclient: rumpclient.c
src/lib/librumpuser: rumpuser_sp.c sp_common.c
Log Message:
Start working on making the syscall proxy code threadsafe. The
basics are there, but a few more tweaks are needed. The reason
I'm committing it now is that the code was mindnumbingly boring to
write (no wonder it took me almost 3 years to get it done), and I
might burn it if it's not in a safe place.
To generate a diff of this commit:
cvs rdiff -u -r1.2 -r1.3 src/lib/librumpclient/rumpclient.c
cvs rdiff -u -r1.6 -r1.7 src/lib/librumpuser/rumpuser_sp.c
cvs rdiff -u -r1.3 -r1.4 src/lib/librumpuser/sp_common.c
Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.
Modified files:
Index: src/lib/librumpclient/rumpclient.c
diff -u src/lib/librumpclient/rumpclient.c:1.2 src/lib/librumpclient/rumpclient.c:1.3
--- src/lib/librumpclient/rumpclient.c:1.2 Fri Nov 5 13:50:48 2010
+++ src/lib/librumpclient/rumpclient.c Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/* $NetBSD: rumpclient.c,v 1.2 2010/11/05 13:50:48 pooka Exp $ */
+/* $NetBSD: rumpclient.c,v 1.3 2010/11/19 15:25:49 pooka Exp $ */
/*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@@ -58,52 +58,70 @@
static struct spclient clispc;
static int
-send_syscall_req(struct spclient *spc, int sysnum,
- const void *data, size_t dlen)
+syscall_req(struct spclient *spc, int sysnum,
+ const void *data, size_t dlen, void **resp)
{
struct rsp_hdr rhdr;
+ struct respwait rw;
+ int rv;
rhdr.rsp_len = sizeof(rhdr) + dlen;
- rhdr.rsp_reqno = nextreq++;
- rhdr.rsp_type = RUMPSP_SYSCALL_REQ;
+ rhdr.rsp_class = RUMPSP_REQ;
+ rhdr.rsp_type = RUMPSP_SYSCALL;
rhdr.rsp_sysnum = sysnum;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, data, dlen);
+ putwait(spc, &rw, &rhdr);
- return 0;
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, data, dlen);
+ sendunlock(spc);
+ if (rv)
+ return rv; /* XXX: unputwait */
+
+ rv = waitresp(spc, &rw);
+ *resp = rw.rw_data;
+ return rv;
}
static int
send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen)
{
struct rsp_hdr rhdr;
+ int rv;
rhdr.rsp_len = sizeof(rhdr) + dlen;
rhdr.rsp_reqno = reqno;
- rhdr.rsp_type = RUMPSP_COPYIN_RESP;
+ rhdr.rsp_class = RUMPSP_RESP;
+ rhdr.rsp_type = RUMPSP_COPYIN;
rhdr.rsp_sysnum = 0;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, data, dlen);
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, data, dlen);
+ sendunlock(spc);
- return 0;
+ return rv;
}
static int
send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr)
{
struct rsp_hdr rhdr;
+ int rv;
rhdr.rsp_len = sizeof(rhdr) + sizeof(addr);
rhdr.rsp_reqno = reqno;
- rhdr.rsp_type = RUMPSP_ANONMMAP_RESP;
+ rhdr.rsp_class = RUMPSP_RESP;
+ rhdr.rsp_type = RUMPSP_ANONMMAP;
rhdr.rsp_sysnum = 0;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, &addr, sizeof(addr));
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, &addr, sizeof(addr));
+ sendunlock(spc);
- return 0;
+ return rv;
}
int
@@ -111,71 +129,71 @@
register_t *retval)
{
struct rsp_sysresp *resp;
- struct rsp_copydata *copydata;
- struct pollfd pfd;
- size_t maplen;
- void *mapaddr;
- int gotresp;
+ void *rdata;
+ int rv;
- DPRINTF(("rump_sp_syscall: executing syscall %d\n", sysnum));
+ DPRINTF(("rumpsp syscall_req: syscall %d with %p/%zu\n",
+ sysnum, data, dlen));
- send_syscall_req(&clispc, sysnum, data, dlen);
+ rv = syscall_req(&clispc, sysnum, data, dlen, &rdata);
+ if (rv)
+ return rv;
+
+ resp = rdata;
+ DPRINTF(("rumpsp syscall_resp: syscall %d error %d, rv: %d/%d\n",
+ sysnum, rv, resp->rsys_retval[0], resp->rsys_retval[1]));
- DPRINTF(("rump_sp_syscall: syscall %d request sent. "
- "waiting for response\n", sysnum));
+ memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval));
+ rv = resp->rsys_error;
+ free(rdata);
- pfd.fd = clispc.spc_fd;
- pfd.events = POLLIN;
-
- gotresp = 0;
- while (!gotresp) {
- while (readframe(&clispc) < 1)
- poll(&pfd, 1, INFTIM);
-
- switch (clispc.spc_hdr.rsp_type) {
- case RUMPSP_COPYIN_REQ:
- /*LINTED*/
- copydata = (struct rsp_copydata *)clispc.spc_buf;
- DPRINTF(("rump_sp_syscall: copyin request: %p/%zu\n",
- copydata->rcp_addr, copydata->rcp_len));
- send_copyin_resp(&clispc, clispc.spc_hdr.rsp_reqno,
- copydata->rcp_addr, copydata->rcp_len);
- clispc.spc_off = 0;
- break;
- case RUMPSP_COPYOUT_REQ:
- /*LINTED*/
- copydata = (struct rsp_copydata *)clispc.spc_buf;
- DPRINTF(("rump_sp_syscall: copyout request: %p/%zu\n",
- copydata->rcp_addr, copydata->rcp_len));
- /*LINTED*/
- memcpy(copydata->rcp_addr, copydata->rcp_data,
- copydata->rcp_len);
- clispc.spc_off = 0;
- break;
- case RUMPSP_ANONMMAP_REQ:
- /*LINTED*/
- maplen = *(size_t *)clispc.spc_buf;
- mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE,
- MAP_ANON, -1, 0);
- if (mapaddr == MAP_FAILED)
- mapaddr = NULL;
- send_anonmmap_resp(&clispc,
- clispc.spc_hdr.rsp_reqno, mapaddr);
- clispc.spc_off = 0;
- break;
- case RUMPSP_SYSCALL_RESP:
- DPRINTF(("rump_sp_syscall: got response \n"));
- gotresp = 1;
- break;
- }
- }
+ return rv;
+}
- /*LINTED*/
- resp = (struct rsp_sysresp *)clispc.spc_buf;
- memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval));
- clispc.spc_off = 0;
+static void
+handlereq(struct spclient *spc)
+{
+ struct rsp_copydata *copydata;
+ void *mapaddr;
+ size_t maplen;
+
+ switch (spc->spc_hdr.rsp_type) {
+ case RUMPSP_COPYIN:
+ /*LINTED*/
+ copydata = (struct rsp_copydata *)spc->spc_buf;
+ DPRINTF(("rump_sp handlereq: copyin request: %p/%zu\n",
+ copydata->rcp_addr, copydata->rcp_len));
+ send_copyin_resp(spc, spc->spc_hdr.rsp_reqno,
+ copydata->rcp_addr, copydata->rcp_len);
+ break;
+ case RUMPSP_COPYOUT:
+ /*LINTED*/
+ copydata = (struct rsp_copydata *)spc->spc_buf;
+ DPRINTF(("rump_sp handlereq: copyout request: %p/%zu\n",
+ copydata->rcp_addr, copydata->rcp_len));
+ /*LINTED*/
+ memcpy(copydata->rcp_addr, copydata->rcp_data,
+ copydata->rcp_len);
+ break;
+ case RUMPSP_ANONMMAP:
+ /*LINTED*/
+ maplen = *(size_t *)spc->spc_buf;
+ mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE,
+ MAP_ANON, -1, 0);
+ if (mapaddr == MAP_FAILED)
+ mapaddr = NULL;
+ DPRINTF(("rump_sp handlereq: anonmmap: %p\n", mapaddr));
+ send_anonmmap_resp(spc, spc->spc_hdr.rsp_reqno, mapaddr);
+ break;
+ default:
+ printf("PANIC: INVALID TYPE\n");
+ abort();
+ break;
+ }
- return resp->rsys_error;
+ free(spc->spc_buf);
+ spc->spc_off = 0;
+ spc->spc_buf = NULL;
}
int
@@ -213,6 +231,9 @@
return -1;
}
clispc.spc_fd = s;
+ TAILQ_INIT(&clispc.spc_respwait);
+ pthread_mutex_init(&clispc.spc_mtx, NULL);
+ pthread_cond_init(&clispc.spc_cv, NULL);
return 0;
}
Index: src/lib/librumpuser/rumpuser_sp.c
diff -u src/lib/librumpuser/rumpuser_sp.c:1.6 src/lib/librumpuser/rumpuser_sp.c:1.7
--- src/lib/librumpuser/rumpuser_sp.c:1.6 Wed Nov 17 17:36:14 2010
+++ src/lib/librumpuser/rumpuser_sp.c Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/* $NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $ */
+/* $NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $ */
/*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@@ -38,7 +38,7 @@
*/
#include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $");
#include <sys/types.h>
#include <sys/mman.h>
@@ -68,7 +68,6 @@
static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI];
static unsigned int nfds, maxidx;
-static uint64_t nextreq;
static pthread_key_t spclient_tls;
static struct rumpuser_sp_ops spops;
@@ -132,45 +131,77 @@
return rv;
}
+static uint64_t
+nextreq(struct spclient *spc)
+{
+ uint64_t nw;
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ nw = spc->spc_nextreq++;
+ pthread_mutex_unlock(&spc->spc_mtx);
+
+ return nw;
+}
+
static int
send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
- register_t retval[2])
+ register_t *retval)
{
struct rsp_hdr rhdr;
struct rsp_sysresp sysresp;
+ int rv;
rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
rhdr.rsp_reqno = reqno;
- rhdr.rsp_type = RUMPSP_SYSCALL_RESP;
+ rhdr.rsp_class = RUMPSP_RESP;
+ rhdr.rsp_type = RUMPSP_SYSCALL;
rhdr.rsp_sysnum = 0;
sysresp.rsys_error = error;
- memcpy(sysresp.rsys_retval, retval, sizeof(retval));
+ memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, &sysresp, sizeof(sysresp));
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, &sysresp, sizeof(sysresp));
+ sendunlock(spc);
- return 0;
+ return rv;
}
static int
-send_copyin_req(struct spclient *spc, const void *remaddr, size_t dlen)
+copyin_req(struct spclient *spc, const void *remaddr, size_t dlen, void **resp)
{
struct rsp_hdr rhdr;
struct rsp_copydata copydata;
+ struct respwait rw;
+ int rv;
+
+ DPRINTF(("copyin_req: %zu bytes from %p\n", dlen, remaddr));
rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
- rhdr.rsp_reqno = nextreq++;
- rhdr.rsp_type = RUMPSP_COPYIN_REQ;
+ rhdr.rsp_class = RUMPSP_REQ;
+ rhdr.rsp_type = RUMPSP_COPYIN;
rhdr.rsp_sysnum = 0;
copydata.rcp_addr = __UNCONST(remaddr);
copydata.rcp_len = dlen;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, &copydata, sizeof(copydata));
+ putwait(spc, &rw, &rhdr);
+
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, &copydata, sizeof(copydata));
+ sendunlock(spc);
+ if (rv)
+ return rv; /* XXX: unputwait */
+
+ rv = waitresp(spc, &rw);
+
+ DPRINTF(("copyin: response %d\n", rv));
+
+ *resp = rw.rw_data;
+ return rv;
- return 0;
}
static int
@@ -179,36 +210,57 @@
{
struct rsp_hdr rhdr;
struct rsp_copydata copydata;
+ int rv;
+
+ DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));
rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
- rhdr.rsp_reqno = nextreq++;
- rhdr.rsp_type = RUMPSP_COPYOUT_REQ;
+ rhdr.rsp_reqno = nextreq(spc);
+ rhdr.rsp_class = RUMPSP_REQ;
+ rhdr.rsp_type = RUMPSP_COPYOUT;
rhdr.rsp_sysnum = 0;
copydata.rcp_addr = __UNCONST(remaddr);
copydata.rcp_len = dlen;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, &copydata, sizeof(copydata));
- dosend(spc, data, dlen);
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, &copydata, sizeof(copydata));
+ rv = dosend(spc, data, dlen);
+ sendunlock(spc);
- return 0;
+ return rv;
}
static int
-send_anonmmap_req(struct spclient *spc, size_t howmuch)
+anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
{
struct rsp_hdr rhdr;
+ struct respwait rw;
+ int rv;
+
+ DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));
rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
- rhdr.rsp_reqno = nextreq++;
- rhdr.rsp_type = RUMPSP_ANONMMAP_REQ;
+ rhdr.rsp_class = RUMPSP_REQ;
+ rhdr.rsp_type = RUMPSP_ANONMMAP;
rhdr.rsp_sysnum = 0;
- dosend(spc, &rhdr, sizeof(rhdr));
- dosend(spc, &howmuch, sizeof(howmuch));
+ putwait(spc, &rw, &rhdr);
- return 0;
+ sendlock(spc);
+ rv = dosend(spc, &rhdr, sizeof(rhdr));
+ rv = dosend(spc, &howmuch, sizeof(howmuch));
+ sendunlock(spc);
+ if (rv)
+ return rv; /* XXX: unputwait */
+
+ rv = waitresp(spc, &rw);
+ *resp = rw.rw_data;
+
+ DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));
+
+ return rv;
}
static void
@@ -222,6 +274,8 @@
lwproc_switch(spc->spc_lwp);
lwproc_release();
+ pthread_mutex_destroy(&spc->spc_mtx);
+ pthread_cond_destroy(&spc->spc_cv);
free(spc->spc_buf);
memset(spc, 0, sizeof(*spc));
close(fd);
@@ -289,6 +343,12 @@
pfdlist[i].fd = newfd;
spclist[i].spc_fd = newfd;
spclist[i].spc_lwp = lwproc_curlwp();
+ spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
+
+ TAILQ_INIT(&spclist[i].spc_respwait);
+ pthread_mutex_init(&spclist[i].spc_mtx, NULL);
+ pthread_cond_init(&spclist[i].spc_cv, NULL);
+
if (maxidx < i)
maxidx = i;
@@ -303,7 +363,7 @@
static void
serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
{
- register_t retval[2];
+ register_t retval[2] = {0, 0};
int rv, sysnum;
sysnum = (int)rhdr->rsp_sysnum;
@@ -317,36 +377,41 @@
pthread_setspecific(spclient_tls, NULL);
free(data);
- DPRINTF(("rump_sp: got return value %d\n", rv));
+ DPRINTF(("rump_sp: got return value %d & %d/%d\n",
+ rv, retval[0], retval[1]));
send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
}
+struct sysbouncearg {
+ struct spclient *sba_spc;
+ struct rsp_hdr sba_hdr;
+ uint8_t *sba_data;
+};
+static void *
+serv_syscallbouncer(void *arg)
+{
+ struct sysbouncearg *barg = arg;
+
+ serv_handlesyscall(barg->sba_spc, &barg->sba_hdr, barg->sba_data);
+ free(arg);
+ return NULL;
+}
+
int
rumpuser_sp_copyin(const void *uaddr, void *kaddr, size_t len)
{
struct spclient *spc;
- struct pollfd pfd;
+ void *rdata;
spc = pthread_getspecific(spclient_tls);
if (!spc)
return EFAULT;
- send_copyin_req(spc, uaddr, len);
+ copyin_req(spc, uaddr, len, &rdata);
- pfd.fd = spc->spc_fd;
- pfd.events = POLLIN;
- do {
- poll(&pfd, 1, INFTIM);
- } while (readframe(spc) < 1);
-
- if (spc->spc_hdr.rsp_type != RUMPSP_COPYIN_RESP) {
- abort();
- }
-
- memcpy(kaddr, spc->spc_buf, len);
- free(spc->spc_buf);
- spc->spc_off = 0;
+ memcpy(kaddr, rdata, len);
+ free(rdata);
return 0;
}
@@ -362,8 +427,8 @@
return EFAULT;
}
- send_copyout_req(spc, uaddr, kaddr, dlen);
-
+ if (send_copyout_req(spc, uaddr, kaddr, dlen) != 0)
+ return EFAULT;
return 0;
}
@@ -371,31 +436,23 @@
rumpuser_sp_anonmmap(size_t howmuch, void **addr)
{
struct spclient *spc;
- struct pollfd pfd;
- void *resp;
+ void *resp, *rdata;
+ int rv;
spc = pthread_getspecific(spclient_tls);
if (!spc)
return EFAULT;
- send_anonmmap_req(spc, howmuch);
-
- pfd.fd = spc->spc_fd;
- pfd.events = POLLIN;
- do {
- poll(&pfd, 1, INFTIM);
- } while (readframe(spc) < 1);
-
- if (spc->spc_hdr.rsp_type != RUMPSP_ANONMMAP_RESP) {
- abort();
- }
+ rv = anonmmap_req(spc, howmuch, &rdata);
+ if (rv)
+ return rv;
- /*LINTED*/
- resp = *(void **)spc->spc_buf;
- spc->spc_off = 0;
+ resp = *(void **)rdata;
+ free(rdata);
- if (resp == NULL)
+ if (resp == NULL) {
return ENOMEM;
+ }
*addr = resp;
return 0;
@@ -412,6 +469,38 @@
connecthook_fn sps_connhook;
};
+static void
+handlereq(struct spclient *spc)
+{
+ struct sysbouncearg *sba;
+ pthread_attr_t pattr;
+ pthread_t pt;
+ int rv;
+
+ /* XXX: check that it's a syscall */
+
+ sba = malloc(sizeof(*sba));
+ if (sba == NULL) {
+ /* panic */
+ abort();
+ }
+
+ sba->sba_spc = spc;
+ sba->sba_hdr = spc->spc_hdr;
+ sba->sba_data = spc->spc_buf;
+
+ spc->spc_buf = NULL;
+ spc->spc_off = 0;
+
+ pthread_attr_init(&pattr);
+ pthread_attr_setdetachstate(&pattr, 1);
+
+ if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) {
+ /* panic */
+ abort();
+ }
+}
+
static void *
spserver(void *arg)
{
@@ -465,10 +554,18 @@
serv_handledisco(idx);
break;
default:
- spc->spc_off = 0;
- serv_handlesyscall(spc,
- &spc->spc_hdr, spc->spc_buf);
- spc->spc_buf = NULL;
+ switch (spc->spc_hdr.rsp_class) {
+ case RUMPSP_RESP:
+ kickwaiter(spc);
+ break;
+ case RUMPSP_REQ:
+ handlereq(spc);
+ break;
+ default:
+ printf("PANIC\n");
+ abort();
+ break;
+ }
break;
}
} else {
Index: src/lib/librumpuser/sp_common.c
diff -u src/lib/librumpuser/sp_common.c:1.3 src/lib/librumpuser/sp_common.c:1.4
--- src/lib/librumpuser/sp_common.c:1.3 Wed Nov 10 16:12:15 2010
+++ src/lib/librumpuser/sp_common.c Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/* $NetBSD: sp_common.c,v 1.3 2010/11/10 16:12:15 pooka Exp $ */
+/* $NetBSD: sp_common.c,v 1.4 2010/11/19 15:25:49 pooka Exp $ */
/*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@@ -33,6 +33,7 @@
#include <sys/types.h>
#include <sys/mman.h>
+#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/un.h>
@@ -44,6 +45,7 @@
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
+#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
@@ -70,17 +72,14 @@
* Bah, I hate writing on-off-wire conversions in C
*/
-enum {
- RUMPSP_SYSCALL_REQ, RUMPSP_SYSCALL_RESP,
- RUMPSP_COPYIN_REQ, RUMPSP_COPYIN_RESP,
- RUMPSP_COPYOUT_REQ, /* no copyout resp */
- RUMPSP_ANONMMAP_REQ, RUMPSP_ANONMMAP_RESP
-};
+enum { RUMPSP_REQ, RUMPSP_RESP };
+enum { RUMPSP_SYSCALL, RUMPSP_COPYIN, RUMPSP_COPYOUT, RUMPSP_ANONMMAP };
struct rsp_hdr {
uint64_t rsp_len;
uint64_t rsp_reqno;
- uint32_t rsp_type;
+ uint16_t rsp_class;
+ uint16_t rsp_type;
/*
* We want this structure 64bit-aligned for typecast fun,
* so might as well use the following for something.
@@ -106,6 +105,15 @@
register_t rsys_retval[2];
};
+struct respwait {
+ uint64_t rw_reqno;
+ void *rw_data;
+ size_t rw_dlen;
+
+ pthread_cond_t rw_cv;
+
+ TAILQ_ENTRY(respwait) rw_entries;
+};
struct spclient {
int spc_fd;
@@ -116,18 +124,47 @@
uint8_t *spc_buf;
size_t spc_off;
-#if 0
- /* outgoing */
- int spc_obusy;
- pthread_mutex_t spc_omtx;
+ pthread_mutex_t spc_mtx;
pthread_cond_t spc_cv;
-#endif
+
+ uint64_t spc_nextreq;
+ int spc_ostatus, spc_istatus;
+
+ TAILQ_HEAD(, respwait) spc_respwait;
};
+#define SPCSTATUS_FREE 0
+#define SPCSTATUS_BUSY 1
+#define SPCSTATUS_WANTED 2
typedef int (*addrparse_fn)(const char *, struct sockaddr **, int);
typedef int (*connecthook_fn)(int);
-static uint64_t nextreq;
+static int readframe(struct spclient *);
+static void handlereq(struct spclient *);
+
+static void
+sendlock(struct spclient *spc)
+{
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ while (spc->spc_ostatus != SPCSTATUS_FREE) {
+ spc->spc_ostatus = SPCSTATUS_WANTED;
+ pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
+ }
+ spc->spc_ostatus = SPCSTATUS_BUSY;
+ pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+sendunlock(struct spclient *spc)
+{
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ if (spc->spc_ostatus == SPCSTATUS_WANTED)
+ pthread_cond_broadcast(&spc->spc_cv);
+ spc->spc_ostatus = SPCSTATUS_FREE;
+ pthread_mutex_unlock(&spc->spc_mtx);
+}
static int
dosend(struct spclient *spc, const void *data, size_t dlen)
@@ -163,6 +200,108 @@
return 0;
}
+static void
+putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
+{
+
+ rw->rw_data = NULL;
+ rw->rw_dlen = 0;
+ pthread_cond_init(&rw->rw_cv, NULL);
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++;
+ TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
+ pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+kickwaiter(struct spclient *spc)
+{
+ struct respwait *rw;
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) {
+ if (rw->rw_reqno == spc->spc_hdr.rsp_reqno)
+ break;
+ }
+ if (rw == NULL) {
+ printf("PANIC: no waiter\n");
+ pthread_mutex_unlock(&spc->spc_mtx);
+ return;
+ }
+ rw->rw_data = spc->spc_buf;
+ TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+ pthread_cond_signal(&rw->rw_cv);
+ pthread_mutex_unlock(&spc->spc_mtx);
+
+ spc->spc_buf = NULL;
+ spc->spc_off = 0;
+}
+
+static void
+kickall(struct spclient *spc)
+{
+ struct respwait *rw;
+
+ /* DIAGASSERT(mutex_owned(spc_lock)) */
+ TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries)
+ pthread_cond_signal(&rw->rw_cv);
+}
+
+static int
+waitresp(struct spclient *spc, struct respwait *rw)
+{
+ struct pollfd pfd;
+ int rv = 0;
+
+ pthread_mutex_lock(&spc->spc_mtx);
+ while (rw->rw_data == NULL) {
+ /* are we free to receive? */
+ if (spc->spc_istatus == SPCSTATUS_FREE) {
+ int gotresp;
+
+ spc->spc_istatus = SPCSTATUS_BUSY;
+ pthread_mutex_unlock(&spc->spc_mtx);
+
+ pfd.fd = spc->spc_fd;
+ pfd.events = POLLIN;
+
+ for (gotresp = 0; !gotresp; ) {
+ while (readframe(spc) < 1)
+ poll(&pfd, 1, INFTIM);
+
+ switch (spc->spc_hdr.rsp_class) {
+ case RUMPSP_RESP:
+ kickwaiter(spc);
+ gotresp = spc->spc_hdr.rsp_reqno ==
+ rw->rw_reqno;
+ break;
+ case RUMPSP_REQ:
+ handlereq(spc);
+ break;
+ default:
+ /* panic */
+ break;
+ }
+ }
+ pthread_mutex_lock(&spc->spc_mtx);
+ if (spc->spc_istatus == SPCSTATUS_WANTED)
+ kickall(spc);
+ spc->spc_istatus = SPCSTATUS_FREE;
+ pthread_mutex_unlock(&spc->spc_mtx);
+ } else {
+ spc->spc_istatus = SPCSTATUS_WANTED;
+ pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
+ }
+ }
+
+ TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+ pthread_mutex_unlock(&spc->spc_mtx);
+
+ pthread_cond_destroy(&rw->rw_cv);
+ return rv;
+}
+
static int
readframe(struct spclient *spc)
{