Hi Daniel,

> -----Original Message-----
> From: Daniel P. Berrangé [mailto:berra...@redhat.com]
> Sent: Friday, June 7, 2024 5:04 PM
> To: Gonglei (Arei) <arei.gong...@huawei.com>
> Cc: qemu-devel@nongnu.org; pet...@redhat.com; yu.zh...@ionos.com;
> mgal...@akamai.com; elmar.ger...@ionos.com; zhengchuan
> <zhengch...@huawei.com>; arm...@redhat.com; lizhij...@fujitsu.com;
> pbonz...@redhat.com; m...@redhat.com; Xiexiangyou
> <xiexiang...@huawei.com>; linux-r...@vger.kernel.org; lixiao (H)
> <lixia...@huawei.com>; jinpu.w...@ionos.com; Wangjialin
> <wangjiali...@huawei.com>
> Subject: Re: [PATCH 3/6] io/channel-rdma: support working in coroutine
> 
> On Tue, Jun 04, 2024 at 08:14:09PM +0800, Gonglei wrote:
> > From: Jialin Wang <wangjiali...@huawei.com>
> >
> > It is not feasible to obtain RDMA completion queue notifications
> > through poll/ppoll on the rsocket fd. Therefore, we create a thread
> > named rpoller for each rsocket fd and two eventfds: pollin_eventfd and
> > pollout_eventfd.
> >
> > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> > and pollout_eventfd instead of the rsocket fd.
> >
> > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> > events.
> > When a POLLIN event occurs, the rpoller write the pollin_eventfd, and
> > then poll/ppoll will return the POLLIN event.
> > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, and
> > then poll/ppoll will return the POLLOUT event.
> >
> > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> > read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> > returning POLLIN/POLLOUT events.
> >
> > Known limitations:
> >
> >   For a blocking rsocket fd, if we use io_create_watch to wait for
> >   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
> >   cannot determine when it is not ready to read/write as we can with
> >   non-blocking fds. Therefore, when an event occurs, it will occurs
> >   always, potentially leave the qemu hanging. So we need be cautious
> >   to avoid hanging when using io_create_watch .
> >
> > Luckily, channel-rdma works well in coroutines :)
> >
> > Signed-off-by: Jialin Wang <wangjiali...@huawei.com>
> > Signed-off-by: Gonglei <arei.gong...@huawei.com>
> > ---
> >  include/io/channel-rdma.h |  15 +-
> >  io/channel-rdma.c         | 363
> +++++++++++++++++++++++++++++++++++++-
> >  2 files changed, 376 insertions(+), 2 deletions(-)
> >
> > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> > index 8cab2459e5..cb56127d76 100644
> > --- a/include/io/channel-rdma.h
> > +++ b/include/io/channel-rdma.h
> > @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
> >      socklen_t localAddrLen;
> >      struct sockaddr_storage remoteAddr;
> >      socklen_t remoteAddrLen;
> > +
> > +    /* private */
> > +
> > +    /* qemu g_poll/ppoll() POLLIN event on it */
> > +    int pollin_eventfd;
> > +    /* qemu g_poll/ppoll() POLLOUT event on it */
> > +    int pollout_eventfd;
> > +
> > +    /* the index in the rpoller's fds array */
> > +    int index;
> > +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> > +    short int rpoll_events;
> >  };
> >
> >  /**
> > @@ -147,6 +159,7 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >   *
> >   * Returns: the new client channel, or NULL on error
> >   */
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> Error
> > **errp);
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> > +
> Error
> > +**errp);
> >
> >  #endif /* QIO_CHANNEL_RDMA_H */
> > diff --git a/io/channel-rdma.c b/io/channel-rdma.c index
> > 92c362df52..9792add5cf 100644
> > --- a/io/channel-rdma.c
> > +++ b/io/channel-rdma.c
> > @@ -23,10 +23,15 @@
> >
> >  #include "qemu/osdep.h"
> >  #include "io/channel-rdma.h"
> > +#include "io/channel-util.h"
> > +#include "io/channel-watch.h"
> >  #include "io/channel.h"
> >  #include "qapi/clone-visitor.h"
> >  #include "qapi/error.h"
> >  #include "qapi/qapi-visit-sockets.h"
> > +#include "qemu/atomic.h"
> > +#include "qemu/error-report.h"
> > +#include "qemu/thread.h"
> >  #include "trace.h"
> >  #include <errno.h>
> >  #include <netdb.h>
> > @@ -39,11 +44,274 @@
> >  #include <sys/poll.h>
> >  #include <unistd.h>
> >
> > +typedef enum {
> > +    CLEAR_POLLIN,
> > +    CLEAR_POLLOUT,
> > +    SET_POLLIN,
> > +    SET_POLLOUT,
> > +} UpdateEvent;
> > +
> > +typedef enum {
> > +    RP_CMD_ADD_IOC,
> > +    RP_CMD_DEL_IOC,
> > +    RP_CMD_UPDATE,
> > +} RpollerCMD;
> > +
> > +typedef struct {
> > +    RpollerCMD cmd;
> > +    QIOChannelRDMA *rioc;
> > +} RpollerMsg;
> > +
> > +/*
> > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT
> > +event
> > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to
> > +allow
> > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event  */ static struct
> > +Rpoller {
> > +    QemuThread thread;
> > +    bool is_running;
> > +    int sock[2];
> > +    int count; /* the number of rsocket fds being rpoll() */
> > +    int size; /* the size of fds/riocs */
> > +    struct pollfd *fds;
> > +    QIOChannelRDMA **riocs;
> > +} rpoller;
> > +
> > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> > +                                            RpollerCMD cmd) {
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    msg.cmd = cmd;
> > +    msg.rioc = rioc;
> > +
> > +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
> 
> So this message is handled asynchronously by the poll thread, but you're not
> acquiring any reference on teh 'rioc' object. So there's the possibility that 
> the
> owner of the rioc calls 'unref' free'ing the last reference, before the poll 
> thread
> has finished processing the message.  IMHO the poll thread must hold a
> reference on the rioc for as long as it needs the object.
> 
Yes. You're right.


> > +    if (ret != sizeof msg) {
> > +        error_report("%s: failed to send msg, errno: %d", __func__,
> errno);
> > +    }
> 
> I feel like this should be propagated to the caller via an Error **errp 
> parameter.
> 

OK. 


> > +}
> > +
> > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> > +                                               UpdateEvent
> action,
> > +                                               bool notify_rpoller)
> {
> > +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but
> unwritable */
> > +    unsigned long long buf = ULLONG_MAX - 1;
> > +
> > +    switch (action) {
> > +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> > +    case SET_POLLIN:
> > +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLIN;
> > +        break;
> > +    case SET_POLLOUT:
> > +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLOUT;
> > +        break;
> > +
> > +    /* the rsocket fd is not ready to rread/rwrite */
> > +    case CLEAR_POLLIN:
> > +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLIN;
> > +        break;
> > +    case CLEAR_POLLOUT:
> > +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLOUT;
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +
> > +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> > +    if (notify_rpoller) {
> > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index != -1) {
> > +        error_report("%s: rioc already exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rioc->index = ++rpoller.count;
> > +
> > +    if (rpoller.count + 1 > rpoller.size) {
> > +        rpoller.size *= 2;
> > +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> > +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs,
> rpoller.size);
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> > +    rpoller.riocs[rioc->index] = rioc; }
> > +
> > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
> > +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> > +    rpoller.riocs[rioc->index]->index = rioc->index;
> > +    rpoller.count--;
> > +
> > +    close(rioc->pollin_eventfd);
> > +    close(rioc->pollout_eventfd);
> > +    rioc->index = -1;
> > +    rioc->rpoll_events = 0;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> > +{
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events; }
> > +
> > +static void qio_channel_rdma_rpoller_process_msg(void)
> > +{
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> > +    if (ret != sizeof msg) {
> > +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    switch (msg.cmd) {
> > +    case RP_CMD_ADD_IOC:
> > +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_DEL_IOC:
> > +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_UPDATE:
> > +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_cleanup(void)
> > +{
> > +    close(rpoller.sock[0]);
> > +    close(rpoller.sock[1]);
> > +    rpoller.sock[0] = -1;
> > +    rpoller.sock[1] = -1;
> > +    g_free(rpoller.fds);
> > +    g_free(rpoller.riocs);
> > +    rpoller.fds = NULL;
> > +    rpoller.riocs = NULL;
> > +    rpoller.count = 0;
> > +    rpoller.size = 0;
> > +    rpoller.is_running = false;
> > +}
> > +
> > +static void *qio_channel_rdma_rpoller_thread(void *opaque) {
> > +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> > +
> > +    do {
> > +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> > +        if (ret < 0 && errno != -EINTR) {
> > +            error_report("%s: rpoll() error: %s", __func__,
> strerror(errno));
> > +            break;
> > +        }
> > +
> > +        for (i = 1; i <= rpoller.count; i++) {
> > +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> SET_POLLIN,
> > +                                                   false);
> > +                rpoller.fds[i].events &= ~POLLIN;
> > +            }
> > +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> > +
> SET_POLLOUT, false);
> > +                rpoller.fds[i].events &= ~POLLOUT;
> > +            }
> > +            /* ignore this fd */
> > +            if (rpoller.fds[i].revents & (error_events)) {
> > +                rpoller.fds[i].fd = -1;
> > +            }
> > +        }
> > +
> > +        if (rpoller.fds[0].revents) {
> > +            qio_channel_rdma_rpoller_process_msg();
> > +        }
> > +    } while (rpoller.count >= 1);
> > +
> > +    qio_channel_rdma_rpoller_cleanup();
> > +
> > +    return NULL;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_start(void)
> > +{
> > +    if (qatomic_xchg(&rpoller.is_running, true)) {
> > +        return;
> > +    }
> > +
> > +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> > +        rpoller.is_running = false;
> > +        error_report("%s: failed to create socketpair %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    rpoller.count = 0;
> > +    rpoller.size = 4;
> > +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> > +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> > +    rpoller.fds[0].fd = rpoller.sock[1];
> > +    rpoller.fds[0].events = POLLIN;
> > +
> > +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> > +                       qio_channel_rdma_rpoller_thread, NULL,
> > +                       QEMU_THREAD_JOINABLE); }
> > +
> > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA
> > +*rioc) {
> > +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> > +
> > +    /*
> > +     * A single eventfd is either readable or writable. A single eventfd
> cannot
> > +     * represent a state where it is neither readable nor writable. so use
> two
> > +     * eventfds here.
> > +     */
> > +    rioc->pollin_eventfd = eventfd(0, flags);
> > +    rioc->pollout_eventfd = eventfd(0, flags);
> > +    /* pollout_eventfd with the value 0, means writable, make it
> unwritable */
> > +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> > +
> > +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> > +    rioc->rpoll_events = POLLIN | POLLOUT;
> > +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); }
> > +
> >  QIOChannelRDMA *qio_channel_rdma_new(void)  {
> >      QIOChannelRDMA *rioc;
> >      QIOChannel *ioc;
> >
> > +    qio_channel_rdma_rpoller_start();
> > +    if (!rpoller.is_running) {
> > +        return NULL;
> > +    }
> > +
> >      rioc =
> QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> >      ioc = QIO_CHANNEL(rioc);
> >      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@
> > -125,6 +393,8 @@ retry:
> >          goto out;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_connect_fail(rioc);
> > @@ -211,6 +481,8 @@ int
> qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress
> *addr,
> >      qio_channel_set_feature(QIO_CHANNEL(rioc),
> QIO_CHANNEL_FEATURE_LISTEN);
> >      trace_qio_channel_rdma_listen_complete(rioc, fd);
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_listen_fail(rioc);
> > @@ -267,8 +539,10 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >                             qio_channel_listen_worker_free,
> context);
> > }
> >
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> Error
> > **errp)
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> > +
> Error
> > +**errp)
> >  {
> > +    QIOChannel *ioc = QIO_CHANNEL(rioc);
> >      QIOChannelRDMA *cioc;
> >
> >      cioc = qio_channel_rdma_new();
> > @@ -283,6 +557,17 @@ retry:
> >          if (errno == EINTR) {
> >              goto retry;
> >          }
> > +        if (errno == EAGAIN) {
> > +            if (!(rioc->rpoll_events & POLLIN)) {
> > +                qio_channel_rdma_update_poll_event(rioc,
> CLEAR_POLLIN, true);
> > +            }
> > +            if (qemu_in_coroutine()) {
> > +                qio_channel_yield(ioc, G_IO_IN);
> > +            } else {
> > +                qio_channel_wait(ioc, G_IO_IN);
> > +            }
> > +            goto retry;
> > +        }
> >          error_setg_errno(errp, errno, "Unable to accept connection");
> >          goto error;
> >      }
> > @@ -294,6 +579,8 @@ retry:
> >          goto error;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> > +
> >      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
> >      return cioc;
> >
> > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)  {
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >      ioc->fd = -1;
> > +    ioc->pollin_eventfd = -1;
> > +    ioc->pollout_eventfd = -1;
> > +    ioc->index = -1;
> > +    ioc->rpoll_events = 0;
> >  }
> >
> >  static void qio_channel_rdma_finalize(Object *obj) @@ -314,6 +605,7
> > @@ static void qio_channel_rdma_finalize(Object *obj)
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >
> >      if (ioc->fd != -1) {
> > +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
> 
> This is unsafe.
> 
> When finalize runs, the object has dropped its last reference and is about to 
> be
> free()d.  The notify_rpoller() method, however, sends an async message to the
> poll thread, which the poll thread will end up processing after the rioc is 
> free()d.
> ie a use-after-free.
> 
> If you take my earlier suggestion that the poll thread should hold its own
> reference on the ioc, then it becomes impossible for the rioc to be freed 
> while
> there is still an active I/O watch, and thus this call can go away, and so 
> will the
> use after free.
> 

Yes, will fixed in the next version.

Regards,
-Gonglei

> >          rclose(ioc->fd);
> >          ioc->fd = -1;
> >      }
> 
> With regards,
> Daniel
> --
> |: https://berrange.com      -o-
> https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-
> https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-
> https://www.instagram.com/dberrange :|
> 

Reply via email to