Module Name: src Committed By: pooka Date: Wed Jan 5 17:14:50 UTC 2011
Modified Files: src/lib/librumpclient: rumpclient.c rumpclient.h src/lib/librumpuser: rumpuser_sp.c sp_common.c Log Message: Support fork() for rumpclient users. To generate a diff of this commit: cvs rdiff -u -r1.10 -r1.11 src/lib/librumpclient/rumpclient.c cvs rdiff -u -r1.1 -r1.2 src/lib/librumpclient/rumpclient.h cvs rdiff -u -r1.28 -r1.29 src/lib/librumpuser/rumpuser_sp.c cvs rdiff -u -r1.17 -r1.18 src/lib/librumpuser/sp_common.c Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.
Modified files: Index: src/lib/librumpclient/rumpclient.c diff -u src/lib/librumpclient/rumpclient.c:1.10 src/lib/librumpclient/rumpclient.c:1.11 --- src/lib/librumpclient/rumpclient.c:1.10 Thu Dec 16 17:05:44 2010 +++ src/lib/librumpclient/rumpclient.c Wed Jan 5 17:14:50 2011 @@ -1,7 +1,7 @@ -/* $NetBSD: rumpclient.c,v 1.10 2010/12/16 17:05:44 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.11 2011/01/05 17:14:50 pooka Exp $ */ /* - * Copyright (c) 2010 Antti Kantee. All Rights Reserved. + * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -45,6 +45,7 @@ #include <fcntl.h> #include <poll.h> #include <pthread.h> +#include <signal.h> #include <stdarg.h> #include <stdio.h> #include <stdlib.h> @@ -55,7 +56,9 @@ #include "sp_common.c" -static struct spclient clispc; +static struct spclient clispc = { + .spc_fd = -1, +}; static int syscall_req(struct spclient *spc, int sysnum, @@ -87,21 +90,30 @@ } static int -handshake_req(struct spclient *spc) +handshake_req(struct spclient *spc, uint32_t *auth, int cancel) { + struct handshake_fork rf; struct rsp_hdr rhdr; struct respwait rw; int rv; /* performs server handshake */ - rhdr.rsp_len = sizeof(rhdr); + rhdr.rsp_len = sizeof(rhdr) + (auth ? sizeof(rf) : 0); rhdr.rsp_class = RUMPSP_REQ; rhdr.rsp_type = RUMPSP_HANDSHAKE; - rhdr.rsp_handshake = HANDSHAKE_GUEST; + if (auth) + rhdr.rsp_handshake = HANDSHAKE_FORK; + else + rhdr.rsp_handshake = HANDSHAKE_GUEST; putwait(spc, &rw, &rhdr); rv = dosend(spc, &rhdr, sizeof(rhdr)); - if (rv != 0) { + if (auth) { + memcpy(rf.rf_auth, auth, AUTHLEN*sizeof(*auth)); + rf.rf_cancel = cancel; + rv = dosend(spc, &rf, sizeof(rf)); + } + if (rv != 0 || cancel) { unputwait(spc, &rw); return rv; } @@ -117,6 +129,31 @@ } static int +prefork_req(struct spclient *spc, void **resp) +{ + struct rsp_hdr rhdr; + struct respwait rw; + int rv; + + rhdr.rsp_len = sizeof(rhdr); + rhdr.rsp_class = RUMPSP_REQ; + rhdr.rsp_type = RUMPSP_PREFORK; + rhdr.rsp_error = 0; + + + putwait(spc, &rw, &rhdr); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + if (rv != 0) { + unputwait(spc, &rw); + return rv; + } + + rv = waitresp(spc, &rw); + *resp = rw.rw_data; + return rv; +} + +static int send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen, int wantstr) { @@ -234,33 +271,30 @@ spcfreebuf(spc); } -int -rumpclient_init() +static unsigned ptab_idx; +static struct sockaddr *serv_sa; + +static int +doconnect(void) { char banner[MAXBANNER]; - struct sockaddr *sap; - char *p; - unsigned idx; + int s, error; ssize_t n; - int error, s; - if ((p = getenv("RUMP_SERVER")) == NULL) { - errno = ENOENT; + s = socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0); + if (s == -1) return -1; - } - if ((error = parseurl(p, &sap, &idx, 0)) != 0) { + if (connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) { + error = errno; + fprintf(stderr, "rump_sp: client connect failed\n"); errno = error; return -1; } - s = socket(parsetab[idx].domain, SOCK_STREAM, 0); - if (s == -1) - return -1; - - if (connect(s, sap, (socklen_t)sap->sa_len) == -1) { + if ((error = parsetab[ptab_idx].connhook(s)) != 0) { error = errno; - fprintf(stderr, "rump_sp: client connect failed\n"); + fprintf(stderr, "rump_sp: connect hook failed\n"); errno = error; return -1; } @@ -281,23 +315,91 @@ /* parse the banner some day */ - if ((error = parsetab[idx].connhook(s)) != 0) { - error = errno; - fprintf(stderr, "rump_sp: connect hook failed\n"); + clispc.spc_fd = s; + TAILQ_INIT(&clispc.spc_respwait); + pthread_mutex_init(&clispc.spc_mtx, NULL); + pthread_cond_init(&clispc.spc_cv, NULL); + + return 0; +} + +int +rumpclient_init() +{ + char *p; + int error; + + if ((p = getenv("RUMP_SERVER")) == NULL) { + errno = ENOENT; + return -1; + } + + if ((error = parseurl(p, &serv_sa, &ptab_idx, 0)) != 0) { errno = error; return -1; } - pthread_mutex_init(&clispc.spc_mtx, NULL); - pthread_cond_init(&clispc.spc_cv, NULL); - clispc.spc_fd = s; - TAILQ_INIT(&clispc.spc_respwait); + if (doconnect() == -1) + return -1; + + error = handshake_req(&clispc, NULL, 0); + if (error) { + pthread_mutex_destroy(&clispc.spc_mtx); + pthread_cond_destroy(&clispc.spc_cv); + close(clispc.spc_fd); + errno = error; + return -1; + } + + return 0; +} + +struct rumpclient_fork { + uint32_t fork_auth[AUTHLEN]; +}; + +struct rumpclient_fork * +rumpclient_prefork(void) +{ + struct rumpclient_fork *rpf; + void *resp; + int rv; + + rpf = malloc(sizeof(*rpf)); + if (rpf == NULL) + return NULL; + + if ((rv = prefork_req(&clispc, &resp)) != 0) { + free(rpf); + errno = rv; + return NULL; + } + + memcpy(rpf->fork_auth, resp, sizeof(rpf->fork_auth)); + free(resp); - error = handshake_req(&clispc); + return rpf; +} + +int +rumpclient_fork_init(struct rumpclient_fork *rpf) +{ + int error; + + close(clispc.spc_fd); + memset(&clispc, 0, sizeof(clispc)); + clispc.spc_fd = -1; + + if (doconnect() == -1) + return -1; + + error = handshake_req(&clispc, rpf->fork_auth, 0); if (error) { pthread_mutex_destroy(&clispc.spc_mtx); pthread_cond_destroy(&clispc.spc_cv); - close(s); + errno = error; + return -1; } - return error; + + return 0; } Index: src/lib/librumpclient/rumpclient.h diff -u src/lib/librumpclient/rumpclient.h:1.1 src/lib/librumpclient/rumpclient.h:1.2 --- src/lib/librumpclient/rumpclient.h:1.1 Thu Nov 4 21:01:29 2010 +++ src/lib/librumpclient/rumpclient.h Wed Jan 5 17:14:50 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.h,v 1.1 2010/11/04 21:01:29 pooka Exp $ */ +/* $NetBSD: rumpclient.h,v 1.2 2011/01/05 17:14:50 pooka Exp $ */ /*- * Copyright (c) 2010 Antti Kantee. All Rights Reserved. @@ -33,6 +33,10 @@ int rumpclient_syscall(int, const void *, size_t, register_t *); int rumpclient_init(void); +struct rumpclient_fork; +struct rumpclient_fork *rumpclient_prefork(void); +int rumpclient_fork_init(struct rumpclient_fork *); + __END_DECLS #endif /* _RUMP_RUMPCLIENT_H_ */ Index: src/lib/librumpuser/rumpuser_sp.c diff -u src/lib/librumpuser/rumpuser_sp.c:1.28 src/lib/librumpuser/rumpuser_sp.c:1.29 --- src/lib/librumpuser/rumpuser_sp.c:1.28 Sun Jan 2 13:01:45 2011 +++ src/lib/librumpuser/rumpuser_sp.c Wed Jan 5 17:14:50 2011 @@ -1,7 +1,7 @@ -/* $NetBSD: rumpuser_sp.c,v 1.28 2011/01/02 13:01:45 pooka Exp $ */ +/* $NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $ */ /* - * Copyright (c) 2010 Antti Kantee. All Rights Reserved. + * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -35,7 +35,7 @@ */ #include <sys/cdefs.h> -__RCSID("$NetBSD: rumpuser_sp.c,v 1.28 2011/01/02 13:01:45 pooka Exp $"); +__RCSID("$NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $"); #include <sys/types.h> #include <sys/atomic.h> @@ -85,7 +85,17 @@ static char banner[MAXBANNER]; #define PROTOMAJOR 0 -#define PROTOMINOR 0 +#define PROTOMINOR 1 + +struct prefork { + uint32_t pf_auth[AUTHLEN]; + struct lwp *pf_lwp; + + LIST_ENTRY(prefork) pf_entries; /* global list */ + LIST_ENTRY(prefork) pf_spcentries; /* linked from forking spc */ +}; +static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks); +static pthread_mutex_t pfmtx; /* * Manual wrappers, since librump does not have access to the @@ -244,6 +254,26 @@ } static int +send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth) +{ + struct rsp_hdr rhdr; + int rv; + + rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth); + rhdr.rsp_reqno = reqno; + rhdr.rsp_class = RUMPSP_RESP; + rhdr.rsp_type = RUMPSP_PREFORK; + rhdr.rsp_sysnum = 0; + + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, auth, AUTHLEN*sizeof(*auth)); + sendunlock(spc); + + return rv; +} + +static int copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen, int wantstr, void **resp) { @@ -365,13 +395,15 @@ if (ref > 0) return; - DPRINTF(("spcrelease: spc %p fd %d\n", spc, spc->spc_fd)); + DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd)); _DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait)); _DIAGASSERT(spc->spc_buf == NULL); - lwproc_switch(spc->spc_mainlwp); - lwproc_release(); + if (spc->spc_mainlwp) { + lwproc_switch(spc->spc_mainlwp); + lwproc_release(); + } spc->spc_mainlwp = NULL; close(spc->spc_fd); @@ -464,26 +496,16 @@ break; } - if (lwproc_rfork(&spclist[i], RUMP_RFCFDG) != 0) { - close(newfd); - return 0; - } - assert(i < MAXCLI); pfdlist[i].fd = newfd; spclist[i].spc_fd = newfd; - spclist[i].spc_mainlwp = lwproc_curlwp(); spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */ - spclist[i].spc_pid = lwproc_getpid(); spclist[i].spc_refcnt = 1; TAILQ_INIT(&spclist[i].spc_respwait); - DPRINTF(("rump_sp: added new connection fd %d at idx %u, pid %d\n", - newfd, i, lwproc_getpid())); - - lwproc_switch(NULL); + DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i)); return i; } @@ -496,7 +518,7 @@ sysnum = (int)rhdr->rsp_sysnum; DPRINTF(("rump_sp: handling syscall %d from client %d\n", - sysnum, 0)); + sysnum, spc->spc_pid)); lwproc_newlwp(spc->spc_pid); rv = rumpsyscall(sysnum, data, retval); @@ -668,22 +690,150 @@ { struct sysbouncearg *sba; pthread_t pt; - int retries, rv; + int retries, error, i; if (__predict_false(spc->spc_state == SPCSTATE_NEW)) { if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) { send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAUTH); + shutdown(spc->spc_fd, SHUT_RDWR); spcfreebuf(spc); return; } - rv = send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0); + if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) { + if ((error = lwproc_rfork(spc, RUMP_RFCFDG)) != 0) { + shutdown(spc->spc_fd, SHUT_RDWR); + } + + spcfreebuf(spc); + if (error) + return; + + spc->spc_mainlwp = lwproc_curlwp(); + + send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0); + } else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) { + struct lwp *tmpmain; + struct prefork *pf; + struct handshake_fork *rfp; + uint64_t reqno; + int cancel; + + reqno = spc->spc_hdr.rsp_reqno; + if (spc->spc_off-HDRSZ != sizeof(*rfp)) { + send_error_resp(spc, reqno, EINVAL); + shutdown(spc->spc_fd, SHUT_RDWR); + spcfreebuf(spc); + return; + } + + /*LINTED*/ + rfp = (void *)spc->spc_buf; + cancel = rfp->rf_cancel; + + pthread_mutex_lock(&pfmtx); + LIST_FOREACH(pf, &preforks, pf_entries) { + if (memcmp(rfp->rf_auth, pf->pf_auth, + sizeof(rfp->rf_auth)) == 0) { + LIST_REMOVE(pf, pf_entries); + LIST_REMOVE(pf, pf_spcentries); + break; + } + } + pthread_mutex_lock(&pfmtx); + spcfreebuf(spc); + + if (!pf) { + send_error_resp(spc, reqno, ESRCH); + shutdown(spc->spc_fd, SHUT_RDWR); + return; + } + + tmpmain = pf->pf_lwp; + free(pf); + lwproc_switch(tmpmain); + if (cancel) { + lwproc_release(); + shutdown(spc->spc_fd, SHUT_RDWR); + return; + } + + /* + * So, we forked already during "prefork" to save + * the file descriptors from a parent exit + * race condition. But now we need to fork + * a second time since the initial fork has + * the wrong spc pointer. (yea, optimize + * interfaces some day if anyone cares) + */ + if ((error = lwproc_rfork(spc, 0)) != 0) { + send_error_resp(spc, reqno, error); + shutdown(spc->spc_fd, SHUT_RDWR); + lwproc_release(); + return; + } + spc->spc_mainlwp = lwproc_curlwp(); + lwproc_switch(tmpmain); + lwproc_release(); + lwproc_switch(spc->spc_mainlwp); + + send_handshake_resp(spc, reqno, 0); + } + + spc->spc_pid = lwproc_getpid(); + + DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n", + spc, spc->spc_pid)); + + lwproc_switch(NULL); + spc->spc_state = SPCSTATE_RUNNING; + return; + } + + if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) { + struct prefork *pf; + uint64_t reqno; + uint32_t auth[AUTHLEN]; + + DPRINTF(("rump_sp: prefork handler executing for %p\n", spc)); + reqno = spc->spc_hdr.rsp_reqno; spcfreebuf(spc); - if (rv) { - shutdown(spc->spc_fd, SHUT_RDWR); + + pf = malloc(sizeof(*pf)); + if (pf == NULL) { + send_error_resp(spc, reqno, ENOMEM); return; } - spc->spc_state = SPCSTATE_RUNNING; + + /* + * Use client main lwp to fork. this is never used by + * worker threads (except if spc refcount goes to 0), + * so we can safely use it here. + */ + lwproc_switch(spc->spc_mainlwp); + if ((error = lwproc_rfork(spc, RUMP_RFFDG)) != 0) { + DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc)); + send_error_resp(spc, reqno, error); + lwproc_switch(NULL); + free(pf); + return; + } + + /* Ok, we have a new process context and a new curlwp */ + for (i = 0; i < AUTHLEN; i++) { + pf->pf_auth[i] = auth[i] = arc4random(); + } + pf->pf_lwp = lwproc_curlwp(); + lwproc_switch(NULL); + + pthread_mutex_lock(&pfmtx); + LIST_INSERT_HEAD(&preforks, pf, pf_entries); + LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries); + pthread_mutex_unlock(&pfmtx); + + DPRINTF(("rump_sp: prefork handler success %p\n", spc)); + + send_prefork_resp(spc, reqno, auth); return; } Index: src/lib/librumpuser/sp_common.c diff -u src/lib/librumpuser/sp_common.c:1.17 src/lib/librumpuser/sp_common.c:1.18 --- src/lib/librumpuser/sp_common.c:1.17 Thu Dec 16 17:05:44 2010 +++ src/lib/librumpuser/sp_common.c Wed Jan 5 17:14:50 2011 @@ -1,7 +1,7 @@ -/* $NetBSD: sp_common.c,v 1.17 2010/12/16 17:05:44 pooka Exp $ */ +/* $NetBSD: sp_common.c,v 1.18 2011/01/05 17:14:50 pooka Exp $ */ /* - * Copyright (c) 2010 Antti Kantee. All Rights Reserved. + * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -80,9 +80,12 @@ RUMPSP_SYSCALL, RUMPSP_COPYIN, RUMPSP_COPYINSTR, RUMPSP_COPYOUT, RUMPSP_COPYOUTSTR, - RUMPSP_ANONMMAP }; + RUMPSP_ANONMMAP, + RUMPSP_PREFORK }; -enum { HANDSHAKE_GUEST, HANDSHAKE_AUTH }; /* more to come */ +enum { HANDSHAKE_GUEST, HANDSHAKE_AUTH, HANDSHAKE_FORK }; + +#define AUTHLEN 4 /* 128bit fork auth */ struct rsp_hdr { uint64_t rsp_len; @@ -123,6 +126,11 @@ register_t rsys_retval[2]; }; +struct handshake_fork { + uint32_t rf_auth[4]; + int rf_cancel; +}; + struct respwait { uint64_t rw_reqno; void *rw_data; @@ -134,6 +142,7 @@ TAILQ_ENTRY(respwait) rw_entries; }; +struct prefork; struct spclient { int spc_fd; int spc_refcnt; @@ -157,6 +166,8 @@ uint64_t spc_nextreq; int spc_ostatus, spc_istatus; + + LIST_HEAD(, prefork) spc_pflist; }; #define SPCSTATUS_FREE 0 #define SPCSTATUS_BUSY 1 @@ -252,11 +263,11 @@ n = send(fd, sdata + sent, dlen - sent, MSG_NOSIGNAL); if (n == 0) { - return EFAULT; + return ENOTCONN; } if (n == -1) { if (errno != EAGAIN) - return EFAULT; + return errno; continue; } sent += n; @@ -305,6 +316,7 @@ if (rw == NULL) { DPRINTF(("no waiter found, invalid reqno %" PRIu64 "?\n", spc->spc_hdr.rsp_reqno)); + spcfreebuf(spc); return; } DPRINTF(("rump_sp: client %p woke up waiter at %p\n", spc, rw));