Module Name:    src
Committed By:   rmind
Date:           Mon Jul 13 02:37:13 UTC 2009

Modified Files:
        src/sys/kern: sys_mqueue.c
        src/sys/sys: mqueue.h

Log Message:
- Make insertion into the message queue O(1) by using a bitmap and an array
  of per-priority queues.  However, since mq_prio_max is dynamic, a sorted
  list (with linear insertion) is still used when the user manually sets a
  higher priority range.
- Cache mq->mq_attrib in some places.  Change msg_ptr type to uint8_t.
- Update copyright, misc.


To generate a diff of this commit:
cvs rdiff -u -r1.21 -r1.22 src/sys/kern/sys_mqueue.c
cvs rdiff -u -r1.7 -r1.8 src/sys/sys/mqueue.h

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/sys/kern/sys_mqueue.c
diff -u src/sys/kern/sys_mqueue.c:1.21 src/sys/kern/sys_mqueue.c:1.22
--- src/sys/kern/sys_mqueue.c:1.21	Mon Jul 13 00:41:08 2009
+++ src/sys/kern/sys_mqueue.c	Mon Jul 13 02:37:12 2009
@@ -1,7 +1,7 @@
-/*	$NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $	*/
+/*	$NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $	*/
 
 /*
- * Copyright (c) 2007, 2008 Mindaugas Rasiukevicius <rmind at NetBSD org>
+ * Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org>
  * All rights reserved.
  * 
  * Redistribution and use in source and binary forms, with or without
@@ -37,12 +37,12 @@
  * its members are protected by mqueue::mq_mtx.
  * 
  * Lock order:
- * 	mqlist_mtx
- * 	  -> mqueue::mq_mtx
+ * 	mqlist_mtx ->
+ * 		mqueue::mq_mtx
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $");
+__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $");
 
 #include <sys/param.h>
 #include <sys/types.h>
@@ -69,7 +69,6 @@
 #include <sys/syscallargs.h>
 #include <sys/systm.h>
 #include <sys/unistd.h>
-#include <sys/vnode.h>
 
 #include <miscfs/genfs/genfs.h>
 
@@ -82,8 +81,7 @@
 
 static kmutex_t			mqlist_mtx;
 static pool_cache_t		mqmsg_cache;
-static LIST_HEAD(, mqueue)	mqueue_head =
-	LIST_HEAD_INITIALIZER(mqueue_head);
+static LIST_HEAD(, mqueue)	mqueue_head;
 
 static int	mq_poll_fop(file_t *, int);
 static int	mq_stat_fop(file_t *, struct stat *);
@@ -111,6 +109,7 @@
 	mqmsg_cache = pool_cache_init(MQ_DEF_MSGSIZE, coherency_unit,
 	    0, 0, "mqmsgpl", NULL, IPL_NONE, NULL, NULL, NULL);
 	mutex_init(&mqlist_mtx, MUTEX_DEFAULT, IPL_NONE);
+	LIST_INIT(&mqueue_head);
 }
 
 /*
@@ -120,10 +119,11 @@
 mqueue_freemsg(struct mq_msg *msg, const size_t size)
 {
 
-	if (size > MQ_DEF_MSGSIZE)
+	if (size > MQ_DEF_MSGSIZE) {
 		kmem_free(msg, size);
-	else
+	} else {
 		pool_cache_put(mqmsg_cache, msg);
+	}
 }
 
 /*
@@ -133,10 +133,15 @@
 mqueue_destroy(struct mqueue *mq)
 {
 	struct mq_msg *msg;
+	size_t msz;
+	u_int i;
 
-	while ((msg = TAILQ_FIRST(&mq->mq_head)) != NULL) {
-		TAILQ_REMOVE(&mq->mq_head, msg, msg_queue);
-		mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len);
+	for (i = 0; i < (MQ_PQSIZE + 1); i++) {
+		while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) {
+			TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue);
+			msz = sizeof(struct mq_msg) + msg->msg_len;
+			mqueue_freemsg(msg, msz);
+		}
 	}
 	seldestroy(&mq->mq_rsel);
 	seldestroy(&mq->mq_wsel);
@@ -193,6 +198,27 @@
 }
 
 /*
+ * mqueue_linear_insert: insert the message into the reserved queue,
+ * mq_head[MQ_PQSIZE], sorted by descending priority (FIFO for equal ones).
+ * Used for msg_prio >= MQ_PQSIZE, only when mq_prio_max is raised via sysctl.
+ */
+static inline void
+mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg)
+{
+	struct mq_msg *mit;
+
+	TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQSIZE], msg_queue) {
+		if (msg->msg_prio > mit->msg_prio)
+			break;
+	}
+	if (mit == NULL) {
+		TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQSIZE], msg, msg_queue);
+	} else {
+		TAILQ_INSERT_BEFORE(mit, msg, msg_queue);
+	}
+}
+
+/*
  * Converter from struct timespec to the ticks.
  * Used by mq_timedreceive(), mq_timedsend().
  */
