> On Aug 24, 2021, at 8:14 AM, Stefan Hajnoczi <stefa...@redhat.com> wrote:
>
> On Mon, Aug 16, 2021 at 09:42:38AM -0700, Elena Ufimtseva wrote:
>> @@ -62,5 +65,10 @@ typedef struct VFIOProxy {
>>
>>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>>  void vfio_user_disconnect(VFIOProxy *proxy);
>> +void vfio_user_set_reqhandler(VFIODevice *vbasdev,
>
> "vbasedev" for consistency?
>
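OK

>> +                              int (*handler)(void *opaque, char *buf,
>> +                                             VFIOUserFDs *fds),
>> +                              void *reqarg);
>
> The handler callback is undocumented. What context does it run in, what
> do the arguments mean, and what should the function return? Please
> document it so it's easy for others to modify this code in the future
> without reverse-engineering the assumptions behind it.
>

OK, something along these lines in the header, as a rough sketch (the
exact wording isn't final):

    /*
     * Set the handler for server-to-client requests on this device.
     *
     * @handler runs in the iothread with the BQL held.  @opaque is the
     * @reqarg pointer registered here, @buf points at the incoming
     * message (a VFIOUserHdr followed by the command payload), and
     * @fds holds any file descriptors that arrived with it.  It
     * returns 0 on success or -errno on failure; on failure the
     * receiver sends an error reply unless VFIO_USER_NO_REPLY was set
     * in the header.
     */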
>> +void vfio_user_recv(void *opaque)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOProxy *proxy = vbasedev->proxy;
>> +    VFIOUserReply *reply = NULL;
>> +    g_autofree int *fdp = NULL;
>> +    VFIOUserFDs reqfds = { 0, 0, fdp };
>> +    VFIOUserHdr msg;
>> +    struct iovec iov = {
>> +        .iov_base = &msg,
>> +        .iov_len = sizeof(msg),
>> +    };
>> +    bool isreply;
>> +    int i, ret;
>> +    size_t msgleft, numfds = 0;
>> +    char *data = NULL;
>> +    g_autofree char *buf = NULL;
>> +    Error *local_err = NULL;
>> +
>> +    qemu_mutex_lock(&proxy->lock);
>> +    if (proxy->state == VFIO_PROXY_CLOSING) {
>> +        qemu_mutex_unlock(&proxy->lock);
>> +        return;
>> +    }
>> +
>> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
>> +                                 &local_err);
>
> This is a blocking call. My understanding is that the IOThread is shared
> by all vfio-user devices, so other devices will have to wait if one of
> them is acting up (e.g. the device emulation process sent less than
> sizeof(msg) bytes).
>
> While we're blocked in this function the proxy device cannot be
> hot-removed since proxy->lock is held.
>
> It would be more robust to use the event loop to avoid blocking. There
> could be a per-connection receiver coroutine that calls
> qio_channel_readv_full_all_eof() (it yields the coroutine if reading
> would block).
>

I thought the main loop uses the BQL, which I don't need for most
message processing.  The blocking behavior can be fixed by checking
FIONREAD beforehand to detect a message with fewer bytes than a header.
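An untested sketch of the check I have in mind (vfio_user_header_ready()
and the raw-fd plumbing behind proxy->ioc are made up for illustration):

    #include <sys/ioctl.h>

    /* true once at least a full header has arrived on the socket */
    static bool vfio_user_header_ready(int sockfd)
    {
        int avail = 0;

        if (ioctl(sockfd, FIONREAD, &avail) < 0) {
            return false;   /* treat errors as "not readable yet" */
        }
        return (size_t)avail >= sizeof(VFIOUserHdr);
    }

vfio_user_recv() would call this first and return early if it is false,
letting the event loop re-invoke it once more bytes arrive.  This only
covers the header; a short payload could still block, so the payload
read would need a similar check once the size field is known.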
>> +    /*
>> +     * Replies signal a waiter, requests get processed by vfio code
>> +     * that may assume the iothread lock is held.
>> +     */
>> +    if (isreply) {
>> +        reply->complete = 1;
>> +        if (!reply->nowait) {
>> +            qemu_cond_signal(&reply->cv);
>> +        } else {
>> +            if (msg.flags & VFIO_USER_ERROR) {
>> +                error_printf("vfio_user_rcv error reply on async request ");
>> +                error_printf("command %x error %s\n", msg.command,
>> +                             strerror(msg.error_reply));
>> +            }
>> +            /* just free it if no one is waiting */
>> +            reply->nowait = 0;
>> +            if (proxy->last_nowait == reply) {
>> +                proxy->last_nowait = NULL;
>> +            }
>> +            g_free(reply->msg);
>> +            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
>> +        }
>> +        qemu_mutex_unlock(&proxy->lock);
>> +    } else {
>> +        qemu_mutex_unlock(&proxy->lock);
>> +        qemu_mutex_lock_iothread();
>
> The fact that proxy->request() runs with the BQL suggests that VFIO
> communication should take place in the main event loop thread instead of
> a separate IOThread.
>

See the last reply.  Most message processing doesn't need the BQL, so
using the main event loop would optimize the least common case.

>> +        /*
>> +         * make sure proxy wasn't closed while we waited
>> +         * checking state without holding the proxy lock is safe
>> +         * since it's only set to CLOSING when BQL is held
>> +         */
>> +        if (proxy->state != VFIO_PROXY_CLOSING) {
>> +            ret = proxy->request(proxy->reqarg, buf, &reqfds);
>
> The request() callback in an earlier patch is a noop for the client
> implementation. Who frees passed fds?
>

Right now no server->client requests send fds, but I do need a single
point where they are consumed if an error is returned.

>> +            if (ret < 0 && !(msg.flags & VFIO_USER_NO_REPLY)) {
>> +                vfio_user_send_reply(proxy, buf, ret);
>> +            }
>> +        }
>> +        qemu_mutex_unlock_iothread();
>> +    }
>> +    return;
>> +
>> +fatal:
>> +    vfio_user_shutdown(proxy);
>> +    proxy->state = VFIO_PROXY_RECV_ERROR;
>> +
>> +err:
>> +    for (i = 0; i < numfds; i++) {
>> +        close(fdp[i]);
>> +    }
>> +    if (reply != NULL) {
>> +        /* force an error to keep sending thread from hanging */
>> +        reply->msg->flags |= VFIO_USER_ERROR;
>> +        reply->msg->error_reply = EINVAL;
>> +        reply->complete = 1;
>> +        qemu_cond_signal(&reply->cv);
>
> What about fd passing? The actual fds have been closed already in fdp[]
> but reply has a copy too.
>

If the sender gets an error, it won't be using the fds.  I can zero
reply->fds to make this clearer.

> What about the nowait case? If no one is waiting on reply->cv, this
> reply will be leaked?
>

This looks like a leak.
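An untested sketch of the err: path fix, recycling a nowait reply the
same way the reply path above does (assuming reply->fds is a pointer;
locking as in the current code, elided here):

    if (reply != NULL) {
        if (reply->nowait) {
            /* no one is waiting, so recycle the reply here */
            reply->nowait = 0;
            if (proxy->last_nowait == reply) {
                proxy->last_nowait = NULL;
            }
            g_free(reply->msg);
            QTAILQ_INSERT_HEAD(&proxy->free, reply, next);
        } else {
            /* force an error to keep sending thread from hanging */
            reply->msg->flags |= VFIO_USER_ERROR;
            reply->msg->error_reply = EINVAL;
            reply->complete = 1;
            /* fds were closed above; don't let the waiter touch them */
            reply->fds = NULL;
            qemu_cond_signal(&reply->cv);
        }
    }

JJ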