Hi everyone,

I had a chance to test this patch more and iron out some bugs.
The main changes from the previous version are:

* Use pool(9) for messages whose header plus payload fit in
  MQ_DEF_MSGSIZE bytes; larger messages still come from malloc(9).

* Add poll support.

* Fix a memory leak in domqrecv() that would eventually exhaust
  all available kernel memory.

* Return value of mq_receive() was always 0.  Fix it to return
  the length of the message that was dequeued.

* Small stylistic changes.

* Tune the default limits and add MQ_PRIO_MAX and MQ_OPEN_MAX.

I did some very crude benchmarking on my machine and I could push
about 100k messages per second through a single message queue.

At this point, I can start working on manpages + the necessary
userspace glue.

Is this patch going in the right direction?

Thanks,
Dimitris

>From a4a59e77788b6ac9340c5707b04cd924ab23c923 Mon Sep 17 00:00:00 2001
From: Dimitris Papastamos <[email protected]>
Date: Tue, 15 Sep 2015 16:54:34 +0100
Subject: [PATCH] Initial implementation of mq_*() interfaces

---
 conf/files           |   1 +
 kern/init_main.c     |   4 +
 kern/sys_mqueue.c    | 746 +++++++++++++++++++++++++++++++++++++++++++++++++++
 kern/syscalls.master |  14 +
 sys/_types.h         |   1 +
 sys/file.h           |   1 +
 sys/mqueue.h         |  33 +++
 sys/syslimits.h      |   2 +
 sys/types.h          |   1 +
 9 files changed, 803 insertions(+)
 create mode 100644 kern/sys_mqueue.c
 create mode 100644 sys/mqueue.h

diff --git a/conf/files b/conf/files
index 691d7f2..f3d5bfc 100644
--- a/conf/files
+++ b/conf/files
@@ -694,6 +694,7 @@ file kern/subr_prof.c
 file kern/subr_userconf.c              boot_config
 file kern/subr_xxx.c
 file kern/sys_generic.c
+file kern/sys_mqueue.c
 file kern/sys_pipe.c
 file kern/sys_process.c                        ptrace | systrace
 file kern/sys_socket.c
diff --git a/kern/init_main.c b/kern/init_main.c
index e2505ea..cc323d9 100644
--- a/kern/init_main.c
+++ b/kern/init_main.c
@@ -67,6 +67,7 @@
 #ifdef SYSVSEM
 #include <sys/sem.h>
 #endif
+#include <sys/mqueue.h>
 #ifdef SYSVMSG
 #include <sys/msg.h>
 #endif
@@ -376,6 +377,9 @@ main(void *framep)
        seminit();
 #endif
 
+       /* Initialize POSIX message queues. */
+       mqinit();
+
 #ifdef SYSVMSG
        /* Initialize System V style message queues. */
        msginit();
