Module Name:    src
Committed By:   pooka
Date:           Wed Jan  5 17:14:50 UTC 2011

Modified Files:
        src/lib/librumpclient: rumpclient.c rumpclient.h
        src/lib/librumpuser: rumpuser_sp.c sp_common.c

Log Message:
Support fork() for rumpclient users.


To generate a diff of this commit:
cvs rdiff -u -r1.10 -r1.11 src/lib/librumpclient/rumpclient.c
cvs rdiff -u -r1.1 -r1.2 src/lib/librumpclient/rumpclient.h
cvs rdiff -u -r1.28 -r1.29 src/lib/librumpuser/rumpuser_sp.c
cvs rdiff -u -r1.17 -r1.18 src/lib/librumpuser/sp_common.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/lib/librumpclient/rumpclient.c
diff -u src/lib/librumpclient/rumpclient.c:1.10 src/lib/librumpclient/rumpclient.c:1.11
--- src/lib/librumpclient/rumpclient.c:1.10	Thu Dec 16 17:05:44 2010
+++ src/lib/librumpclient/rumpclient.c	Wed Jan  5 17:14:50 2011
@@ -1,7 +1,7 @@
-/*      $NetBSD: rumpclient.c,v 1.10 2010/12/16 17:05:44 pooka Exp $	*/
+/*      $NetBSD: rumpclient.c,v 1.11 2011/01/05 17:14:50 pooka Exp $	*/
 
 /*
- * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
+ * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -45,6 +45,7 @@
 #include <fcntl.h>
 #include <poll.h>
 #include <pthread.h>
+#include <signal.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -55,7 +56,9 @@
 
 #include "sp_common.c"
 
-static struct spclient clispc;
+static struct spclient clispc = {
+	.spc_fd = -1,
+};
 
 static int
 syscall_req(struct spclient *spc, int sysnum,
@@ -87,21 +90,30 @@
 }
 
 static int
-handshake_req(struct spclient *spc)
+handshake_req(struct spclient *spc, uint32_t *auth, int cancel)
 {
+	struct handshake_fork rf;
 	struct rsp_hdr rhdr;
 	struct respwait rw;
 	int rv;
 
 	/* performs server handshake */
-	rhdr.rsp_len = sizeof(rhdr);
+	rhdr.rsp_len = sizeof(rhdr) + (auth ? sizeof(rf) : 0);
 	rhdr.rsp_class = RUMPSP_REQ;
 	rhdr.rsp_type = RUMPSP_HANDSHAKE;
-	rhdr.rsp_handshake = HANDSHAKE_GUEST;
+	if (auth)
+		rhdr.rsp_handshake = HANDSHAKE_FORK;
+	else
+		rhdr.rsp_handshake = HANDSHAKE_GUEST;
 
 	putwait(spc, &rw, &rhdr);
 	rv = dosend(spc, &rhdr, sizeof(rhdr));
-	if (rv != 0) {
+	if (auth) {
+		memcpy(rf.rf_auth, auth, AUTHLEN*sizeof(*auth));
+		rf.rf_cancel = cancel;
+		rv = dosend(spc, &rf, sizeof(rf));
+	}
+	if (rv != 0 || cancel) {
 		unputwait(spc, &rw);
 		return rv;
 	}
@@ -117,6 +129,31 @@
 }
 
 static int
+prefork_req(struct spclient *spc, void **resp)
+{
+	struct rsp_hdr rhdr;
+	struct respwait rw;
+	int rv;
+
+	rhdr.rsp_len = sizeof(rhdr);
+	rhdr.rsp_class = RUMPSP_REQ;
+	rhdr.rsp_type = RUMPSP_PREFORK;
+	rhdr.rsp_error = 0;
+
+
+	putwait(spc, &rw, &rhdr);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	if (rv != 0) {
+		unputwait(spc, &rw);
+		return rv;
+	}
+
+	rv = waitresp(spc, &rw);
+	*resp = rw.rw_data;
+	return rv;
+}
+
+static int
 send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen,
 	int wantstr)
 {
@@ -234,33 +271,30 @@
 	spcfreebuf(spc);
 }
 
