Module Name: src Committed By: rmind Date: Mon Jul 13 02:37:13 UTC 2009
Modified Files: src/sys/kern: sys_mqueue.c src/sys/sys: mqueue.h Log Message: - Make insertion into the message queue O(1) by using a bitmap and an array of per-priority queues. However, mq_prio_max is dynamic, so a sorted list is still used for custom setups, where the user has manually set a higher priority range. - Cache a pointer to mq->mq_attrib in some places. Change the msg_ptr type to uint8_t. - Update copyright; miscellaneous cleanups. To generate a diff of this commit: cvs rdiff -u -r1.21 -r1.22 src/sys/kern/sys_mqueue.c cvs rdiff -u -r1.7 -r1.8 src/sys/sys/mqueue.h Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.
Modified files: Index: src/sys/kern/sys_mqueue.c diff -u src/sys/kern/sys_mqueue.c:1.21 src/sys/kern/sys_mqueue.c:1.22 --- src/sys/kern/sys_mqueue.c:1.21 Mon Jul 13 00:41:08 2009 +++ src/sys/kern/sys_mqueue.c Mon Jul 13 02:37:12 2009 @@ -1,7 +1,7 @@ -/* $NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $ */ +/* $NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $ */ /* - * Copyright (c) 2007, 2008 Mindaugas Rasiukevicius <rmind at NetBSD org> + * Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org> * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,12 +37,12 @@ * its members are protected by mqueue::mq_mtx. * * Lock order: - * mqlist_mtx - * -> mqueue::mq_mtx + * mqlist_mtx -> + * mqueue::mq_mtx */ #include <sys/cdefs.h> -__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $"); +__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $"); #include <sys/param.h> #include <sys/types.h> @@ -69,7 +69,6 @@ #include <sys/syscallargs.h> #include <sys/systm.h> #include <sys/unistd.h> -#include <sys/vnode.h> #include <miscfs/genfs/genfs.h> @@ -82,8 +81,7 @@ static kmutex_t mqlist_mtx; static pool_cache_t mqmsg_cache; -static LIST_HEAD(, mqueue) mqueue_head = - LIST_HEAD_INITIALIZER(mqueue_head); +static LIST_HEAD(, mqueue) mqueue_head; static int mq_poll_fop(file_t *, int); static int mq_stat_fop(file_t *, struct stat *); @@ -111,6 +109,7 @@ mqmsg_cache = pool_cache_init(MQ_DEF_MSGSIZE, coherency_unit, 0, 0, "mqmsgpl", NULL, IPL_NONE, NULL, NULL, NULL); mutex_init(&mqlist_mtx, MUTEX_DEFAULT, IPL_NONE); + LIST_INIT(&mqueue_head); } /* @@ -120,10 +119,11 @@ mqueue_freemsg(struct mq_msg *msg, const size_t size) { - if (size > MQ_DEF_MSGSIZE) + if (size > MQ_DEF_MSGSIZE) { kmem_free(msg, size); - else + } else { pool_cache_put(mqmsg_cache, msg); + } } /* @@ -133,10 +133,15 @@ mqueue_destroy(struct mqueue *mq) { struct mq_msg *msg; + size_t msz; + 
u_int i; - while ((msg = TAILQ_FIRST(&mq->mq_head)) != NULL) { - TAILQ_REMOVE(&mq->mq_head, msg, msg_queue); - mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len); + for (i = 0; i < (MQ_PQSIZE + 1); i++) { + while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) { + TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue); + msz = sizeof(struct mq_msg) + msg->msg_len; + mqueue_freemsg(msg, msz); + } } seldestroy(&mq->mq_rsel); seldestroy(&mq->mq_wsel); @@ -193,6 +198,27 @@ } /* + * mqueue_linear_insert: perform linear insert according to the message + * priority into the reserved queue (note MQ_PQSIZE + 1). Reserved queue + * is a sorted list used only when mq_prio_max is increased via sysctl. + */ +static inline void +mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg) +{ + struct mq_msg *mit; + + TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQSIZE], msg_queue) { + if (msg->msg_prio > mit->msg_prio) + break; + } + if (mit == NULL) { + TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQSIZE], msg, msg_queue); + } else { + TAILQ_INSERT_BEFORE(mit, msg, msg_queue); + } +} + +/* * Converter from struct timespec to the ticks. * Used by mq_timedreceive(), mq_timedsend(). 
*/ @@ -216,7 +242,7 @@ { struct mqueue *mq = fp->f_data; - (void)memset(st, 0, sizeof(*st)); + memset(st, 0, sizeof(*st)); mutex_enter(&mq->mq_mtx); st->st_mode = mq->mq_mode; @@ -236,19 +262,21 @@ mq_poll_fop(file_t *fp, int events) { struct mqueue *mq = fp->f_data; + struct mq_attr *mqattr; int revents = 0; mutex_enter(&mq->mq_mtx); + mqattr = &mq->mq_attrib; if (events & (POLLIN | POLLRDNORM)) { /* Ready for receiving, if there are messages in the queue */ - if (mq->mq_attrib.mq_curmsgs) + if (mqattr->mq_curmsgs) revents |= (POLLIN | POLLRDNORM); else selrecord(curlwp, &mq->mq_rsel); } if (events & (POLLOUT | POLLWRNORM)) { /* Ready for sending, if the message queue is not full */ - if (mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg) + if (mqattr->mq_curmsgs < mqattr->mq_maxmsg) revents |= (POLLOUT | POLLWRNORM); else selrecord(curlwp, &mq->mq_wsel); @@ -298,6 +326,7 @@ static int mqueue_access(struct mqueue *mq, mode_t mode, kauth_cred_t cred) { + if (genfs_can_access(VNON, mq->mq_mode, mq->mq_euid, mq->mq_egid, mode, cred)) { return EACCES; @@ -339,6 +368,7 @@ if (oflag & O_CREAT) { struct cwdinfo *cwdi = p->p_cwdi; struct mq_attr attr; + u_int i; /* Check the limit */ if (p->p_mqueue_cnt == mq_open_max) { @@ -355,7 +385,7 @@ /* Check for mqueue attributes */ if (SCARG(uap, attr)) { error = copyin(SCARG(uap, attr), &attr, - sizeof(struct mq_attr)); + sizeof(struct mq_attr)); if (error) { kmem_free(name, MQ_NAMELEN); return error; @@ -382,7 +412,9 @@ mutex_init(&mq_new->mq_mtx, MUTEX_DEFAULT, IPL_NONE); cv_init(&mq_new->mq_send_cv, "mqsendcv"); cv_init(&mq_new->mq_recv_cv, "mqrecvcv"); - TAILQ_INIT(&mq_new->mq_head); + for (i = 0; i < (MQ_PQSIZE + 1); i++) { + TAILQ_INIT(&mq_new->mq_head[i]); + } selinit(&mq_new->mq_rsel); selinit(&mq_new->mq_wsel); @@ -424,6 +456,7 @@ error = EACCES; goto exit; } + /* Fail if O_EXCL is set, and mqueue already exists */ if ((oflag & O_CREAT) && (oflag & O_EXCL)) { error = EEXIST; @@ -503,12 +536,14 @@ * Primary 
mq_receive1() function. */ int -mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len, +mq_receive1(lwp_t *l, mqd_t mqdes, void *msg_ptr, size_t msg_len, unsigned *msg_prio, int t, ssize_t *mlen) { file_t *fp = NULL; struct mqueue *mq; struct mq_msg *msg = NULL; + struct mq_attr *mqattr; + u_int prio; int error; /* Get the message queue */ @@ -522,16 +557,17 @@ goto error; } getnanotime(&mq->mq_atime); + mqattr = &mq->mq_attrib; /* Check the message size limits */ - if (msg_len < mq->mq_attrib.mq_msgsize) { + if (msg_len < mqattr->mq_msgsize) { error = EMSGSIZE; goto error; } /* Check if queue is empty */ - while (TAILQ_EMPTY(&mq->mq_head)) { - if (mq->mq_attrib.mq_flags & O_NONBLOCK) { + while (mqattr->mq_curmsgs == 0) { + if (mqattr->mq_flags & O_NONBLOCK) { error = EAGAIN; goto error; } @@ -543,22 +579,35 @@ * Block until someone sends the message. * While doing this, notification should not be sent. */ - mq->mq_attrib.mq_flags |= MQ_RECEIVE; + mqattr->mq_flags |= MQ_RECEIVE; error = cv_timedwait_sig(&mq->mq_send_cv, &mq->mq_mtx, t); - mq->mq_attrib.mq_flags &= ~MQ_RECEIVE; - if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) { + mqattr->mq_flags &= ~MQ_RECEIVE; + if (error || (mqattr->mq_flags & MQ_UNLINK)) { error = (error == EWOULDBLOCK) ? 
ETIMEDOUT : EINTR; goto error; } } - /* Remove the message from the queue */ - msg = TAILQ_FIRST(&mq->mq_head); + /* Find the highest priority message */ + prio = ffs(mq->mq_bitmap); + if (__predict_false(prio == 0)) { + /* Must be in reserved queue then */ + prio = MQ_PQSIZE; + } + + /* Remove it from the queue */ + msg = TAILQ_FIRST(&mq->mq_head[prio]); KASSERT(msg != NULL); - TAILQ_REMOVE(&mq->mq_head, msg, msg_queue); + TAILQ_REMOVE(&mq->mq_head[prio], msg, msg_queue); + + /* Unmark the bit, if last message */ + if (__predict_true(prio != MQ_PQSIZE) && + TAILQ_EMPTY(&mq->mq_head[prio])) { + mq->mq_bitmap &= ~(MQ_PQMSB >> prio); + } /* Decrement the counter and signal waiter, if any */ - mq->mq_attrib.mq_curmsgs--; + mqattr->mq_curmsgs--; cv_signal(&mq->mq_recv_cv); /* Ready for sending now */ @@ -643,12 +692,13 @@ * Primary mq_send1() function. */ int -mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len, +mq_send1(lwp_t *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, int t) { file_t *fp = NULL; struct mqueue *mq; - struct mq_msg *msg, *pos_msg; + struct mq_msg *msg; + struct mq_attr *mqattr; struct proc *notify = NULL; ksiginfo_t ksi; size_t size; @@ -663,10 +713,11 @@ if (size > mq_max_msgsize) return EMSGSIZE; - if (size > MQ_DEF_MSGSIZE) + if (size > MQ_DEF_MSGSIZE) { msg = kmem_alloc(size, KM_SLEEP); - else + } else { msg = pool_cache_get(mqmsg_cache, PR_WAITOK); + } /* Get the data from user-space */ error = copyin(msg_ptr, msg->msg_ptr, msg_len); @@ -689,16 +740,17 @@ goto error; } getnanotime(&mq->mq_mtime); + mqattr = &mq->mq_attrib; /* Check the message size limit */ - if (msg_len <= 0 || msg_len > mq->mq_attrib.mq_msgsize) { + if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) { error = EMSGSIZE; goto error; } /* Check if queue is full */ - while (mq->mq_attrib.mq_curmsgs >= mq->mq_attrib.mq_maxmsg) { - if (mq->mq_attrib.mq_flags & O_NONBLOCK) { + while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) { + if 
(mqattr->mq_flags & O_NONBLOCK) { error = EAGAIN; goto error; } @@ -708,25 +760,24 @@ } /* Block until queue becomes available */ error = cv_timedwait_sig(&mq->mq_recv_cv, &mq->mq_mtx, t); - if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) { + if (error || (mqattr->mq_flags & MQ_UNLINK)) { error = (error == EWOULDBLOCK) ? ETIMEDOUT : error; goto error; } } - KASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg); + KASSERT(mqattr->mq_curmsgs < mqattr->mq_maxmsg); /* Insert message into the queue, according to the priority */ - TAILQ_FOREACH(pos_msg, &mq->mq_head, msg_queue) - if (msg->msg_prio > pos_msg->msg_prio) - break; - if (pos_msg == NULL) - TAILQ_INSERT_TAIL(&mq->mq_head, msg, msg_queue); - else - TAILQ_INSERT_BEFORE(pos_msg, msg, msg_queue); + if (__predict_true(msg_prio < MQ_PQSIZE)) { + TAILQ_INSERT_TAIL(&mq->mq_head[msg_prio], msg, msg_queue); + mq->mq_bitmap |= (MQ_PQMSB >> msg_prio); + } else { + mqueue_linear_insert(mq, msg); + } /* Check for the notify */ - if (mq->mq_attrib.mq_curmsgs == 0 && mq->mq_notify_proc && - (mq->mq_attrib.mq_flags & MQ_RECEIVE) == 0) { + if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc && + (mqattr->mq_flags & MQ_RECEIVE) == 0) { /* Initialize the signal */ KSI_INIT(&ksi); ksi.ksi_signo = mq->mq_sig_notify.sigev_signo; @@ -738,7 +789,7 @@ } /* Increment the counter and signal waiter, if any */ - mq->mq_attrib.mq_curmsgs++; + mqattr->mq_curmsgs++; cv_signal(&mq->mq_send_cv); /* Ready for receiving now */ @@ -755,7 +806,6 @@ kpsignal(notify, &ksi, NULL); mutex_exit(proc_lock); } - return error; } @@ -901,8 +951,9 @@ mq = fp->f_data; /* Copy the old attributes, if needed */ - if (SCARG(uap, omqstat)) + if (SCARG(uap, omqstat)) { memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr)); + } /* Ignore everything, except O_NONBLOCK */ if (nonblock) @@ -990,7 +1041,7 @@ } /* - * SysCtl. + * System control nodes. 
*/ SYSCTL_SETUP(sysctl_mqueue_setup, "sysctl mqueue setup") Index: src/sys/sys/mqueue.h diff -u src/sys/sys/mqueue.h:1.7 src/sys/sys/mqueue.h:1.8 --- src/sys/sys/mqueue.h:1.7 Sat Apr 11 15:47:34 2009 +++ src/sys/sys/mqueue.h Mon Jul 13 02:37:13 2009 @@ -1,7 +1,7 @@ -/* $NetBSD: mqueue.h,v 1.7 2009/04/11 15:47:34 christos Exp $ */ +/* $NetBSD: mqueue.h,v 1.8 2009/07/13 02:37:13 rmind Exp $ */ /* - * Copyright (c) 2007, Mindaugas Rasiukevicius <rmind at NetBSD org> + * Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org> * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -67,6 +67,10 @@ /* Default size of the message */ #define MQ_DEF_MSGSIZE 1024 +/* Size/bits and MSB for the queue array */ +#define MQ_PQSIZE 32 +#define MQ_PQMSB 0x80000000U + /* Structure of the message queue */ struct mqueue { char mq_name[MQ_NAMELEN]; @@ -83,11 +87,13 @@ mode_t mq_mode; uid_t mq_euid; gid_t mq_egid; - /* Reference counter, head of the message queue */ + /* Reference counter, queue array and bitmap */ u_int mq_refcnt; - TAILQ_HEAD(, mq_msg) mq_head; + TAILQ_HEAD(, mq_msg) mq_head[MQ_PQSIZE + 1]; + uint32_t mq_bitmap; /* Entry of the global list */ LIST_ENTRY(mqueue) mq_list; + /* Time stamps */ struct timespec mq_atime; struct timespec mq_mtime; struct timespec mq_btime; @@ -98,16 +104,15 @@ TAILQ_ENTRY(mq_msg) msg_queue; size_t msg_len; u_int msg_prio; - int8_t msg_ptr[1]; + uint8_t msg_ptr[1]; }; /* Prototypes */ void mqueue_sysinit(void); void mqueue_print_list(void (*pr)(const char *, ...)); int abstimeout2timo(struct timespec *, int *); -int mq_send1(struct lwp *, mqd_t, const char *, size_t, unsigned, int); -int mq_receive1(struct lwp *, mqd_t, void *, size_t, unsigned *, int, - ssize_t *); +int mq_send1(lwp_t *, mqd_t, const char *, size_t, unsigned, int); +int mq_receive1(lwp_t *, mqd_t, void *, size_t, unsigned *, int, ssize_t *); #endif /* _KERNEL */