> -----Original Message----- > From: Qemu-devel <qemu-devel- > bounces+thanos.makatos=nutanix....@nongnu.org> On Behalf Of Thanos > Makatos > Sent: 03 February 2022 21:54 > To: John Johnson <john.g.john...@oracle.com>; qemu-devel@nongnu.org > Subject: RE: [RFC v4 08/21] vfio-user: define socket receive functions > > > > > -----Original Message----- > > From: Qemu-devel <qemu-devel- > > bounces+thanos.makatos=nutanix....@nongnu.org> On Behalf Of John > > Johnson > > Sent: 12 January 2022 00:44 > > To: qemu-devel@nongnu.org > > Subject: [RFC v4 08/21] vfio-user: define socket receive functions > > > > Add infrastructure needed to receive incoming messages > > > > Signed-off-by: John G Johnson <john.g.john...@oracle.com> > > Signed-off-by: Elena Ufimtseva <elena.ufimts...@oracle.com> > > Signed-off-by: Jagannathan Raman <jag.ra...@oracle.com> > > --- > > hw/vfio/user-protocol.h | 54 ++++++++ > > hw/vfio/user.h | 6 + > > hw/vfio/pci.c | 6 + > > hw/vfio/user.c | 327 > > ++++++++++++++++++++++++++++++++++++++++++++++++ > > MAINTAINERS | 1 + > > 5 files changed, 394 insertions(+) > > create mode 100644 hw/vfio/user-protocol.h > > > > diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h > > new file mode 100644 > > index 0000000..d23877c > > --- /dev/null > > +++ b/hw/vfio/user-protocol.h > > @@ -0,0 +1,54 @@ > > +#ifndef VFIO_USER_PROTOCOL_H > > +#define VFIO_USER_PROTOCOL_H > > + > > +/* > > + * vfio protocol over a UNIX socket. > > + * > > + * Copyright © 2018, 2021 Oracle and/or its affiliates. > > + * > > + * This work is licensed under the terms of the GNU GPL, version 2. See > > + * the COPYING file in the top-level directory. > > + * > > + * Each message has a standard header that describes the command > > + * being sent, which is almost always a VFIO ioctl(). > > + * > > + * The header may be followed by command-specific data, such as the > > + * region and offset info for read and write commands. 
> > + */ > > + > > +typedef struct { > > + uint16_t id; > > + uint16_t command; > > + uint32_t size; > > + uint32_t flags; > > + uint32_t error_reply; > > +} VFIOUserHdr; > > + > > +/* VFIOUserHdr commands */ > > +enum vfio_user_command { > > + VFIO_USER_VERSION = 1, > > + VFIO_USER_DMA_MAP = 2, > > + VFIO_USER_DMA_UNMAP = 3, > > + VFIO_USER_DEVICE_GET_INFO = 4, > > + VFIO_USER_DEVICE_GET_REGION_INFO = 5, > > + VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6, > > + VFIO_USER_DEVICE_GET_IRQ_INFO = 7, > > + VFIO_USER_DEVICE_SET_IRQS = 8, > > + VFIO_USER_REGION_READ = 9, > > + VFIO_USER_REGION_WRITE = 10, > > + VFIO_USER_DMA_READ = 11, > > + VFIO_USER_DMA_WRITE = 12, > > + VFIO_USER_DEVICE_RESET = 13, > > + VFIO_USER_DIRTY_PAGES = 14, > > + VFIO_USER_MAX, > > +}; > > + > > +/* VFIOUserHdr flags */ > > +#define VFIO_USER_REQUEST 0x0 > > +#define VFIO_USER_REPLY 0x1 > > +#define VFIO_USER_TYPE 0xF > > + > > +#define VFIO_USER_NO_REPLY 0x10 > > +#define VFIO_USER_ERROR 0x20 > > + > > +#endif /* VFIO_USER_PROTOCOL_H */ > > diff --git a/hw/vfio/user.h b/hw/vfio/user.h > > index da92862..72eefa7 100644 > > --- a/hw/vfio/user.h > > +++ b/hw/vfio/user.h > > @@ -11,6 +11,8 @@ > > * > > */ > > > > +#include "user-protocol.h" > > + > > typedef struct { > > int send_fds; > > int recv_fds; > > @@ -27,6 +29,7 @@ enum msg_type { > > > > typedef struct VFIOUserMsg { > > QTAILQ_ENTRY(VFIOUserMsg) next; > > + VFIOUserHdr *hdr; > > VFIOUserFDs *fds; > > uint32_t rsize; > > uint32_t id; > > @@ -74,5 +77,8 @@ typedef struct VFIOProxy { > > > > VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp); > > void vfio_user_disconnect(VFIOProxy *proxy); > > +void vfio_user_set_handler(VFIODevice *vbasedev, > > + void (*handler)(void *opaque, VFIOUserMsg *msg), > > + void *reqarg); > > > > #endif /* VFIO_USER_H */ > > diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c > > index 9fd7c07..0de915d 100644 > > --- a/hw/vfio/pci.c > > +++ b/hw/vfio/pci.c > > @@ -3386,6 +3386,11 @@ 
type_init(register_vfio_pci_dev_type) > > * vfio-user routines. > > */ > > > > +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg) > > +{ > > + > > +} > > + > > /* > > * Emulated devices don't use host hot reset > > */ > > @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, > > Error **errp) > > return; > > } > > vbasedev->proxy = proxy; > > + vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev); > > > > vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name); > > vbasedev->dev = DEVICE(vdev); > > diff --git a/hw/vfio/user.c b/hw/vfio/user.c > > index c843f90..e1dfd5d 100644 > > --- a/hw/vfio/user.c > > +++ b/hw/vfio/user.c > > @@ -25,10 +25,26 @@ > > #include "sysemu/iothread.h" > > #include "user.h" > > > > +static uint64_t max_xfer_size; > > static IOThread *vfio_user_iothread; > > > > static void vfio_user_shutdown(VFIOProxy *proxy); > > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr > *hdr, > > + VFIOUserFDs *fds); > > +static VFIOUserFDs *vfio_user_getfds(int numfds); > > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg); > > > > +static void vfio_user_recv(void *opaque); > > +static int vfio_user_recv_one(VFIOProxy *proxy); > > +static void vfio_user_cb(void *opaque); > > + > > +static void vfio_user_request(void *opaque); > > + > > +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err) > > +{ > > + hdr->flags |= VFIO_USER_ERROR; > > + hdr->error_reply = err; > > +} > > > > /* > > * Functions called by main, CPU, or iothread threads > > @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy) > > qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, > NULL); > > } > > > > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr > *hdr, > > + VFIOUserFDs *fds) > > +{ > > + VFIOUserMsg *msg; > > + > > + msg = QTAILQ_FIRST(&proxy->free); > > + if (msg != NULL) { > > + QTAILQ_REMOVE(&proxy->free, msg, next); 
> > + } else { > > + msg = g_malloc0(sizeof(*msg)); > > + qemu_cond_init(&msg->cv); > > + } > > + > > + msg->hdr = hdr; > > + msg->fds = fds; > > + return msg; > > +} > > + > > +/* > > + * Recycle a message list entry to the free list. > > + */ > > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg) > > +{ > > + if (msg->type == VFIO_MSG_NONE) { > > + error_printf("vfio_user_recycle - freeing free msg\n"); > > + return; > > + } > > + > > + /* free msg buffer if no one is waiting to consume the reply */ > > + if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) > { > > + g_free(msg->hdr); > > + if (msg->fds != NULL) { > > + g_free(msg->fds); > > + } > > + } > > + > > + msg->type = VFIO_MSG_NONE; > > + msg->hdr = NULL; > > + msg->fds = NULL; > > + msg->complete = false; > > + QTAILQ_INSERT_HEAD(&proxy->free, msg, next); > > +} > > + > > +static VFIOUserFDs *vfio_user_getfds(int numfds) > > +{ > > + VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int))); > > + > > + fds->fds = (int *)((char *)fds + sizeof(*fds)); > > + > > + return fds; > > +} > > + > > /* > > * Functions only called by iothread > > */ > > > > +static void vfio_user_recv(void *opaque) > > +{ > > + VFIOProxy *proxy = opaque; > > + > > + QEMU_LOCK_GUARD(&proxy->lock); > > + > > + if (proxy->state == VFIO_PROXY_CONNECTED) { > > + while (vfio_user_recv_one(proxy) == 0) { > > + ; > > + } > > + } > > +} > > + > > +/* > > + * Receive and process one incoming message. > > + * > > + * For replies, find matching outgoing request and wake any waiters. > > + * For requests, queue in incoming list and run request BH. 
> > + */ > > +static int vfio_user_recv_one(VFIOProxy *proxy) > > +{ > > + VFIOUserMsg *msg = NULL; > > + g_autofree int *fdp = NULL; > > + VFIOUserFDs *reqfds; > > + VFIOUserHdr hdr; > > + struct iovec iov = { > > + .iov_base = &hdr, > > + .iov_len = sizeof(hdr), > > + }; > > + bool isreply = false; > > + int i, ret; > > + size_t msgleft, numfds = 0; > > + char *data = NULL; > > + char *buf = NULL; > > + Error *local_err = NULL; > > + > > + /* > > + * Read header > > + */ > > + ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, > > + &local_err); > > + if (ret == QIO_CHANNEL_ERR_BLOCK) { > > + return ret; > > + } > > + if (ret <= 0) { > > + /* read error or other side closed connection */ > > + if (ret == 0) { > > + error_setg(&local_err, "vfio_user_recv server closed socket"); > > + } else { > > + error_prepend(&local_err, "vfio_user_recv"); > > + } > > + goto fatal; > > + } > > + if (ret < sizeof(msg)) { > > + error_setg(&local_err, "vfio_user_recv short read of header"); > > + goto fatal; > > + } > > + > > + /* > > + * Validate header > > + */ > > + if (hdr.size < sizeof(VFIOUserHdr)) { > > + error_setg(&local_err, "vfio_user_recv bad header size"); > > + goto fatal; > > + } > > + switch (hdr.flags & VFIO_USER_TYPE) { > > + case VFIO_USER_REQUEST: > > + isreply = false; > > + break; > > + case VFIO_USER_REPLY: > > + isreply = true; > > + break; > > + default: > > + error_setg(&local_err, "vfio_user_recv unknown message type"); > > + goto fatal; > > + } > > + > > + /* > > + * For replies, find the matching pending request. > > + * For requests, reap incoming FDs. 
> > + */ > > + if (isreply) { > > + QTAILQ_FOREACH(msg, &proxy->pending, next) { > > + if (hdr.id == msg->id) { > > + break; > > + } > > + } > > + if (msg == NULL) { > > + error_setg(&local_err, "vfio_user_recv unexpected reply"); > > + goto err; > > + } > > + QTAILQ_REMOVE(&proxy->pending, msg, next); > > + > > + /* > > + * Process any received FDs > > + */ > > + if (numfds != 0) { > > + if (msg->fds == NULL || msg->fds->recv_fds < numfds) { > > + error_setg(&local_err, "vfio_user_recv unexpected FDs"); > > + goto err; > > + } > > + msg->fds->recv_fds = numfds; > > + memcpy(msg->fds->fds, fdp, numfds * sizeof(int)); > > + } > > + } else { > > + if (numfds != 0) { > > + reqfds = vfio_user_getfds(numfds); > > + memcpy(reqfds->fds, fdp, numfds * sizeof(int)); > > + } else { > > + reqfds = NULL; > > + } > > + } > > + > > + /* > > + * Put the whole message into a single buffer. > > + */ > > + if (isreply) { > > + if (hdr.size > msg->rsize) { > > + error_setg(&local_err, > > + "vfio_user_recv reply larger than recv buffer"); > > + goto err; > > + } > > + *msg->hdr = hdr; > > + data = (char *)msg->hdr + sizeof(hdr); > > + } else { > > + if (hdr.size > max_xfer_size) { > > + error_setg(&local_err, "vfio_user_recv request larger than > > max"); > > + goto err; > > + } > > + buf = g_malloc0(hdr.size); > > + memcpy(buf, &hdr, sizeof(hdr)); > > + data = buf + sizeof(hdr); > > + msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds); > > + msg->type = VFIO_MSG_REQ; > > + } > > + > > + msgleft = hdr.size - sizeof(hdr); > > + while (msgleft > 0) { > > + ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err); > > + > > + /* error or would block */ > > + if (ret < 0) { > > + goto fatal; > > + } > > IIUC qio_channel_read() ends up calling qio_channel_socket_readv() which can > return QIO_CHANNEL_ERR_BLOCK (-2). The if will be taken so local_err is NULL > and that causes a segfault when error_report_err(local_err) is called before > returning from this function.
In fact, shouldn't we keep reading (rather than fail) when qio_channel_read() returns QIO_CHANNEL_ERR_BLOCK, and only fail when it returns -1? Note that simply looping would busy-wait on the non-blocking socket while proxy->lock is held, so the partially received message may need to be saved and resumed the next time the fd handler runs. > > > + > > + msgleft -= ret; > > + data += ret; > > + } > > + > > + /* > > + * Replies signal a waiter, if none just check for errors > > + * and free the message buffer. > > + * > > + * Requests get queued for the BH. > > + */ > > + if (isreply) { > > + msg->complete = true; > > + if (msg->type == VFIO_MSG_WAIT) { > > + qemu_cond_signal(&msg->cv); > > + } else { > > + if (hdr.flags & VFIO_USER_ERROR) { > > + error_printf("vfio_user_rcv error reply on async request > > "); > > + error_printf("command %x error %s\n", hdr.command, > > + strerror(hdr.error_reply)); > > + } > > + /* youngest nowait msg has been ack'd */ > > + if (proxy->last_nowait == msg) { > > + proxy->last_nowait = NULL; > > + } > > + vfio_user_recycle(proxy, msg); > > + } > > + } else { > > + QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next); > > + qemu_bh_schedule(proxy->req_bh); > > + } > > + return 0; > > + > > + /* > > + * fatal means the other side closed or we don't trust the stream > > + * err means this message is corrupt > > + */ > > +fatal: > > + vfio_user_shutdown(proxy); > > + proxy->state = VFIO_PROXY_ERROR; > > + > > +err: > > + for (i = 0; i < numfds; i++) { > > + close(fdp[i]); > > + } > > + if (isreply && msg != NULL) { > > + /* force an error to keep sending thread from hanging */ > > + vfio_user_set_error(msg->hdr, EINVAL); > > + msg->complete = true; > > + qemu_cond_signal(&msg->cv); > > + } > > + error_report_err(local_err); > > + return -1; > > +} > > + > > static void vfio_user_cb(void *opaque) > > { > > VFIOProxy *proxy = opaque; > > @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque) > > * Functions called by main or CPU threads > > */ > > > > +/* > > + * Process incoming requests.
> > + * > > + * The bus-specific callback has the form: > > + * request(opaque, msg) > > + * where 'opaque' was specified in vfio_user_set_handler > > + * and 'msg' is the inbound message. > > + * > > + * The callback is responsible for disposing of the message buffer, > > + * usually by re-using it when calling vfio_send_reply or vfio_send_error, > > + * both of which free their message buffer when the reply is sent. > > + * > > + * If the callback uses a new buffer, it needs to free the old one. > > + */ > > +static void vfio_user_request(void *opaque) > > +{ > > + VFIOProxy *proxy = opaque; > > + VFIOUserMsgQ new, free; > > + VFIOUserMsg *msg, *m1; > > + > > + /* reap all incoming */ > > + QTAILQ_INIT(&new); > > + WITH_QEMU_LOCK_GUARD(&proxy->lock) { > > + QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) { > > + QTAILQ_REMOVE(&proxy->pending, msg, next); > > + QTAILQ_INSERT_TAIL(&new, msg, next); > > + } > > + } > > + > > + /* process list */ > > + QTAILQ_INIT(&free); > > + QTAILQ_FOREACH_SAFE(msg, &new, next, m1) { > > + QTAILQ_REMOVE(&new, msg, next); > > + proxy->request(proxy->req_arg, msg); > > + QTAILQ_INSERT_HEAD(&free, msg, next); > > + } > > + > > + /* free list */ > > + WITH_QEMU_LOCK_GUARD(&proxy->lock) { > > + QTAILQ_FOREACH_SAFE(msg, &free, next, m1) { > > + vfio_user_recycle(proxy, msg); > > + } > > + } > > +} > > + > > static QLIST_HEAD(, VFIOProxy) vfio_user_sockets = > > QLIST_HEAD_INITIALIZER(vfio_user_sockets); > > > > @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress > > *addr, Error **errp) > > } > > > > proxy->ctx = iothread_get_aio_context(vfio_user_iothread); > > + proxy->req_bh = qemu_bh_new(vfio_user_request, proxy); > > > > QTAILQ_INIT(&proxy->outgoing); > > QTAILQ_INIT(&proxy->incoming); > > @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress > > *addr, Error **errp) > > return proxy; > > } > > > > +void vfio_user_set_handler(VFIODevice *vbasedev, > > + void (*handler)(void *opaque, VFIOUserMsg 
*msg), > > + void *req_arg) > > +{ > > + VFIOProxy *proxy = vbasedev->proxy; > > + > > + proxy->request = handler; > > + proxy->req_arg = req_arg; > > + qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, > > + vfio_user_recv, NULL, proxy); > > +} > > + > > void vfio_user_disconnect(VFIOProxy *proxy) > > { > > VFIOUserMsg *r1, *r2; > > @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy) > > } > > object_unref(OBJECT(proxy->ioc)); > > proxy->ioc = NULL; > > + qemu_bh_delete(proxy->req_bh); > > + proxy->req_bh = NULL; > > > > proxy->state = VFIO_PROXY_CLOSING; > > QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) { > > diff --git a/MAINTAINERS b/MAINTAINERS > > index cfaccbf..bc0ba88 100644 > > --- a/MAINTAINERS > > +++ b/MAINTAINERS > > @@ -1909,6 +1909,7 @@ S: Supported > > F: docs/devel/vfio-user.rst > > F: hw/vfio/user.c > > F: hw/vfio/user.h > > +F: hw/vfio/user-protocol.h > > > > vhost > > M: Michael S. Tsirkin <m...@redhat.com> > > -- > > 1.8.3.1 > >