-int
-rumpclient_init()
+static unsigned ptab_idx;
+static struct sockaddr *serv_sa;
+
+static int
+doconnect(void)
 {
 	char banner[MAXBANNER];
-	struct sockaddr *sap;
-	char *p;
-	unsigned idx;
+	int s, error;
 	ssize_t n;
-	int error, s;
 
-	if ((p = getenv("RUMP_SERVER")) == NULL) {
-		errno = ENOENT;
+	s = socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0);
+	if (s == -1)
 		return -1;
-	}
 
-	if ((error = parseurl(p, &sap, &idx, 0)) != 0) {
+	if (connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) {
+		error = errno;
+		fprintf(stderr, "rump_sp: client connect failed\n");
 		errno = error;
 		return -1;
 	}
 
-	s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
-	if (s == -1)
-		return -1;
-
-	if (connect(s, sap, (socklen_t)sap->sa_len) == -1) {
+	if ((error = parsetab[ptab_idx].connhook(s)) != 0) {
 		error = errno;
-		fprintf(stderr, "rump_sp: client connect failed\n");
+		fprintf(stderr, "rump_sp: connect hook failed\n");
 		errno = error;
 		return -1;
 	}
@@ -281,23 +315,91 @@
 
 	/* parse the banner some day */
 
-	if ((error = parsetab[idx].connhook(s)) != 0) {
-		error = errno;
-		fprintf(stderr, "rump_sp: connect hook failed\n");
+	clispc.spc_fd = s;
+	TAILQ_INIT(&clispc.spc_respwait);
+	pthread_mutex_init(&clispc.spc_mtx, NULL);
+	pthread_cond_init(&clispc.spc_cv, NULL);
+
+	return 0;
+}
+
+int
+rumpclient_init()
+{
+	char *p;
+	int error;
+
+	if ((p = getenv("RUMP_SERVER")) == NULL) {
+		errno = ENOENT;
+		return -1;
+	}
+
+	if ((error = parseurl(p, &serv_sa, &ptab_idx, 0)) != 0) {
 		errno = error;
 		return -1;
 	}
 
-	pthread_mutex_init(&clispc.spc_mtx, NULL);
-	pthread_cond_init(&clispc.spc_cv, NULL);
-	clispc.spc_fd = s;
-	TAILQ_INIT(&clispc.spc_respwait);
+	if (doconnect() == -1)
+		return -1;
+
+	error = handshake_req(&clispc, NULL, 0);
+	if (error) {
+		pthread_mutex_destroy(&clispc.spc_mtx);
+		pthread_cond_destroy(&clispc.spc_cv);
+		close(clispc.spc_fd);
+		errno = error;
+		return -1;
+	}
+
+	return 0;
+}
+
+struct rumpclient_fork {
+	uint32_t fork_auth[AUTHLEN];
+};
+
+struct rumpclient_fork *
+rumpclient_prefork(void)
+{
+	struct rumpclient_fork *rpf;
+	void *resp;
+	int rv;
+
+	rpf = malloc(sizeof(*rpf));
+	if (rpf == NULL)
+		return NULL;
+
+	if ((rv = prefork_req(&clispc, &resp)) != 0) {
+		free(rpf);
+		errno = rv;
+		return NULL;
+	}
+
+	memcpy(rpf->fork_auth, resp, sizeof(rpf->fork_auth));
+	free(resp);
 
-	error = handshake_req(&clispc);
+	return rpf;
+}
+
+int
+rumpclient_fork_init(struct rumpclient_fork *rpf)
+{
+	int error;
+
+	close(clispc.spc_fd);
+	memset(&clispc, 0, sizeof(clispc));
+	clispc.spc_fd = -1;
+
+	if (doconnect() == -1)
+		return -1;
+
+	error = handshake_req(&clispc, rpf->fork_auth, 0);
 	if (error) {
 		pthread_mutex_destroy(&clispc.spc_mtx);
 		pthread_cond_destroy(&clispc.spc_cv);
-		close(s);
+		errno = error;
+		return -1;
 	}
-	return error;
+
+	return 0;
 }

Index: src/lib/librumpclient/rumpclient.h
diff -u src/lib/librumpclient/rumpclient.h:1.1 src/lib/librumpclient/rumpclient.h:1.2
--- src/lib/librumpclient/rumpclient.h:1.1	Thu Nov  4 21:01:29 2010
+++ src/lib/librumpclient/rumpclient.h	Wed Jan  5 17:14:50 2011
@@ -1,4 +1,4 @@
-/*	$NetBSD: rumpclient.h,v 1.1 2010/11/04 21:01:29 pooka Exp $	*/
+/*	$NetBSD: rumpclient.h,v 1.2 2011/01/05 17:14:50 pooka Exp $	*/
 
 /*-
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -33,6 +33,10 @@
 int rumpclient_syscall(int, const void *, size_t, register_t *);
 int rumpclient_init(void);
 
+struct rumpclient_fork;
+struct rumpclient_fork *rumpclient_prefork(void);
+int			rumpclient_fork_init(struct rumpclient_fork *);
+
 __END_DECLS
 
 #endif /* _RUMP_RUMPCLIENT_H_ */

Index: src/lib/librumpuser/rumpuser_sp.c
diff -u src/lib/librumpuser/rumpuser_sp.c:1.28 src/lib/librumpuser/rumpuser_sp.c:1.29
--- src/lib/librumpuser/rumpuser_sp.c:1.28	Sun Jan  2 13:01:45 2011
+++ src/lib/librumpuser/rumpuser_sp.c	Wed Jan  5 17:14:50 2011
@@ -1,7 +1,7 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.28 2011/01/02 13:01:45 pooka Exp $	*/
+/*      $NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $	*/
 
 /*
- * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
+ * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -35,7 +35,7 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.28 2011/01/02 13:01:45 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $");
 
 #include <sys/types.h>
 #include <sys/atomic.h>
@@ -85,7 +85,17 @@
 static char banner[MAXBANNER];
 
 #define PROTOMAJOR 0
-#define PROTOMINOR 0
+#define PROTOMINOR 1
+
+struct prefork {
+	uint32_t pf_auth[AUTHLEN];
+	struct lwp *pf_lwp;
+
+	LIST_ENTRY(prefork) pf_entries;		/* global list */
+	LIST_ENTRY(prefork) pf_spcentries;	/* linked from forking spc */
+};
+static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
+static pthread_mutex_t pfmtx;
 
 /*
  * Manual wrappers, since librump does not have access to the
@@ -244,6 +254,26 @@
 }
 
 static int
+send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
+{
+	struct rsp_hdr rhdr;
+	int rv;
+
+	rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
+	rhdr.rsp_reqno = reqno;
+	rhdr.rsp_class = RUMPSP_RESP;
+	rhdr.rsp_type = RUMPSP_PREFORK;
+	rhdr.rsp_sysnum = 0;
+
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, auth, AUTHLEN*sizeof(*auth));
+	sendunlock(spc);
+
+	return rv;
+}
+
+static int
 copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
 	int wantstr, void **resp)
 {
@@ -365,13 +395,15 @@
 	if (ref > 0)
 		return;
 
-	DPRINTF(("spcrelease: spc %p fd %d\n", spc, spc->spc_fd));
+	DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));
 
 	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
 	_DIAGASSERT(spc->spc_buf == NULL);
 
-	lwproc_switch(spc->spc_mainlwp);
-	lwproc_release();
+	if (spc->spc_mainlwp) {
+		lwproc_switch(spc->spc_mainlwp);
+		lwproc_release();
+	}
 	spc->spc_mainlwp = NULL;
 
 	close(spc->spc_fd);
@@ -464,26 +496,16 @@
 			break;
 	}
 
-	if (lwproc_rfork(&spclist[i], RUMP_RFCFDG) != 0) {
-		close(newfd);
-		return 0;
-	}
-
 	assert(i < MAXCLI);
 
 	pfdlist[i].fd = newfd;
 	spclist[i].spc_fd = newfd;
-	spclist[i].spc_mainlwp = lwproc_curlwp();
 	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
-	spclist[i].spc_pid = lwproc_getpid();
 	spclist[i].spc_refcnt = 1;
 
 	TAILQ_INIT(&spclist[i].spc_respwait);
 
-	DPRINTF(("rump_sp: added new connection fd %d at idx %u, pid %d\n",
-	    newfd, i, lwproc_getpid()));
-
-	lwproc_switch(NULL);
+	DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));
 
 	return i;
 }
@@ -496,7 +518,7 @@
 
 	sysnum = (int)rhdr->rsp_sysnum;
 	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
-	    sysnum, 0));
+	    sysnum, spc->spc_pid));
 
 	lwproc_newlwp(spc->spc_pid);
 	rv = rumpsyscall(sysnum, data, retval);
@@ -668,22 +690,150 @@
 {
 	struct sysbouncearg *sba;
 	pthread_t pt;
-	int retries, rv;
+	int retries, error, i;
 
 	if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
 		if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
 			send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAUTH);
+			shutdown(spc->spc_fd, SHUT_RDWR);
 			spcfreebuf(spc);
 			return;
 		}
 
-		rv = send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0);
+		if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
+			if ((error = lwproc_rfork(spc, RUMP_RFCFDG)) != 0) {
+				shutdown(spc->spc_fd, SHUT_RDWR);
+			}
+
+			spcfreebuf(spc);
+			if (error)
+				return;
+
+			spc->spc_mainlwp = lwproc_curlwp();
+
+			send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0);
+		} else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
+			struct lwp *tmpmain;
+			struct prefork *pf;
+			struct handshake_fork *rfp;
+			uint64_t reqno;
+			int cancel;
+
+			reqno = spc->spc_hdr.rsp_reqno;
+			if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
+				send_error_resp(spc, reqno, EINVAL);
+				shutdown(spc->spc_fd, SHUT_RDWR);
+				spcfreebuf(spc);
+				return;
+			}
+
+			/*LINTED*/
+			rfp = (void *)spc->spc_buf;
+			cancel = rfp->rf_cancel;
+
+			pthread_mutex_lock(&pfmtx);
+			LIST_FOREACH(pf, &preforks, pf_entries) {
+				if (memcmp(rfp->rf_auth, pf->pf_auth,
+				    sizeof(rfp->rf_auth)) == 0) {
+					LIST_REMOVE(pf, pf_entries);
+					LIST_REMOVE(pf, pf_spcentries);
+					break;
+				}
+			}
+			pthread_mutex_lock(&pfmtx);
+			spcfreebuf(spc);
+
+			if (!pf) {
+				send_error_resp(spc, reqno, ESRCH);
+				shutdown(spc->spc_fd, SHUT_RDWR);
+				return;
+			}
+
+			tmpmain = pf->pf_lwp;
+			free(pf);
+			lwproc_switch(tmpmain);
+			if (cancel) {
+				lwproc_release();
+				shutdown(spc->spc_fd, SHUT_RDWR);
+				return;
+			}
+
+			/*
+			 * So, we forked already during "prefork" to save
+			 * the file descriptors from a parent exit
+			 * race condition.  But now we need to fork
+			 * a second time since the initial fork has
+			 * the wrong spc pointer.  (yea, optimize
+			 * interfaces some day if anyone cares)
+			 */
+			if ((error = lwproc_rfork(spc, 0)) != 0) {
+				send_error_resp(spc, reqno, error);
+				shutdown(spc->spc_fd, SHUT_RDWR);
+				lwproc_release();
+				return;
+			}
+			spc->spc_mainlwp = lwproc_curlwp();
+			lwproc_switch(tmpmain);
+			lwproc_release();
+			lwproc_switch(spc->spc_mainlwp);
+
+			send_handshake_resp(spc, reqno, 0);
+		}
+
+		spc->spc_pid = lwproc_getpid();
+
+		DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
+		    spc, spc->spc_pid));
+		    
+		lwproc_switch(NULL);
+		spc->spc_state = SPCSTATE_RUNNING;
+		return;
+	}
+
+	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
+		struct prefork *pf;
+		uint64_t reqno;
+		uint32_t auth[AUTHLEN];
+
+		DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
+		reqno = spc->spc_hdr.rsp_reqno;
 		spcfreebuf(spc);