@@ -216,7 +242,7 @@
 {
 	struct mqueue *mq = fp->f_data;
 
-	(void)memset(st, 0, sizeof(*st));
+	memset(st, 0, sizeof(*st));
 
 	mutex_enter(&mq->mq_mtx);
 	st->st_mode = mq->mq_mode;
@@ -236,19 +262,21 @@
 mq_poll_fop(file_t *fp, int events)
 {
 	struct mqueue *mq = fp->f_data;
+	struct mq_attr *mqattr;
 	int revents = 0;
 
 	mutex_enter(&mq->mq_mtx);
+	mqattr = &mq->mq_attrib;
 	if (events & (POLLIN | POLLRDNORM)) {
 		/* Ready for receiving, if there are messages in the queue */
-		if (mq->mq_attrib.mq_curmsgs)
+		if (mqattr->mq_curmsgs)
 			revents |= (POLLIN | POLLRDNORM);
 		else
 			selrecord(curlwp, &mq->mq_rsel);
 	}
 	if (events & (POLLOUT | POLLWRNORM)) {
 		/* Ready for sending, if the message queue is not full */
-		if (mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg)
+		if (mqattr->mq_curmsgs < mqattr->mq_maxmsg)
 			revents |= (POLLOUT | POLLWRNORM);
 		else
 			selrecord(curlwp, &mq->mq_wsel);
@@ -298,6 +326,7 @@
 static int
 mqueue_access(struct mqueue *mq, mode_t mode, kauth_cred_t cred)
 {
+
 	if (genfs_can_access(VNON, mq->mq_mode, mq->mq_euid,
 	    mq->mq_egid, mode, cred)) {
 		return EACCES;
@@ -339,6 +368,7 @@
 	if (oflag & O_CREAT) {
 		struct cwdinfo *cwdi = p->p_cwdi;
 		struct mq_attr attr;
+		u_int i;
 
 		/* Check the limit */
 		if (p->p_mqueue_cnt == mq_open_max) {
@@ -355,7 +385,7 @@
 		/* Check for mqueue attributes */
 		if (SCARG(uap, attr)) {
 			error = copyin(SCARG(uap, attr), &attr,
-				sizeof(struct mq_attr));
+			    sizeof(struct mq_attr));
 			if (error) {
 				kmem_free(name, MQ_NAMELEN);
 				return error;
@@ -382,7 +412,9 @@
 		mutex_init(&mq_new->mq_mtx, MUTEX_DEFAULT, IPL_NONE);
 		cv_init(&mq_new->mq_send_cv, "mqsendcv");
 		cv_init(&mq_new->mq_recv_cv, "mqrecvcv");
-		TAILQ_INIT(&mq_new->mq_head);
+		for (i = 0; i < (MQ_PQSIZE + 1); i++) {
+			TAILQ_INIT(&mq_new->mq_head[i]);
+		}
 		selinit(&mq_new->mq_rsel);
 		selinit(&mq_new->mq_wsel);
 
@@ -424,6 +456,7 @@
 			error = EACCES;
 			goto exit;
 		}
+
 		/* Fail if O_EXCL is set, and mqueue already exists */
 		if ((oflag & O_CREAT) && (oflag & O_EXCL)) {
 			error = EEXIST;
@@ -503,12 +536,14 @@
  * Primary mq_receive1() function.
  */
 int
-mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
+mq_receive1(lwp_t *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
     unsigned *msg_prio, int t, ssize_t *mlen)
 {
 	file_t *fp = NULL;
 	struct mqueue *mq;
 	struct mq_msg *msg = NULL;
+	struct mq_attr *mqattr;
+	u_int prio;
 	int error;
 
 	/* Get the message queue */
@@ -522,16 +557,17 @@
 		goto error;
 	}
 	getnanotime(&mq->mq_atime);
+	mqattr = &mq->mq_attrib;
 
 	/* Check the message size limits */
-	if (msg_len < mq->mq_attrib.mq_msgsize) {
+	if (msg_len < mqattr->mq_msgsize) {
 		error = EMSGSIZE;
 		goto error;
 	}
 
 	/* Check if queue is empty */
-	while (TAILQ_EMPTY(&mq->mq_head)) {
-		if (mq->mq_attrib.mq_flags & O_NONBLOCK) {
+	while (mqattr->mq_curmsgs == 0) {
+		if (mqattr->mq_flags & O_NONBLOCK) {
 			error = EAGAIN;
 			goto error;
 		}
@@ -543,22 +579,35 @@
 		 * Block until someone sends the message.
 		 * While doing this, notification should not be sent.
 		 */
-		mq->mq_attrib.mq_flags |= MQ_RECEIVE;
+		mqattr->mq_flags |= MQ_RECEIVE;
 		error = cv_timedwait_sig(&mq->mq_send_cv, &mq->mq_mtx, t);
-		mq->mq_attrib.mq_flags &= ~MQ_RECEIVE;
-		if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) {
+		mqattr->mq_flags &= ~MQ_RECEIVE;
+		if (error || (mqattr->mq_flags & MQ_UNLINK)) {
 			error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR;
 			goto error;
 		}
 	}

-	/* Remove the message from the queue */
-	msg = TAILQ_FIRST(&mq->mq_head);
+	/*
+	 * Find the highest priority message: reserved queue first (prio >=
+	 * MQ_PQSIZE), then the lowest set bit, since prio p is MQ_PQMSB >> p.
+	 */
+	prio = __predict_true(TAILQ_EMPTY(&mq->mq_head[MQ_PQSIZE])) ?
+	    (MQ_PQSIZE - ffs(mq->mq_bitmap)) : MQ_PQSIZE;
+
+	/* Remove it from the queue */
+	msg = TAILQ_FIRST(&mq->mq_head[prio]);
 	KASSERT(msg != NULL);
-	TAILQ_REMOVE(&mq->mq_head, msg, msg_queue);
+	TAILQ_REMOVE(&mq->mq_head[prio], msg, msg_queue);
+
+	/* Unmark the bit, if last message */
+	if (__predict_true(prio != MQ_PQSIZE) &&
+	    TAILQ_EMPTY(&mq->mq_head[prio])) {
+		mq->mq_bitmap &= ~(MQ_PQMSB >> prio);
+	}

 	/* Decrement the counter and signal waiter, if any */
-	mq->mq_attrib.mq_curmsgs--;
+	mqattr->mq_curmsgs--;
 	cv_signal(&mq->mq_recv_cv);

 	/* Ready for sending now */
@@ -643,12 +692,13 @@
  * Primary mq_send1() function.
  */
 int
-mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+mq_send1(lwp_t *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
     unsigned msg_prio, int t)
 {
 	file_t *fp = NULL;
 	struct mqueue *mq;
-	struct mq_msg *msg, *pos_msg;
+	struct mq_msg *msg;
+	struct mq_attr *mqattr;
 	struct proc *notify = NULL;
 	ksiginfo_t ksi;
 	size_t size;
@@ -663,10 +713,11 @@
 	if (size > mq_max_msgsize)
 		return EMSGSIZE;
 
-	if (size > MQ_DEF_MSGSIZE)
+	if (size > MQ_DEF_MSGSIZE) {
 		msg = kmem_alloc(size, KM_SLEEP);
-	else
+	} else {
 		msg = pool_cache_get(mqmsg_cache, PR_WAITOK);
+	}
 
 	/* Get the data from user-space */
 	error = copyin(msg_ptr, msg->msg_ptr, msg_len);
@@ -689,16 +740,17 @@
 		goto error;
 	}
 	getnanotime(&mq->mq_mtime);
+	mqattr = &mq->mq_attrib;
 
 	/* Check the message size limit */
-	if (msg_len <= 0 || msg_len > mq->mq_attrib.mq_msgsize) {
+	if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) {
 		error = EMSGSIZE;
 		goto error;
 	}
 
 	/* Check if queue is full */
-	while (mq->mq_attrib.mq_curmsgs >= mq->mq_attrib.mq_maxmsg) {
-		if (mq->mq_attrib.mq_flags & O_NONBLOCK) {
+	while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) {
+		if (mqattr->mq_flags & O_NONBLOCK) {
 			error = EAGAIN;
 			goto error;
 		}
@@ -708,25 +760,24 @@
 		}
 		/* Block until queue becomes available */
 		error = cv_timedwait_sig(&mq->mq_recv_cv, &mq->mq_mtx, t);
-		if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) {
+		if (error || (mqattr->mq_flags & MQ_UNLINK)) {
 			error = (error == EWOULDBLOCK) ? ETIMEDOUT : error;
 			goto error;
 		}
 	}
-	KASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg);
+	KASSERT(mqattr->mq_curmsgs < mqattr->mq_maxmsg);
 
 	/* Insert message into the queue, according to the priority */
-	TAILQ_FOREACH(pos_msg, &mq->mq_head, msg_queue)
-		if (msg->msg_prio > pos_msg->msg_prio)
-			break;
-	if (pos_msg == NULL)
-		TAILQ_INSERT_TAIL(&mq->mq_head, msg, msg_queue);
-	else
-		TAILQ_INSERT_BEFORE(pos_msg, msg, msg_queue);
+	if (__predict_true(msg_prio < MQ_PQSIZE)) {
+		TAILQ_INSERT_TAIL(&mq->mq_head[msg_prio], msg, msg_queue);
+		mq->mq_bitmap |= (MQ_PQMSB >> msg_prio);
+	} else {
+		mqueue_linear_insert(mq, msg);
+	}
 
 	/* Check for the notify */
-	if (mq->mq_attrib.mq_curmsgs == 0 && mq->mq_notify_proc &&
-	    (mq->mq_attrib.mq_flags & MQ_RECEIVE) == 0) {
+	if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc &&
+	    (mqattr->mq_flags & MQ_RECEIVE) == 0) {
 		/* Initialize the signal */
 		KSI_INIT(&ksi);
 		ksi.ksi_signo = mq->mq_sig_notify.sigev_signo;
@@ -738,7 +789,7 @@
 	}
 
 	/* Increment the counter and signal waiter, if any */
-	mq->mq_attrib.mq_curmsgs++;
+	mqattr->mq_curmsgs++;
 	cv_signal(&mq->mq_send_cv);
 
 	/* Ready for receiving now */
@@ -755,7 +806,6 @@
 		kpsignal(notify, &ksi, NULL);
 		mutex_exit(proc_lock);
 	}
-
 	return error;
 }
 
@@ -901,8 +951,9 @@
 	mq = fp->f_data;
 
 	/* Copy the old attributes, if needed */
-	if (SCARG(uap, omqstat))
+	if (SCARG(uap, omqstat)) {
 		memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
+	}
 
 	/* Ignore everything, except O_NONBLOCK */
 	if (nonblock)
@@ -990,7 +1041,7 @@
 }
 
 /*
- * SysCtl.
+ * System control nodes.
  */
 
 SYSCTL_SETUP(sysctl_mqueue_setup, "sysctl mqueue setup")

Index: src/sys/sys/mqueue.h
diff -u src/sys/sys/mqueue.h:1.7 src/sys/sys/mqueue.h:1.8
--- src/sys/sys/mqueue.h:1.7	Sat Apr 11 15:47:34 2009
+++ src/sys/sys/mqueue.h	Mon Jul 13 02:37:13 2009
@@ -1,7 +1,7 @@
-/*	$NetBSD: mqueue.h,v 1.7 2009/04/11 15:47:34 christos Exp $	*/
+/*	$NetBSD: mqueue.h,v 1.8 2009/07/13 02:37:13 rmind Exp $	*/
 
 /*
- * Copyright (c) 2007, Mindaugas Rasiukevicius <rmind at NetBSD org>
+ * Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org>
  * All rights reserved.
  * 
  * Redistribution and use in source and binary forms, with or without
@@ -67,6 +67,10 @@
 /* Default size of the message */
 #define	MQ_DEF_MSGSIZE		1024
 
+/* Size/bits and MSB for the queue array */
+#define	MQ_PQSIZE		32
+#define	MQ_PQMSB		0x80000000U
+
 /* Structure of the message queue */
 struct mqueue {
 	char			mq_name[MQ_NAMELEN];
@@ -83,11 +87,13 @@
 	mode_t			mq_mode;
 	uid_t			mq_euid;
 	gid_t			mq_egid;
-	/* Reference counter, head of the message queue */
+	/* Reference counter, queue array and bitmap */
 	u_int			mq_refcnt;
-	TAILQ_HEAD(, mq_msg)	mq_head;
+	TAILQ_HEAD(, mq_msg)	mq_head[MQ_PQSIZE + 1];
+	uint32_t		mq_bitmap;
 	/* Entry of the global list */
 	LIST_ENTRY(mqueue)	mq_list;
+	/* Time stamps */
 	struct timespec		mq_atime;
 	struct timespec		mq_mtime;
 	struct timespec		mq_btime;
@@ -98,16 +104,15 @@
 	TAILQ_ENTRY(mq_msg)	msg_queue;
 	size_t			msg_len;
 	u_int			msg_prio;
-	int8_t			msg_ptr[1];
+	uint8_t			msg_ptr[1];
 };
 
 /* Prototypes */
 void	mqueue_sysinit(void);
 void	mqueue_print_list(void (*pr)(const char *, ...));
 int	abstimeout2timo(struct timespec *, int *);
-int	mq_send1(struct lwp *, mqd_t, const char *, size_t, unsigned, int);
-int	mq_receive1(struct lwp *, mqd_t, void *, size_t, unsigned *, int,
-    ssize_t *);
+int	mq_send1(lwp_t *, mqd_t, const char *, size_t, unsigned, int);
+int	mq_receive1(lwp_t *, mqd_t, void *, size_t, unsigned *, int, ssize_t *);
 
 #endif	/* _KERNEL */
 

Reply via email to