diff --git a/kern/sys_mqueue.c b/kern/sys_mqueue.c
new file mode 100644
index 0000000..ddd5d17
--- /dev/null
+++ b/kern/sys_mqueue.c
@@ -0,0 +1,746 @@
+/*
+ * Copyright (c) 2015 Dimitris Papastamos <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/proc.h>
+#include <sys/file.h>
+#include <sys/filedesc.h>
+#include <sys/poll.h>
+#include <sys/pool.h>
+#include <sys/malloc.h>
+#include <sys/mount.h>
+#include <sys/stat.h>
+#include <sys/syscallargs.h>
+#include <sys/time.h>
+#include <sys/vnode.h>
+#include <sys/mqueue.h>
+
+/*
+ * Sizing limits.  MQ_NAMELEN covers the leading slash and the
+ * terminating NUL.  MQ_DEF_MSGSIZE doubles as the mqpool item size:
+ * a message whose header plus payload fits in it avoids malloc(9).
+ */
+#define MQ_NAMELEN     (NAME_MAX + 1)
+#define MQ_DEF_MSGSIZE 2048
+#define MQ_MAX_MSGSIZE (MQ_DEF_MSGSIZE * 8)
+#define MQ_DEF_MAXMSG  8
+#define MQ_MAX_MAXMSG  (MQ_DEF_MAXMSG * 8)
+
+/*
+ * Kernel-private bit set in mq_attr.mq_flags by mq_unlink() once the
+ * queue is detached from mqlist; the last mqclose() then frees it.
+ */
+#define MQ_DYING       0x10000000
+
+/* Bits in mq->pflag: a poll/select waiter is recorded on rsel / wsel. */
+#define MQ_RSEL 0x1
+#define MQ_WSEL 0x2
+
+/*
+ * In-kernel state of one POSIX message queue.  Mutable fields are
+ * changed with `lock' held; the mqlist linkage is changed with
+ * mqlistlock held.
+ */
+struct mq {
+       char            name[MQ_NAMELEN];  /* "/name", NUL-terminated */
+       int             oflag;             /* open flags given at creation */
+       mode_t          mode;              /* permission bits, umask applied */
+       struct mq_attr  attr;              /* limits, counters and flags */
+       int             rchan;             /* tsleep() channel for receivers */
+       int             wchan;             /* tsleep() channel for senders */
+       uid_t           euid;              /* owner, from creator's cr_uid */
+       gid_t           egid;              /* group, from creator's cr_gid */
+       u_int           refcnt;            /* open file references */
+       int             pflag;             /* MQ_RSEL / MQ_WSEL */
+       struct selinfo  rsel;              /* poll/select readers */
+       struct selinfo  wsel;              /* poll/select writers */
+       struct rwlock   lock;              /* protects this queue */
+       TAILQ_HEAD(, mq_msg) head;         /* messages, highest prio first */
+       LIST_ENTRY(mq)  entry;             /* linkage on mqlist */
+};
+
+/*
+ * One queued message.  The payload is stored inline after the header;
+ * the whole object comes from mqpool when it fits in MQ_DEF_MSGSIZE
+ * bytes and from malloc(9) otherwise.
+ */
+struct mq_msg {
+       TAILQ_ENTRY(mq_msg) entry;      /* position in mq->head */
+       size_t              len;        /* payload length in bytes */
+       u_int               prio;       /* priority, below MQ_PRIO_MAX */
+       uint8_t             data[];     /* payload */
+};
+
+/* File operations for DTYPE_MQUEUE descriptors, defined below. */
+int    mqread(struct file *, off_t *, struct uio *, struct ucred *);
+int    mqwrite(struct file *, off_t *, struct uio *, struct ucred *);
+int    mqioctl(struct file *, u_long, caddr_t, struct proc *);
+int    mqpoll(struct file *, int, struct proc *);
+int    mqkqfilter(struct file *, struct knote *);
+int    mqstat(struct file *, struct stat *, struct proc *);
+int    mqclose(struct file *, struct proc *);
+
+struct fileops mqops = {
+       .fo_read        = mqread,
+       .fo_write       = mqwrite,
+       .fo_ioctl       = mqioctl,
+       .fo_poll        = mqpoll,
+       .fo_kqfilter    = mqkqfilter,
+       .fo_stat        = mqstat,
+       .fo_close       = mqclose,
+};
+
+/* Named (not yet unlinked) queues, guarded by mqlistlock. */
+LIST_HEAD(, mq) mqlist;
+struct rwlock mqlistlock;
+/* Backing store for messages that fit in MQ_DEF_MSGSIZE bytes. */
+struct pool mqpool;
+
+/*
+ * mqinit: called from main() at boot.  Sets up the list of named
+ * queues, the lock protecting it, and the pool for small messages.
+ */
+void
+mqinit(void)
+{
+       LIST_INIT(&mqlist);
+       rw_init(&mqlistlock, "mqlistlock");
+       pool_init(&mqpool, MQ_DEF_MSGSIZE, 0, 0, PR_WAITOK, "mqpool", NULL);
+}
+
+/*
+ * ts2timo: convert the absolute CLOCK_REALTIME deadline `ts' into a
+ * relative tsleep() timeout, in ticks, stored in *timo.
+ *
+ * `ts' is left untouched, so the same deadline can be passed again on
+ * every iteration of a wait loop.
+ *
+ * Returns 0 on success, EINVAL if tv_nsec is out of range, ETIMEDOUT
+ * if the deadline has already passed, or an error from clock_gettime()
+ * or timespecfix().
+ */
+int
+ts2timo(struct proc *p, struct timespec *ts, int *timo)
+{
+       struct timespec now, rel;
+       int error;
+
+       if (ts->tv_nsec < 0 || ts->tv_nsec >= 1000000000)
+               return (EINVAL);
+
+       error = clock_gettime(p, CLOCK_REALTIME, &now);
+       if (error != 0)
+               return (error);
+       if (timespeccmp(&now, ts, >=))
+               return (ETIMEDOUT);
+
+       /*
+        * Compute the interval into a local: overwriting *ts would turn
+        * the caller's absolute deadline into a relative one, and the
+        * next comparison against the current time would be bogus.
+        */
+       timespecsub(ts, &now, &rel);
+       error = timespecfix(&rel);
+       if (error != 0)
+               return (error);
+       *timo = tstohz(&rel);
+       return (0);
+}
+
+/*
+ * mqlookup: return the queue on mqlist whose name equals `name', or
+ * NULL if there is none.  Callers in this file hold mqlistlock.
+ */
+struct mq *
+mqlookup(char *name)
+{
+       struct mq *cur;
+
+       for (cur = LIST_FIRST(&mqlist); cur != NULL;
+           cur = LIST_NEXT(cur, entry)) {
+               if (strncmp(cur->name, name, MQ_NAMELEN) == 0)
+                       break;
+       }
+       return (cur);
+}
+
+/*
+ * msgrelease: free a message back to the allocator it came from.
+ * msg->len must hold the payload length, since header plus payload
+ * size decides between pool_put(9) and free(9).
+ */
+void
+msgrelease(struct mq_msg *msg)
+{
+       size_t total = sizeof(*msg) + msg->len;
+
+       if (total <= MQ_DEF_MSGSIZE) {
+               pool_put(&mqpool, msg);
+               return;
+       }
+       free(msg, M_TEMP, total);
+}
+
+/*
+ * msgalloc: allocate a message for `len' payload bytes with priority
+ * `prio' and fill it from the user buffer `udata'.  Small messages
+ * come from mqpool, larger ones from malloc(9); both requests use
+ * the waiting variant and may sleep.
+ *
+ * Returns the message with *error set to 0, or NULL with *error set
+ * to the copyin() error (the message is freed in that case).
+ */
+struct mq_msg *
+msgalloc(const char *udata, size_t len, u_int prio, int *error)
+{
+       struct mq_msg *msg;
+
+       if (sizeof(*msg) + len > MQ_DEF_MSGSIZE)
+               msg = malloc(sizeof(*msg) + len, M_TEMP, M_WAITOK);
+       else
+               msg = pool_get(&mqpool, PR_WAITOK);
+
+       /*
+        * Fill in the header before anything can fail: msgrelease()
+        * reads msg->len to decide whether to pool_put() or free().
+        */
+       msg->len = len;
+       msg->prio = prio;
+
+       *error = copyin(udata, msg->data, len);
+       if (*error != 0) {
+               msgrelease(msg);
+               return (NULL);
+       }
+       return (msg);
+}
+
+/*
+ * mqrelease: free every message still queued on `mq', then the queue
+ * itself.  Used once the queue is unlinked and unreferenced.
+ */
+void
+mqrelease(struct mq *mq)
+{
+       struct mq_msg *m;
+
+       for (m = TAILQ_FIRST(&mq->head); m != NULL;
+           m = TAILQ_FIRST(&mq->head)) {
+               TAILQ_REMOVE(&mq->head, m, entry);
+               msgrelease(m);
+       }
+       free(mq, M_TEMP, sizeof(*mq));
+}
+
+/*
+ * mqalloc: build a new, empty queue named `name' with attributes
+ * `*attr', owned by the credentials of `p'.  `mode' is restricted to
+ * ALLPERMS and the process umask.  The queue is not linked on mqlist;
+ * the caller does that.  The allocation uses M_WAITOK and may sleep.
+ */
+struct mq *
+mqalloc(char *name, int oflag, mode_t mode, struct mq_attr *attr,
+    struct proc *p)
+{
+       struct ucred *cred = p->p_ucred;
+       struct mq *newq;
+
+       newq = malloc(sizeof(*newq), M_TEMP, M_WAITOK | M_ZERO);
+       rw_init(&newq->lock, "mqlock");
+       TAILQ_INIT(&newq->head);
+       strlcpy(newq->name, name, sizeof(newq->name));
+       newq->oflag = oflag;
+       newq->mode = (mode & ALLPERMS) & ~p->p_fd->fd_cmask;
+       newq->euid = cred->cr_uid;
+       newq->egid = cred->cr_gid;
+       newq->attr = *attr;
+       return (newq);
+}
+
+/*
+ * mqaccess: check whether `cred' may access `mq' with `mode' (VREAD
+ * and/or VWRITE) by applying vaccess() to the queue's owner, group and
+ * permission bits.  Returns 0 when allowed, an errno value otherwise.
+ */
+int
+mqaccess(struct ucred *cred, struct mq *mq, mode_t mode)
+{
+       int error;
+
+       error = vaccess(VNON, mq->mode, mq->euid, mq->egid, mode, cred);
+       return (error);
+}
+
+/*
+ * Message queues carry discrete messages, so the byte-stream
+ * read(2)/write(2) entry points and ioctl(2) are rejected; use
+ * mq_receive(), mq_send() and mq_setattr() instead.
+ */
+int
+mqread(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred)
+{
+       return EOPNOTSUPP;
+}
+
+int
+mqwrite(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred)
+{
+       return EOPNOTSUPP;
+}
+
+int
+mqioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
+{
+       return EOPNOTSUPP;
+}
+
+/*
+ * mqpoll: poll(2)/select(2) backend.  The queue is readable when it
+ * holds a message and writable while it is below mq_maxmsg.  If none
+ * of the requested events is ready, the caller is recorded on
+ * rsel/wsel and MQ_RSEL/MQ_WSEL is set so mqpollwakeup() can wake it.
+ */
+int
+mqpoll(struct file *fp, int events, struct proc *p)
+{
+       struct mq *mq = fp->f_data;
+       int rmask = events & (POLLIN | POLLRDNORM);
+       int wmask = events & (POLLOUT | POLLWRNORM);
+       int revents = 0;
+
+       rw_enter_write(&mq->lock);
+       if (rmask != 0 && mq->attr.mq_curmsgs != 0)
+               revents |= rmask;
+       if (wmask != 0 && mq->attr.mq_curmsgs < mq->attr.mq_maxmsg)
+               revents |= wmask;
+
+       if (revents == 0) {
+               if (rmask != 0) {
+                       mq->pflag |= MQ_RSEL;
+                       selrecord(p, &mq->rsel);
+               }
+               if (wmask != 0) {
+                       mq->pflag |= MQ_WSEL;
+                       selrecord(p, &mq->wsel);
+               }
+       }
+       rw_exit_write(&mq->lock);
+       return (revents);
+}
+
+/*
+ * mqpollwakeup: wake poll/select waiters recorded by mqpoll() and
+ * clear their MQ_RSEL/MQ_WSEL bits.  The callers in this file hold
+ * mq->lock and call it after changing the queue contents.
+ */
+void
+mqpollwakeup(struct mq *mq)
+{
+       int pending = mq->pflag;
+
+       mq->pflag &= ~(MQ_RSEL | MQ_WSEL);
+       if ((pending & MQ_RSEL) != 0)
+               selwakeup(&mq->rsel);
+       if ((pending & MQ_WSEL) != 0)
+               selwakeup(&mq->wsel);
+}
+
+/* kqueue(2) filters and fstat(2) are not supported on message queues. */
+int
+mqkqfilter(struct file *fp, struct knote *kn)
+{
+       return EOPNOTSUPP;
+}
+
+int
+mqstat(struct file *fp, struct stat *ub, struct proc *p)
+{
+       return EOPNOTSUPP;
+}
+
+/*
+ * mqclose: drop the reference held by `fp'.  If it was the last one
+ * and mq_unlink() already marked the queue MQ_DYING, free the queue;
+ * otherwise it stays around (still named, or awaiting other closes).
+ */
+int
+mqclose(struct file *fp, struct proc *p)
+{
+       struct mq *mq = fp->f_data;
+       int last, dying;
+
+       rw_enter_write(&mq->lock);
+       last = (--mq->refcnt == 0);
+       dying = (mq->attr.mq_flags & MQ_DYING) != 0;
+       rw_exit_write(&mq->lock);
+
+       if (last && dying)
+               mqrelease(mq);
+       return (0);
+}
+
+/*
+ * mqnamechk: validate a queue name.  It must start with a slash (per
+ * POSIX) and may not contain any further slash.
+ * Returns 0 for a valid name, EINVAL otherwise.
+ */
+int
+mqnamechk(char *name)
+{
+       const char *s;
+
+       if (*name != '/')
+               return (EINVAL);
+       for (s = name + 1; *s != '\0'; s++)
+               if (*s == '/')
+                       return (EINVAL);
+       return (0);
+}
+
+/*
+ * sys_mq_open: open the message queue called `name', creating it first
+ * when O_CREAT is given and it does not exist, and return a new
+ * descriptor for it in *retval.
+ *
+ * The O_ACCMODE part of `oflag' decides whether the descriptor may
+ * receive (FREAD) and/or send (FWRITE); O_NONBLOCK makes it
+ * non-blocking.  With O_CREAT and a NULL `attr', default limits are
+ * used.  The descriptor is always marked close-on-exec.
+ *
+ * Errors: EINVAL (bad access mode, name or attributes), ENOSPC
+ * (attributes above the system limits), EEXIST (O_CREAT|O_EXCL and the
+ * queue exists), EACCES, ENOENT, or errors from copyin()/falloc().
+ */
+int
+sys_mq_open(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_open_args /* {
+               syscallarg(const char *) name;
+               syscallarg(int) oflag;
+               syscallarg(mode_t) mode;
+               syscallarg(struct mq_attr *) attr;
+       } */ *uap = v;
+       struct mq *mq, *newmq = NULL;
+       struct mq_attr attr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       mode_t accmode;
+       char *name;
+       int oflag, fflag, fd, error;
+
+       oflag = SCARG(uap, oflag);
+
+       /* Map the access mode; O_WRONLY must not grant receive access. */
+       switch (oflag & O_ACCMODE) {
+       case O_RDONLY:
+               fflag = FREAD;
+               accmode = VREAD;
+               break;
+       case O_WRONLY:
+               fflag = FWRITE;
+               accmode = VWRITE;
+               break;
+       case O_RDWR:
+               fflag = FREAD | FWRITE;
+               accmode = VREAD | VWRITE;
+               break;
+       default:
+               return (EINVAL);
+       }
+       if ((oflag & O_NONBLOCK) != 0)
+               fflag |= FNONBLOCK;
+
+       if ((oflag & O_CREAT) != 0) {
+               if (SCARG(uap, attr)) {
+                       error = copyin(SCARG(uap, attr), &attr, sizeof(attr));
+                       if (error != 0)
+                               return (error);
+                       if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0)
+                               return (EINVAL);
+                       if (attr.mq_maxmsg > MQ_MAX_MAXMSG ||
+                           attr.mq_msgsize > MQ_MAX_MSGSIZE)
+                               return (ENOSPC);
+               } else {
+                       memset(&attr, 0, sizeof(attr));
+                       attr.mq_maxmsg = MQ_DEF_MAXMSG;
+                       attr.mq_msgsize = MQ_DEF_MSGSIZE -
+                           sizeof(struct mq_msg);
+               }
+               /*
+                * mq_open() ignores the caller's mq_flags and mq_curmsgs.
+                * Dropping mq_flags also keeps userland from planting the
+                * kernel-private MQ_DYING bit, which would make the last
+                * mqclose() free a queue that is still linked on mqlist.
+                */
+               attr.mq_flags = 0;
+               attr.mq_curmsgs = 0;
+       }
+
+       name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK);
+       error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL);
+       if (error != 0)
+               goto err0;
+
+       error = mqnamechk(name);
+       if (error != 0)
+               goto err0;
+
+       fdplock(fdp);
+       error = falloc(p, &fp, &fd);
+       if (error != 0)
+               goto err1;
+       fp->f_flag = fflag;
+
+       rw_enter_write(&mqlistlock);
+       mq = mqlookup(name);
+       if (mq) {
+               rw_enter_write(&mq->lock);
+               if ((oflag & O_CREAT) != 0 && (oflag & O_EXCL) != 0) {
+                       error = EEXIST;
+                       goto err3;
+               }
+               if (mqaccess(p->p_ucred, mq, accmode) != 0) {
+                       error = EACCES;
+                       goto err3;
+               }
+       } else {
+               if ((oflag & O_CREAT) == 0) {
+                       error = ENOENT;
+                       goto err2;
+               }
+               mq = mqalloc(name, oflag, SCARG(uap, mode), &attr, p);
+               newmq = mq;
+               rw_enter_write(&mq->lock);
+       }
+
+       fp->f_type = DTYPE_MQUEUE;
+       fp->f_data = mq;
+       fp->f_ops = &mqops;
+
+       /* POSIX closes all message queue descriptors on exec. */
+       fdp->fd_ofileflags[fd] |= UF_EXCLOSE;
+
+       FILE_SET_MATURE(fp, p);
+       fdpunlock(fdp);
+
+       mq->refcnt++;
+       if (newmq)
+               LIST_INSERT_HEAD(&mqlist, newmq, entry);
+       rw_exit_write(&mq->lock);
+       rw_exit_write(&mqlistlock);
+       free(name, M_TEMP, MQ_NAMELEN);
+
+       *retval = fd;
+       return (0);
+
+err3:
+       rw_exit_write(&mq->lock);
+err2:
+       rw_exit_write(&mqlistlock);
+       fdremove(fdp, fd);
+       closef(fp, p);
+err1:
+       fdpunlock(fdp);
+err0:
+       free(name, M_TEMP, MQ_NAMELEN);
+       return (error);
+}
+
+/*
+ * sys_mq_unlink: remove the name `name'.  The queue is taken off
+ * mqlist and marked MQ_DYING.  It is freed here when no descriptor
+ * refers to it, otherwise by the mqclose() that drops the last
+ * reference.  Requires write permission on the queue.
+ * Errors: ENOENT, EACCES, or a copyinstr() error.
+ */
+int
+sys_mq_unlink(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_unlink_args /* {
+               syscallarg(const char *) name;
+       } */ *uap = v;
+       struct mq *mq = NULL;
+       char *name;
+       int error, release = 0;
+
+       name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK);
+       error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL);
+       if (error == 0) {
+               rw_enter_write(&mqlistlock);
+               mq = mqlookup(name);
+               if (mq == NULL) {
+                       error = ENOENT;
+               } else {
+                       rw_enter_write(&mq->lock);
+                       if (mqaccess(p->p_ucred, mq, VWRITE) != 0) {
+                               error = EACCES;
+                       } else {
+                               LIST_REMOVE(mq, entry);
+                               mq->attr.mq_flags |= MQ_DYING;
+                               release = (mq->refcnt == 0);
+                       }
+                       rw_exit_write(&mq->lock);
+               }
+               rw_exit_write(&mqlistlock);
+       }
+       free(name, M_TEMP, MQ_NAMELEN);
+
+       /* Unreferenced and now nameless: nobody else can reach it. */
+       if (release)
+               mqrelease(mq);
+       return (error);
+}
+
+/*
+ * domqsend: common body of mq_send() and mq_timedsend().
+ *
+ * Copies `msg_len' bytes from the user buffer `umsg_ptr' into a new
+ * message of priority `msg_prio' and queues it on the queue behind
+ * descriptor `mqdes': ahead of every lower-priority message and behind
+ * messages of equal or higher priority.
+ *
+ * When the queue is full, a non-blocking descriptor fails with
+ * EWOULDBLOCK (EAGAIN).  Otherwise the caller sleeps until space is
+ * freed or a signal is caught.  If `ats' is not NULL it is an absolute
+ * CLOCK_REALTIME deadline: ETIMEDOUT is returned once it passes, and
+ * any other error from converting it is returned unchanged.
+ *
+ * Other errors: EINVAL (msg_prio >= MQ_PRIO_MAX), EBADF (not a queue
+ * descriptor, or not open for sending), EMSGSIZE (msg_len above the
+ * queue's mq_msgsize), or an error from copyin()/tsleep().
+ */
+int
+domqsend(struct proc *p, mqd_t mqdes, const char *umsg_ptr, size_t msg_len,
+    unsigned int msg_prio, struct timespec *ats)
+{
+       struct mq *mq;
+       struct mq_msg *msg, *pos;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       struct timespec ts;
+       int timo;
+       int error;
+
+       if (msg_prio >= MQ_PRIO_MAX)
+               return (EINVAL);
+
+       fp = fd_getfile(fdp, mqdes);
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       if ((fp->f_flag & FWRITE) == 0 ||
+           mqaccess(p->p_ucred, mq, VWRITE)) {
+               error = EBADF;
+               goto err1;
+       }
+
+       if (msg_len > mq->attr.mq_msgsize) {
+               error = EMSGSIZE;
+               goto err1;
+       }
+
+       /* Block while there is no space in the queue. */
+       while (mq->attr.mq_curmsgs >= mq->attr.mq_maxmsg) {
+               rw_exit_write(&mq->lock);
+               if ((fp->f_flag & FNONBLOCK) != 0) {
+                       error = EWOULDBLOCK;
+                       goto err0;
+               }
+               timo = 0;
+               if (ats != NULL) {
+                       /*
+                        * ts2timo() reports failures as positive errno
+                        * values (ETIMEDOUT once the deadline passed);
+                        * ignoring them would leave timo at 0 and sleep
+                        * forever.  It gets a scratch copy so that *ats
+                        * stays an absolute deadline on the next pass.
+                        */
+                       ts = *ats;
+                       error = ts2timo(p, &ts, &timo);
+                       if (error != 0)
+                               goto err0;
+               }
+               error = tsleep(&mq->wchan, PWAIT | PCATCH, "mqwchan", timo);
+               /* tsleep() signals an expired timeout with EWOULDBLOCK. */
+               if (error == EWOULDBLOCK)
+                       error = ETIMEDOUT;
+               if (error != 0)
+                       goto err0;
+               /*
+                * `mq' is always valid here.  If there were references
+                * to the queue, then it couldn't have been released.  If
+                * it was released, we would never be able to wake up
+                * from tsleep() without an error.
+                */
+               rw_enter_write(&mq->lock);
+       }
+
+       msg = msgalloc(umsg_ptr, msg_len, msg_prio, &error);
+       if (!msg)
+               goto err1;
+
+       /* Keep the list sorted by descending priority, FIFO within one. */
+       mq->attr.mq_curmsgs++;
+       TAILQ_FOREACH(pos, &mq->head, entry)
+               if (msg->prio > pos->prio)
+                       break;
+       if (!pos)
+               TAILQ_INSERT_TAIL(&mq->head, msg, entry);
+       else
+               TAILQ_INSERT_BEFORE(pos, msg, entry);
+
+       wakeup(&mq->rchan);
+       mqpollwakeup(mq);
+err1:
+       rw_exit_write(&mq->lock);
+err0:
+       FRELE(fp, p);
+       return (error);
+}
+
+/*
+ * sys_mq_send: mq_send(2).  Queue a message, blocking (unless the
+ * descriptor is non-blocking) with no timeout.
+ */
+int
+sys_mq_send(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_send_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int) msg_prio;
+       } */ *uap = v;
+       mqd_t mqdes = SCARG(uap, mqdes);
+       const char *buf = SCARG(uap, msg_ptr);
+       size_t len = SCARG(uap, msg_len);
+       u_int prio = SCARG(uap, msg_prio);
+
+       return (domqsend(p, mqdes, buf, len, prio, NULL));
+}
+
+/*
+ * sys_mq_timedsend: mq_timedsend(2).  Like mq_send(), but a blocking
+ * wait ends at the absolute CLOCK_REALTIME time `abstime'.
+ */
+int
+sys_mq_timedsend(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_timedsend_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int) msg_prio;
+               syscallarg(const struct timespec *) abstime;
+       } */ *uap = v;
+       struct timespec deadline;
+       int error;
+
+       if ((error = copyin(SCARG(uap, abstime), &deadline,
+           sizeof(deadline))) != 0)
+               return (error);
+
+       return (domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
+           SCARG(uap, msg_len), SCARG(uap, msg_prio), &deadline));
+}
+
+/*
+ * domqrecv: common body of mq_receive() and mq_timedreceive().
+ *
+ * Takes the message at the head of the queue behind `mqdes', copies
+ * its payload to `umsg_ptr', stores its priority in *umsg_prio when
+ * that pointer is not NULL, and sets *retval to the payload length.
+ * `msg_len' must be at least the queue's mq_msgsize (EMSGSIZE
+ * otherwise).  If a copyout fails, the message stays queued.
+ *
+ * On an empty queue a non-blocking descriptor fails with EWOULDBLOCK
+ * (EAGAIN).  Otherwise the caller sleeps until a message arrives or a
+ * signal is caught.  If `ats' is not NULL it is an absolute
+ * CLOCK_REALTIME deadline: ETIMEDOUT is returned once it passes, and
+ * any other error from converting it is returned unchanged.
+ *
+ * EBADF if `mqdes' is not a queue descriptor open for receiving.
+ */
+int
+domqrecv(struct proc *p, mqd_t mqdes, char *umsg_ptr, size_t msg_len,
+    u_int *umsg_prio, struct timespec *ats, register_t *retval)
+{
+       struct mq *mq;
+       struct mq_msg *msg;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       struct timespec ts;
+       int timo;
+       int error;
+
+       fp = fd_getfile(fdp, mqdes);
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       /* The descriptor itself must be open for receiving. */
+       if ((fp->f_flag & FREAD) == 0 ||
+           mqaccess(p->p_ucred, mq, VREAD)) {
+               error = EBADF;
+               goto err1;
+       }
+
+       if (msg_len < mq->attr.mq_msgsize) {
+               error = EMSGSIZE;
+               goto err1;
+       }
+
+       /* Block until there are messages to receive. */
+       while (mq->attr.mq_curmsgs == 0) {
+               rw_exit_write(&mq->lock);
+               if ((fp->f_flag & FNONBLOCK) != 0) {
+                       error = EWOULDBLOCK;
+                       goto err0;
+               }
+               timo = 0;
+               if (ats != NULL) {
+                       /*
+                        * ts2timo() reports failures as positive errno
+                        * values (ETIMEDOUT once the deadline passed);
+                        * ignoring them would leave timo at 0 and sleep
+                        * forever.  It gets a scratch copy so that *ats
+                        * stays an absolute deadline on the next pass.
+                        */
+                       ts = *ats;
+                       error = ts2timo(p, &ts, &timo);
+                       if (error != 0)
+                               goto err0;
+               }
+               error = tsleep(&mq->rchan, PWAIT | PCATCH, "mqrchan", timo);
+               /* tsleep() signals an expired timeout with EWOULDBLOCK. */
+               if (error == EWOULDBLOCK)
+                       error = ETIMEDOUT;
+               if (error != 0)
+                       goto err0;
+               /*
+                * `mq' is always valid here.  If there were references
+                * to the queue, then it couldn't have been released.  If
+                * it was released, we would never be able to wake up
+                * from tsleep() without an error.
+                */
+               rw_enter_write(&mq->lock);
+       }
+
+       msg = TAILQ_FIRST(&mq->head);
+       error = copyout(msg->data, umsg_ptr, msg->len);
+       if (error != 0)
+               goto err1;
+
+       if (umsg_prio) {
+               error = copyout(&msg->prio, umsg_prio, sizeof(msg->prio));
+               if (error != 0)
+                       goto err1;
+       }
+
+       *retval = msg->len;
+       mq->attr.mq_curmsgs--;
+       TAILQ_REMOVE(&mq->head, msg, entry);
+       msgrelease(msg);
+
+       wakeup(&mq->wchan);
+       mqpollwakeup(mq);
+err1:
+       rw_exit_write(&mq->lock);
+err0:
+       FRELE(fp, p);
+       return (error);
+}
+
+/*
+ * sys_mq_receive: mq_receive(2).  Dequeue the message at the head of
+ * the queue, blocking (unless non-blocking) with no timeout; *retval
+ * receives the message length.
+ */
+int
+sys_mq_receive(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_receive_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int *) msg_prio;
+       } */ *uap = v;
+       mqd_t mqdes = SCARG(uap, mqdes);
+       char *buf = SCARG(uap, msg_ptr);
+       size_t buflen = SCARG(uap, msg_len);
+       u_int *prio = SCARG(uap, msg_prio);
+
+       return (domqrecv(p, mqdes, buf, buflen, prio, NULL, retval));
+}
+
+/*
+ * sys_mq_timedreceive: mq_timedreceive(2).  Like mq_receive(), but a
+ * blocking wait ends at the absolute CLOCK_REALTIME time `abstime'.
+ */
+int
+sys_mq_timedreceive(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_timedreceive_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int *) msg_prio;
+               syscallarg(const struct timespec *) abstime;
+       } */ *uap = v;
+       struct timespec deadline;
+       int error;
+
+       if ((error = copyin(SCARG(uap, abstime), &deadline,
+           sizeof(deadline))) != 0)
+               return (error);
+
+       return (domqrecv(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
+           SCARG(uap, msg_len), SCARG(uap, msg_prio), &deadline, retval));
+}
+
+/*
+ * sys_mq_setattr: mq_setattr(2).  Only O_NONBLOCK in mqstat->mq_flags
+ * is used: it sets or clears the descriptor's non-blocking mode; the
+ * other fields are ignored.  If `omqstat' is not NULL, the previous
+ * attributes are copied out first, with mq_flags telling whether the
+ * descriptor was non-blocking.
+ */
+int
+sys_mq_setattr(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_setattr_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const struct mq_attr *) mqstat;
+               syscallarg(struct mq_attr *) omqstat;
+       } */ *uap = v;
+       struct mq *mq;
+       struct mq_attr attr, oldattr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       int error;
+
+       error = copyin(SCARG(uap, mqstat), &attr, sizeof(attr));
+       if (error != 0)
+               return (error);
+
+       fp = fd_getfile(fdp, SCARG(uap, mqdes));
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       /* Copy out old attributes if needed. */
+       if (SCARG(uap, omqstat)) {
+               memcpy(&oldattr, &mq->attr, sizeof(oldattr));
+               /*
+                * Report this descriptor's O_NONBLOCK state rather than
+                * masking the queue-wide flags word (which also holds
+                * the kernel-private MQ_DYING bit).
+                */
+               oldattr.mq_flags = (fp->f_flag & FNONBLOCK) != 0 ?
+                   O_NONBLOCK : 0;
+               error = copyout(&oldattr, SCARG(uap, omqstat),
+                   sizeof(oldattr));
+               if (error != 0)
+                       goto err0;
+       }
+       if ((attr.mq_flags & O_NONBLOCK) != 0)
+               fp->f_flag |= FNONBLOCK;
+       else
+               fp->f_flag &= ~FNONBLOCK;
+
+err0:
+       rw_exit_write(&mq->lock);
+       FRELE(fp, p);
+       return (error);
+}
+
+/*
+ * sys_mq_getattr: mq_getattr(2).  Copy the queue's current attributes
+ * to `mqstat', with mq_flags reporting only whether this descriptor is
+ * non-blocking.
+ */
+int
+sys_mq_getattr(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_getattr_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(struct mq_attr *) mqstat;
+       } */ *uap = v;
+       struct mq *mq;
+       struct mq_attr attr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+
+       fp = fd_getfile(fdp, SCARG(uap, mqdes));
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       memcpy(&attr, &mq->attr, sizeof(attr));
+       rw_exit_write(&mq->lock);
+
+       /*
+        * Read f_flag while we still hold our reference: after FRELE()
+        * a concurrent close may free `fp'.  Report the descriptor's
+        * O_NONBLOCK state instead of masking the queue's flags word.
+        */
+       attr.mq_flags = (fp->f_flag & FNONBLOCK) != 0 ? O_NONBLOCK : 0;
+       FRELE(fp, p);
+
+       return (copyout(&attr, SCARG(uap, mqstat), sizeof(attr)));
+}
diff --git a/kern/syscalls.master b/kern/syscalls.master
index 4dca927..0fda6cc 100644
--- a/kern/syscalls.master
+++ b/kern/syscalls.master
@@ -561,3 +561,17 @@
 328    OBSOL           __tfork51
 329    STD NOLOCK      { void sys___set_tcb(void *tcb); }
 330    STD NOLOCK      { void *sys___get_tcb(void); }