-		if (rv) {
-			shutdown(spc->spc_fd, SHUT_RDWR);
+
+		pf = malloc(sizeof(*pf));
+		if (pf == NULL) {
+			send_error_resp(spc, reqno, ENOMEM);
 			return;
 		}
-		spc->spc_state = SPCSTATE_RUNNING;
+
+		/*
+		 * Use client main lwp to fork.  this is never used by
+		 * worker threads (except if spc refcount goes to 0),
+		 * so we can safely use it here.
+		 */
+		lwproc_switch(spc->spc_mainlwp);
+		if ((error = lwproc_rfork(spc, RUMP_RFFDG)) != 0) {
+			DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc));
+			send_error_resp(spc, reqno, error);
+			lwproc_switch(NULL);
+			free(pf);
+			return;
+		}
+
+		/* Ok, we have a new process context and a new curlwp */
+		for (i = 0; i < AUTHLEN; i++) {
+			pf->pf_auth[i] = auth[i] = arc4random();
+		}
+		pf->pf_lwp = lwproc_curlwp();
+		lwproc_switch(NULL);
+
+		pthread_mutex_lock(&pfmtx);
+		LIST_INSERT_HEAD(&preforks, pf, pf_entries);
+		LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
+		pthread_mutex_unlock(&pfmtx);
+
+		DPRINTF(("rump_sp: prefork handler success %p\n", spc));
+
+		send_prefork_resp(spc, reqno, auth);
 		return;
 	}
 

