Hi everyone, I had a chance to test this patch more and iron out some bugs. The main changes from the previous version are:
* Use pool(9) for message allocations of at most MQ_DEF_MSGSIZE bytes (header included); larger messages still use malloc(9).
* Add poll support.
* Fix a memory leak in domqrecv() that would eventually exhaust all available kernel memory.
* Fix the return value of mq_receive(), which was always 0; it now returns the length of the message that was dequeued.
* Small stylistic changes.
* Tune the default limits and add MQ_PRIO_MAX and MQ_OPEN_MAX.

I did some very crude benchmarking on my machine and I could push about 100k messages per second through a single message queue. At this point, I can start working on man pages and the necessary userspace glue. Is this patch going in the right direction?

Thanks,
Dimitris

From a4a59e77788b6ac9340c5707b04cd924ab23c923 Mon Sep 17 00:00:00 2001 From: Dimitris Papastamos <[email protected]> Date: Tue, 15 Sep 2015 16:54:34 +0100 Subject: [PATCH] Initial implementation of mq_*() interfaces --- conf/files | 1 + kern/init_main.c | 4 + kern/sys_mqueue.c | 746 +++++++++++++++++++++++++++++++++++++++++++++++++++ kern/syscalls.master | 14 + sys/_types.h | 1 + sys/file.h | 1 + sys/mqueue.h | 33 +++ sys/syslimits.h | 2 + sys/types.h | 1 + 9 files changed, 803 insertions(+) create mode 100644 kern/sys_mqueue.c create mode 100644 sys/mqueue.h diff --git a/conf/files b/conf/files index 691d7f2..f3d5bfc 100644 --- a/conf/files +++ b/conf/files @@ -694,6 +694,7 @@ file kern/subr_prof.c file kern/subr_userconf.c boot_config file kern/subr_xxx.c file kern/sys_generic.c +file kern/sys_mqueue.c file kern/sys_pipe.c file kern/sys_process.c ptrace | systrace file kern/sys_socket.c diff --git a/kern/init_main.c b/kern/init_main.c index e2505ea..cc323d9 100644 --- a/kern/init_main.c +++ b/kern/init_main.c @@ -67,6 +67,7 @@ #ifdef SYSVSEM #include <sys/sem.h> #endif +#include <sys/mqueue.h> #ifdef SYSVMSG #include <sys/msg.h> #endif @@ -376,6 +377,9 @@ main(void *framep) seminit(); #endif + /* Initialize POSIX message queues. */ + mqinit(); + #ifdef SYSVMSG /* Initialize System V style message queues.
*/ msginit(); diff --git a/kern/sys_mqueue.c b/kern/sys_mqueue.c new file mode 100644 index 0000000..ddd5d17 --- /dev/null +++ b/kern/sys_mqueue.c @@ -0,0 +1,746 @@ +/* + * Copyright (c) 2015 Dimitris Papastamos <[email protected]> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/proc.h> +#include <sys/file.h> +#include <sys/filedesc.h> +#include <sys/poll.h> +#include <sys/pool.h> +#include <sys/malloc.h> +#include <sys/mount.h> +#include <sys/stat.h> +#include <sys/syscallargs.h> +#include <sys/time.h> +#include <sys/vnode.h> +#include <sys/mqueue.h> + +#define MQ_NAMELEN (NAME_MAX + 1) +#define MQ_DEF_MSGSIZE 2048 +#define MQ_MAX_MSGSIZE (8 * MQ_DEF_MSGSIZE) +#define MQ_DEF_MAXMSG 8 +#define MQ_MAX_MAXMSG (8 * MQ_DEF_MAXMSG) + +/* Stored in mq_flags when message queue is marked for removal. 
*/ +#define MQ_DYING 0x10000000 + +struct mq { + char name[MQ_NAMELEN]; /* mqueue name */ + int oflag; /* mqueue send/recv access flag */ + mode_t mode; /* mqueue permissions */ + struct mq_attr attr; /* mqueue attributes, see below */ + int rchan; /* read channel to sleep on */ + int wchan; /* write channel to sleep on */ + uid_t euid; /* mqueue euid owner */ + gid_t egid; /* mqueue egid owner */ + u_int refcnt; /* mqueue reference count */ +#define MQ_RSEL 0x1 +#define MQ_WSEL 0x2 + int pflag; /* poll internal flags */ + struct selinfo rsel; /* for compat with select */ + struct selinfo wsel; + struct rwlock lock; /* mqueue lock */ + TAILQ_HEAD(, mq_msg) head; /* mqueue message list */ + LIST_ENTRY(mq) entry; /* link with next mqueue */ +}; + +struct mq_msg { + TAILQ_ENTRY(mq_msg) entry; /* link with next message */ + size_t len; /* message length */ + u_int prio; /* message priority */ + uint8_t data[]; /* raw message data */ +}; + +int mqread(struct file *, off_t *, struct uio *, struct ucred *); +int mqwrite(struct file *, off_t *, struct uio *, struct ucred *); +int mqioctl(struct file *, u_long, caddr_t, struct proc *); +int mqpoll(struct file *, int, struct proc *); +int mqkqfilter(struct file *, struct knote *); +int mqstat(struct file *, struct stat *, struct proc *); +int mqclose(struct file *, struct proc *); + +struct fileops mqops = { + mqread, mqwrite, mqioctl, mqpoll, mqkqfilter, mqstat, mqclose +}; + +LIST_HEAD(, mq) mqlist; +struct rwlock mqlistlock; +struct pool mqpool; + +void +mqinit(void) +{ + pool_init(&mqpool, MQ_DEF_MSGSIZE, 0, 0, PR_WAITOK, "mqpool", NULL); + rw_init(&mqlistlock, "mqlistlock"); + LIST_INIT(&mqlist); +} + +int +ts2timo(struct proc *p, struct timespec *ts, int *timo) +{ + struct timespec rts; + int error; + + error = clock_gettime(p, CLOCK_REALTIME, &rts); + if (error != 0) + return (error); + if (timespeccmp(&rts, ts, >=)) + return (ETIMEDOUT); + timespecsub(ts, &rts, ts); + error = timespecfix(ts); + if (error != 0) + 
return (error); + *timo = tstohz(ts); + return (0); +} + +struct mq * +mqlookup(char *name) +{ + struct mq *mq; + + LIST_FOREACH(mq, &mqlist, entry) + if (strncmp(mq->name, name, MQ_NAMELEN) == 0) + return (mq); + return (NULL); +} + +void +msgrelease(struct mq_msg *msg) +{ + if (sizeof(*msg) + msg->len > MQ_DEF_MSGSIZE) + free(msg, M_TEMP, sizeof(*msg) + msg->len); + else + pool_put(&mqpool, msg); +} + +struct mq_msg * +msgalloc(const char *udata, size_t len, u_int prio, int *error) +{ + struct mq_msg *msg; + + if (sizeof(*msg) + len > MQ_DEF_MSGSIZE) + msg = malloc(sizeof(*msg) + len, M_TEMP, M_WAITOK); + else + msg = pool_get(&mqpool, PR_WAITOK); + + *error = copyin(udata, msg->data, len); + if (*error != 0) { + msgrelease(msg); + return (NULL); + } + msg->len = len; + msg->prio = prio; + return (msg); +} + +void +mqrelease(struct mq *mq) +{ + struct mq_msg *msg; + + while ((msg = TAILQ_FIRST(&mq->head))) { + TAILQ_REMOVE(&mq->head, msg, entry); + msgrelease(msg); + } + free(mq, M_TEMP, sizeof(*mq)); +} + +struct mq * +mqalloc(char *name, int oflag, mode_t mode, struct mq_attr *attr, + struct proc *p) +{ + struct mq *mq; + + mq = malloc(sizeof(*mq), M_TEMP, M_WAITOK | M_ZERO); + strlcpy(mq->name, name, sizeof(mq->name)); + mq->oflag = oflag; + mq->mode = (mode & ALLPERMS) & ~p->p_fd->fd_cmask; + mq->euid = p->p_ucred->cr_uid; + mq->egid = p->p_ucred->cr_gid; + memcpy(&mq->attr, attr, sizeof(mq->attr)); + rw_init(&mq->lock, "mqlock"); + TAILQ_INIT(&mq->head); + return (mq); +} + +int +mqaccess(struct ucred *cred, struct mq *mq, mode_t mode) +{ + return vaccess(VNON, mq->mode, mq->euid, mq->egid, mode, cred); +} + +int +mqread(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred) +{ + return (EOPNOTSUPP); +} + +int +mqwrite(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred) +{ + return (EOPNOTSUPP); +} + +int +mqioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) +{ + return (EOPNOTSUPP); +} + +int +mqpoll(struct file *fp, 
int events, struct proc *p) +{ + struct mq *mq; + int revents = 0; + + mq = fp->f_data; + rw_enter_write(&mq->lock); + if ((events & (POLLIN | POLLRDNORM)) != 0) + if (mq->attr.mq_curmsgs != 0) + revents |= events & (POLLIN | POLLRDNORM); + if ((events & (POLLOUT | POLLWRNORM)) != 0) + if (mq->attr.mq_curmsgs < mq->attr.mq_maxmsg) + revents |= events & (POLLOUT | POLLWRNORM); + if (revents == 0) { + if ((events & (POLLIN | POLLRDNORM)) != 0) { + mq->pflag |= MQ_RSEL; + selrecord(p, &mq->rsel); + } + if ((events & (POLLOUT | POLLWRNORM)) != 0) { + mq->pflag |= MQ_WSEL; + selrecord(p, &mq->wsel); + } + } + rw_exit_write(&mq->lock); + return (revents); +} + +void +mqpollwakeup(struct mq *mq) +{ + if ((mq->pflag & MQ_RSEL) != 0) { + mq->pflag &= ~MQ_RSEL; + selwakeup(&mq->rsel); + } + if ((mq->pflag & MQ_WSEL) != 0) { + mq->pflag &= ~MQ_WSEL; + selwakeup(&mq->wsel); + } +} + +int +mqkqfilter(struct file *fp, struct knote *kn) +{ + return (EOPNOTSUPP); +} + +int +mqstat(struct file *fp, struct stat *ub, struct proc *p) +{ + return (EOPNOTSUPP); +} + +int +mqclose(struct file *fp, struct proc *p) +{ + struct mq *mq; + int release = 0; + + mq = fp->f_data; + rw_enter_write(&mq->lock); + if (--mq->refcnt == 0 && (mq->attr.mq_flags & MQ_DYING) != 0) + release = 1; + rw_exit_write(&mq->lock); + if (release != 0) + mqrelease(mq); + return (0); +} + +/* + * Require one slash at the start as per POSIX. + * Disallow any further slashes. + */ +int +mqnamechk(char *name) +{ + if (*name++ != '/') + return (EINVAL); + while (*name && *name != '/') + name++; + return *name ? 
(EINVAL) : (0); +} + +mqd_t +sys_mq_open(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_open_args /* { + syscallarg(const char *) name; + syscallarg(int) oflag; + syscallarg(mode_t) mode; + syscallarg(struct mq_attr *) attr; + } */ *uap = v; + struct mq *mq, *newmq = NULL; + struct mq_attr attr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + mode_t accmode; + char *name; + int oflag, fd, error; + + oflag = SCARG(uap, oflag); + if ((oflag & O_CREAT) != 0) { + if (SCARG(uap, attr)) { + error = copyin(SCARG(uap, attr), &attr, sizeof(attr)); + if (error != 0) + return (error); + if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0) + return (EINVAL); + if (attr.mq_maxmsg > MQ_MAX_MAXMSG || + attr.mq_msgsize > MQ_MAX_MSGSIZE) + return (ENOSPC); + attr.mq_curmsgs = 0; + } else { + memset(&attr, 0, sizeof(attr)); + attr.mq_maxmsg = MQ_DEF_MAXMSG; + attr.mq_msgsize = MQ_DEF_MSGSIZE - sizeof(struct mq_msg); + } + } + + name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL); + if (error != 0) + goto err0; + + error = mqnamechk(name); + if (error != 0) + goto err0; + + fdplock(fdp); + error = falloc(p, &fp, &fd); + if (error != 0) + goto err1; + + fp->f_flag = FREAD; + if ((oflag & O_WRONLY) != 0 || (oflag & O_RDWR) != 0) + fp->f_flag |= FWRITE; + if ((oflag & O_NONBLOCK) != 0) + fp->f_flag |= FNONBLOCK; + + rw_enter_write(&mqlistlock); + mq = mqlookup(name); + if (mq) { + rw_enter_write(&mq->lock); + if ((oflag & O_CREAT) != 0 && (oflag & O_EXCL) != 0) { + error = EEXIST; + goto err3; + } + accmode = VREAD; + if ((fp->f_flag & FWRITE) != 0) + accmode |= VWRITE; + if (mqaccess(p->p_ucred, mq, accmode) != 0) { + error = EACCES; + goto err3; + } + } else { + if ((oflag & O_CREAT) == 0) { + error = ENOENT; + goto err2; + } + mq = mqalloc(name, oflag, SCARG(uap, mode), &attr, p); + newmq = mq; + rw_enter_write(&mq->lock); + } + + fp->f_type = DTYPE_MQUEUE; + fp->f_data = mq; + fp->f_ops = &mqops; + + 
fdp->fd_ofileflags[fd] |= UF_EXCLOSE; + + FILE_SET_MATURE(fp, p); + fdpunlock(fdp); + + mq->refcnt++; + if (newmq) + LIST_INSERT_HEAD(&mqlist, newmq, entry); + rw_exit_write(&mq->lock); + rw_exit_write(&mqlistlock); + free(name, M_TEMP, MQ_NAMELEN); + + *retval = fd; + return (0); + +err3: + rw_exit_write(&mq->lock); +err2: + rw_exit_write(&mqlistlock); + fdremove(fdp, fd); + closef(fp, p); +err1: + fdpunlock(fdp); +err0: + free(name, M_TEMP, MQ_NAMELEN); + return (error); +} + +int +sys_mq_unlink(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_unlink_args /* { + syscallarg(const char *) name; + } */ *uap = v; + struct mq *mq; + char *name; + int error; + int refcnt; + + name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL); + if (error != 0) + goto err0; + + rw_enter_write(&mqlistlock); + mq = mqlookup(name); + if (!mq) { + error = ENOENT; + goto err1; + } + + rw_enter_write(&mq->lock); + if (mqaccess(p->p_ucred, mq, VWRITE) != 0) { + error = EACCES; + goto err2; + } + + LIST_REMOVE(mq, entry); + mq->attr.mq_flags |= MQ_DYING; + refcnt = mq->refcnt; +err2: + rw_exit_write(&mq->lock); +err1: + rw_exit_write(&mqlistlock); +err0: + free(name, M_TEMP, MQ_NAMELEN); + + /* + * Release the queue if it doesn't have any + * active references. If refcnt is > 0 at this + * point, the queue will be released in mqclose(). 
+ */ + if (error == 0 && refcnt == 0) + mqrelease(mq); + return (error); +} + +int +domqsend(struct proc *p, mqd_t mqdes, const char *umsg_ptr, size_t msg_len, + unsigned int msg_prio, struct timespec *ats) +{ + struct mq *mq; + struct mq_msg *msg, *pos; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int timo; + int error; + + if (msg_prio >= MQ_PRIO_MAX) + return (EINVAL); + + fp = fd_getfile(fdp, mqdes); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + if ((fp->f_flag & FWRITE) == 0 || + mqaccess(p->p_ucred, mq, VWRITE)) { + error = EBADF; + goto err1; + } + + if (msg_len > mq->attr.mq_msgsize) { + error = EMSGSIZE; + goto err1; + } + + /* Block if there is no space in the queue. */ + while (mq->attr.mq_curmsgs == mq->attr.mq_maxmsg) { + rw_exit_write(&mq->lock); + if ((fp->f_flag & FNONBLOCK) != 0) { + error = EWOULDBLOCK; + goto err0; + } + timo = 0; + if (ats) { + error = ts2timo(p, ats, &timo); + if (error < 0) + goto err0; + } + error = tsleep(&mq->wchan, PWAIT | PCATCH, "mqwchan", timo); + if (error != 0) + goto err0; + /* + * `mq' is always valid here. If there were references + * to the queue, then it couldn't have been released. If + * it was released, we would never be able to wake up + * from tsleep() without an error. 
+ */ + rw_enter_write(&mq->lock); + } + + msg = msgalloc(umsg_ptr, msg_len, msg_prio, &error); + if (!msg) + goto err1; + + mq->attr.mq_curmsgs++; + TAILQ_FOREACH(pos, &mq->head, entry) + if (msg->prio > pos->prio) + break; + if (!pos) + TAILQ_INSERT_TAIL(&mq->head, msg, entry); + else + TAILQ_INSERT_BEFORE(pos, msg, entry); + + wakeup(&mq->rchan); + mqpollwakeup(mq); +err1: + rw_exit_write(&mq->lock); +err0: + FRELE(fp, p); + return (error); +} + +int +sys_mq_send(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_send_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int) msg_prio; + } */ *uap = v; + + return domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), NULL); +} + +int +sys_mq_timedsend(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_timedsend_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int) msg_prio; + syscallarg(const struct timespec *) abstime; + } */ *uap = v; + struct timespec ts; + int error; + + error = copyin(SCARG(uap, abstime), &ts, sizeof(ts)); + if (error != 0) + return (error); + return domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), &ts); +} + +int +domqrecv(struct proc *p, mqd_t mqdes, char *umsg_ptr, size_t msg_len, + u_int *umsg_prio, struct timespec *ats, register_t *retval) +{ + struct mq *mq; + struct mq_msg *msg; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int timo; + int error; + + fp = fd_getfile(fdp, mqdes); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + if (mqaccess(p->p_ucred, mq, VREAD)) { + error = EBADF; + goto err1; + } + + if (msg_len < mq->attr.mq_msgsize) { + error = EMSGSIZE; + goto err1; + } + + /* Block until there are messages to receive. 
*/ + while (mq->attr.mq_curmsgs == 0) { + rw_exit_write(&mq->lock); + if ((fp->f_flag & FNONBLOCK) != 0) { + error = EWOULDBLOCK; + goto err0; + } + timo = 0; + if (ats) { + error = ts2timo(p, ats, &timo); + if (error < 0) + goto err0; + } + error = tsleep(&mq->rchan, PWAIT | PCATCH, "mqrchan", timo); + if (error != 0) + goto err0; + /* + * `mq' is always valid here. If there were references + * to the queue, then it couldn't have been released. If + * it was released, we would never be able to wake up + * from tsleep() without an error. + */ + rw_enter_write(&mq->lock); + } + + msg = TAILQ_FIRST(&mq->head); + error = copyout(msg->data, umsg_ptr, msg->len); + if (error != 0) + goto err1; + + if (umsg_prio) { + error = copyout(&msg->prio, umsg_prio, sizeof(msg->prio)); + if (error != 0) + goto err1; + } + + *retval = msg->len; + mq->attr.mq_curmsgs--; + TAILQ_REMOVE(&mq->head, msg, entry); + msgrelease(msg); + + wakeup(&mq->wchan); + mqpollwakeup(mq); +err1: + rw_exit_write(&mq->lock); +err0: + FRELE(fp, p); + return (error); +} + +int +sys_mq_receive(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_receive_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int *) msg_prio; + } */ *uap = v; + + return domqrecv(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), + NULL, retval); +} + +int +sys_mq_timedreceive(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_timedreceive_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int *) msg_prio; + syscallarg(const struct timespec *) abstime; + } */ *uap = v; + struct timespec ts; + int error; + + error = copyin(SCARG(uap, abstime), &ts, sizeof(ts)); + if (error != 0) + return (error); + return domqrecv(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), + &ts, retval); +} + +int 
+sys_mq_setattr(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_setattr_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const struct mq_attr *) mqstat; + syscallarg(struct mq_attr *) omqstat; + } */ *uap = v; + struct mq *mq; + struct mq_attr attr, oldattr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int error; + + error = copyin(SCARG(uap, mqstat), &attr, sizeof(attr)); + if (error != 0) + return (error); + + fp = fd_getfile(fdp, SCARG(uap, mqdes)); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + /* Copy out old attributes if needed */ + if (SCARG(uap, omqstat)) { + memcpy(&oldattr, &mq->attr, sizeof(oldattr)); + oldattr.mq_flags &= (fp->f_flag & FNONBLOCK); + error = copyout(&oldattr, SCARG(uap, omqstat), + sizeof(oldattr)); + if (error != 0) + goto err0; + } + if ((attr.mq_flags & O_NONBLOCK) != 0) + fp->f_flag |= FNONBLOCK; + else + fp->f_flag &= ~FNONBLOCK; + +err0: + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (error); +} + +int +sys_mq_getattr(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_getattr_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(struct mq_attr *) mqstat; + } */ *uap = v; + struct mq *mq; + struct mq_attr attr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + + fp = fd_getfile(fdp, SCARG(uap, mqdes)); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + memcpy(&attr, &mq->attr, sizeof(attr)); + rw_exit_write(&mq->lock); + FRELE(fp, p); + attr.mq_flags &= (fp->f_flag & FNONBLOCK); + return copyout(&attr, SCARG(uap, mqstat), sizeof(attr)); +} diff --git a/kern/syscalls.master b/kern/syscalls.master index 4dca927..0fda6cc 100644 --- a/kern/syscalls.master +++ b/kern/syscalls.master @@ -561,3 +561,17 @@ 328 OBSOL __tfork51 329 STD NOLOCK { void sys___set_tcb(void *tcb); } 330 STD NOLOCK { void *sys___get_tcb(void); } +331 STD { mqd_t 
sys_mq_open(const char *name, int oflag, \ + mode_t mode, struct mq_attr *attr); } +332 STD { int sys_mq_unlink(const char *name); } +333 STD { int sys_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, \ + unsigned int msg_prio); } +334 STD { ssize_t sys_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, \ + unsigned int *msg_prio); } +335 STD { int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, \ + unsigned int msg_prio, const struct timespec *abstime); } +336 STD { ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, \ + unsigned int *msg_prio, const struct timespec *abstime); } +337 STD { int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, \ + struct mq_attr *omqstat); } +338 STD { int sys_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); } diff --git a/sys/_types.h b/sys/_types.h index e058674..a384295 100644 --- a/sys/_types.h +++ b/sys/_types.h @@ -51,6 +51,7 @@ typedef __uint32_t __in_addr_t; /* base type for internet address */ typedef __uint16_t __in_port_t; /* IP port type */ typedef __uint64_t __ino_t; /* inode number */ typedef long __key_t; /* IPC key (for Sys V IPC) */ +typedef int __mqd_t; /* POSIX realtime message queue descriptor */ typedef __uint32_t __mode_t; /* permissions */ typedef __uint32_t __nlink_t; /* link count */ typedef __int64_t __off_t; /* file offset or size */ diff --git a/sys/file.h b/sys/file.h index 85f900a..ef488a7 100644 --- a/sys/file.h +++ b/sys/file.h @@ -70,6 +70,7 @@ struct file { #define DTYPE_KQUEUE 4 /* event queue */ /* was define DTYPE_CRYPTO 5 */ #define DTYPE_SYSTRACE 6 /* system call tracing */ +#define DTYPE_MQUEUE 7 /* message queue */ short f_type; /* descriptor type */ long f_count; /* reference count */ struct ucred *f_cred; /* credentials associated with descriptor */ diff --git a/sys/mqueue.h b/sys/mqueue.h new file mode 100644 index 0000000..5bdeba4 --- /dev/null +++ b/sys/mqueue.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2015 Dimitris Papastamos 
<[email protected]> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef _SYS_MQUEUE_H_ +#define _SYS_MQUEUE_H_ + +#define MQ_PRIO_MAX 32 + +struct mq_attr { + long mq_flags; /* only 0 and O_NONBLOCK are valid */ + long mq_maxmsg; /* maximum number of messages */ + long mq_msgsize; /* maximum size of message */ + long mq_curmsgs; /* current number of messages queued */ +}; + +#ifdef _KERNEL +void mqinit(void); +#endif /* _KERNEL */ + +#endif /* _SYS_MQUEUE_H_ */ diff --git a/sys/syslimits.h b/sys/syslimits.h index fdf5dc9..29a969a 100644 --- a/sys/syslimits.h +++ b/sys/syslimits.h @@ -41,6 +41,8 @@ #define MAX_CANON 255 /* max bytes in term canon input line */ #define MAX_INPUT 255 /* max bytes in terminal input */ #define NAME_MAX 255 /* max bytes in a file name */ +#define MQ_OPEN_MAX 64 /* max open message queues per process */ +#define MQ_PRIO_MAX 32 /* max message priorities */ #define NGROUPS_MAX 16 /* max supplemental group id's */ #define OPEN_MAX 64 /* max open files per process */ #define PATH_MAX 1024 /* max bytes in pathname */ diff --git a/sys/types.h b/sys/types.h index ceb7de5..c618ffb 100644 --- a/sys/types.h +++ b/sys/types.h @@ -141,6 +141,7 @@ typedef __gid_t gid_t; /* group id */ typedef __id_t id_t; /* may contain pid, uid or gid */ typedef __ino_t 
ino_t; /* inode number */ typedef __key_t key_t; /* IPC key (for Sys V IPC) */ +typedef __mqd_t mqd_t; /* POSIX realtime message queue descriptor */ typedef __mode_t mode_t; /* permissions */ typedef __nlink_t nlink_t; /* link count */ typedef __rlim_t rlim_t; /* resource limit */ -- 2.5.2