+331    STD             { mqd_t sys_mq_open(const char *name, int oflag, \
+                           mode_t mode, struct mq_attr *attr); }
+332    STD             { int sys_mq_unlink(const char *name); }
+333    STD             { int sys_mq_send(mqd_t mqdes, const char *msg_ptr, 
size_t msg_len, \
+                           unsigned int msg_prio); }
+334    STD             { ssize_t sys_mq_receive(mqd_t mqdes, char *msg_ptr, 
size_t msg_len, \
+                           unsigned int *msg_prio); }
+335    STD             { int sys_mq_timedsend(mqd_t mqdes, const char 
*msg_ptr, size_t msg_len, \
+                           unsigned int msg_prio, const struct timespec 
*abstime); }
+336    STD             { ssize_t sys_mq_timedreceive(mqd_t mqdes, char 
*msg_ptr, size_t msg_len, \
+                           unsigned int *msg_prio, const struct timespec 
*abstime); }
+337    STD             { int sys_mq_setattr(mqd_t mqdes, const struct mq_attr 
*mqstat, \
+                           struct mq_attr *omqstat); }
+338    STD             { int sys_mq_getattr(mqd_t mqdes, struct mq_attr 
*mqstat); }
diff --git a/sys/_types.h b/sys/_types.h
index e058674..a384295 100644
--- a/sys/_types.h
+++ b/sys/_types.h
@@ -51,6 +51,7 @@ typedef       __uint32_t      __in_addr_t;    /* base type 
for internet address */
 typedef        __uint16_t      __in_port_t;    /* IP port type */
 typedef        __uint64_t      __ino_t;        /* inode number */
 typedef        long            __key_t;        /* IPC key (for Sys V IPC) */