Index: src/lib/librumpuser/sp_common.c
diff -u src/lib/librumpuser/sp_common.c:1.17 src/lib/librumpuser/sp_common.c:1.18
--- src/lib/librumpuser/sp_common.c:1.17	Thu Dec 16 17:05:44 2010
+++ src/lib/librumpuser/sp_common.c	Wed Jan  5 17:14:50 2011
@@ -1,7 +1,7 @@
-/*      $NetBSD: sp_common.c,v 1.17 2010/12/16 17:05:44 pooka Exp $	*/
+/*      $NetBSD: sp_common.c,v 1.18 2011/01/05 17:14:50 pooka Exp $	*/
 
 /*
- * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
+ * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -80,9 +80,12 @@
 	RUMPSP_SYSCALL,
 	RUMPSP_COPYIN, RUMPSP_COPYINSTR,
 	RUMPSP_COPYOUT, RUMPSP_COPYOUTSTR,
-	RUMPSP_ANONMMAP };
+	RUMPSP_ANONMMAP,
+	RUMPSP_PREFORK };
 
-enum { HANDSHAKE_GUEST, HANDSHAKE_AUTH }; /* more to come */
+enum { HANDSHAKE_GUEST, HANDSHAKE_AUTH, HANDSHAKE_FORK };
+
+#define AUTHLEN 4 /* 128bit fork auth */
 
 struct rsp_hdr {
 	uint64_t rsp_len;
@@ -123,6 +126,11 @@
 	register_t rsys_retval[2];
 };
 