+typedef        int             __mqd_t;        /* POSIX realtime message queue 
descriptor */
 typedef        __uint32_t      __mode_t;       /* permissions */
 typedef        __uint32_t      __nlink_t;      /* link count */
 typedef        __int64_t       __off_t;        /* file offset or size */
diff --git a/sys/file.h b/sys/file.h
index 85f900a..ef488a7 100644
--- a/sys/file.h
+++ b/sys/file.h
@@ -70,6 +70,7 @@ struct file {
 #define        DTYPE_KQUEUE    4       /* event queue */
 /* was define  DTYPE_CRYPTO    5 */
 #define        DTYPE_SYSTRACE  6       /* system call tracing */
+#define        DTYPE_MQUEUE    7       /* message queue */
        short   f_type;         /* descriptor type */
        long    f_count;        /* reference count */
        struct  ucred *f_cred;  /* credentials associated with descriptor */
diff --git a/sys/mqueue.h b/sys/mqueue.h
new file mode 100644
index 0000000..5bdeba4
--- /dev/null
+++ b/sys/mqueue.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Dimitris Papastamos <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef _SYS_MQUEUE_H_
+#define _SYS_MQUEUE_H_
+
+/* Number of message priorities; valid values are 0 .. MQ_PRIO_MAX - 1. */
+#define MQ_PRIO_MAX 32
+
+/* Queue attributes exchanged by mq_open(), mq_getattr(), mq_setattr(). */
+struct mq_attr {
+       long    mq_flags;       /* only 0 and O_NONBLOCK are valid */
+       long    mq_maxmsg;      /* capacity, in messages */
+       long    mq_msgsize;     /* largest message, in bytes */
+       long    mq_curmsgs;     /* messages currently queued */
+};
+
+#ifdef _KERNEL
+/* Set up the message queue subsystem; called from main() at boot. */
+void   mqinit(void);
+#endif /* _KERNEL */
+
+#endif /* !_SYS_MQUEUE_H_ */
diff --git a/sys/syslimits.h b/sys/syslimits.h
index fdf5dc9..29a969a 100644
--- a/sys/syslimits.h
+++ b/sys/syslimits.h
@@ -41,6 +41,8 @@
 #define        MAX_CANON                 255   /* max bytes in term canon 
input line */
 #define        MAX_INPUT                 255   /* max bytes in terminal input 
*/
 #define        NAME_MAX                  255   /* max bytes in a file name */
+#define        MQ_OPEN_MAX                64   /* max open message queues per 
process */
+#define        MQ_PRIO_MAX                32   /* max message priorities */
 #define        NGROUPS_MAX                16   /* max supplemental group id's 
*/
 #define        OPEN_MAX                   64   /* max open files per process */
 #define        PATH_MAX                 1024   /* max bytes in pathname */
diff --git a/sys/types.h b/sys/types.h
index ceb7de5..c618ffb 100644
--- a/sys/types.h
+++ b/sys/types.h
@@ -141,6 +141,7 @@ typedef     __gid_t         gid_t;          /* group id */
 typedef        __id_t          id_t;           /* may contain pid, uid or gid 
*/
 typedef        __ino_t         ino_t;          /* inode number */
 typedef        __key_t         key_t;          /* IPC key (for Sys V IPC) */
+typedef        __mqd_t         mqd_t;          /* POSIX realtime message queue 
descriptor */
 typedef        __mode_t        mode_t;         /* permissions */
 typedef        __nlink_t       nlink_t;        /* link count */
 typedef        __rlim_t        rlim_t;         /* resource limit */
-- 
2.5.2

Reply via email to