+struct handshake_fork {
+	uint32_t rf_auth[4];
+	int rf_cancel;
+};
+
 struct respwait {
 	uint64_t rw_reqno;
 	void *rw_data;
@@ -134,6 +142,7 @@
 	TAILQ_ENTRY(respwait) rw_entries;
 };
 
+struct prefork;
 struct spclient {
 	int spc_fd;
 	int spc_refcnt;
@@ -157,6 +166,8 @@
 
 	uint64_t spc_nextreq;
 	int spc_ostatus, spc_istatus;
+
+	LIST_HEAD(, prefork) spc_pflist;
 };
 #define SPCSTATUS_FREE 0
 #define SPCSTATUS_BUSY 1
@@ -252,11 +263,11 @@
 
 		n = send(fd, sdata + sent, dlen - sent, MSG_NOSIGNAL);
 		if (n == 0) {
-			return EFAULT;
+			return ENOTCONN;
 		}
 		if (n == -1)  {
 			if (errno != EAGAIN)
-				return EFAULT;
+				return errno;
 			continue;
 		}
 		sent += n;
@@ -305,6 +316,7 @@
 	if (rw == NULL) {
 		DPRINTF(("no waiter found, invalid reqno %" PRIu64 "?\n",
 		    spc->spc_hdr.rsp_reqno));
+		spcfreebuf(spc);
 		return;
 	}
 	DPRINTF(("rump_sp: client %p woke up waiter at %p\n", spc, rw));

Reply via email to