On Fri, Apr 16, 2021 at 11:09:11AM +0300, Vladimir Sementsov-Ogievskiy wrote:
> OK, that's a big rewrite of the logic.
>
> Pre-patch we have an always-running coroutine - connection_co. It does
> reply receiving and reconnecting. This leads to a lot of difficult and
> unobvious code around drained sections and context switching. We also
> abuse the bs->in_flight counter, which is increased for connection_co
> and temporarily decreased at points where we want to allow a drained
> section to begin. One of these places is in another file: in
> nbd_read_eof() in nbd/client.c.
>
> We also cancel reconnect and requests waiting for reconnect on drained
> begin, which is not correct.
>
> Let's finally drop this always-running coroutine and go another way:
>
> 1. reconnect_attempt() moves into nbd_co_send_request() and is called
>    under send_mutex
>
> 2. We receive headers in the request coroutine. But we should also
>    dispatch replies for other pending requests. So,
>    nbd_connection_entry() is turned into nbd_receive_replies(), which
>    dispatches replies to other pending requests until it sees the
>    header for its own request, and then returns.
>
> 3. All the old stuff around drained sections and context switching is
>    dropped.
>
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com>
> ---
>  block/nbd.c  | 376 ++++++++++++++++-----------------------------------
>  nbd/client.c |   2 -
>  2 files changed, 119 insertions(+), 259 deletions(-)
>
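Summarizing point 2 for my own benefit before reading the diff: a
request coroutine that wants its reply either becomes the receiver
itself, or parks until the current receiver wakes it. Condensed from
the hunk quoted below (the out: label and the code that wakes other
requests' coroutines are trimmed from my quote, so those comments are
my guesses):

  nbd_receive_replies(s, handle):
      if (s->receive_co) {
          /* another coroutine is receiving; sleep until it wakes us */
          s->requests[HANDLE_TO_INDEX(s, handle)].receiving = true;
          qemu_coroutine_yield();
          if (s->receive_co != qemu_coroutine_self()) {
              /* either done or failed; caller checks
               * nbd_client_connected() to distinguish */
              return;
          }
          /* apparently the receiver role was handed over to us */
      }
      s->receive_co = qemu_coroutine_self();
      while (nbd_client_connected(s)) {
          nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
          if (s->reply.handle == handle) {
              goto out;  /* got our own header; we are done receiving */
          }
          /* presumably: wake the coroutine that owns s->reply.handle,
           * then keep looping on behalf of everyone else */
      }

If I have that right, it is a much easier model to reason about than
the always-running connection_co.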
> -static coroutine_fn void nbd_connection_entry(void *opaque)
> +static coroutine_fn void nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
>  {
> -    BDRVNBDState *s = opaque;
>      uint64_t i;
>      int ret = 0;
>      Error *local_err = NULL;
>
> -    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
> -        /*
> -         * The NBD client can only really be considered idle when it has
> -         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
> -         * the point where the additional scheduled coroutine entry happens
> -         * after nbd_client_attach_aio_context().
> -         *
> -         * Therefore we keep an additional in_flight reference all the time and
> -         * only drop it temporarily here.
> -         */
> +    i = HANDLE_TO_INDEX(s, handle);
> +    if (s->receive_co) {
> +        assert(s->receive_co != qemu_coroutine_self());
>
> -        if (nbd_client_connecting(s)) {
> -            nbd_co_reconnect_loop(s);
> -        }
> +        /* Another request coroutine is receiving now */
> +        s->requests[i].receiving = true;
> +        qemu_coroutine_yield();
> +        assert(!s->requests[i].receiving);
>
> -        if (!nbd_client_connected(s)) {
> -            continue;
> +        if (s->receive_co != qemu_coroutine_self()) {
> +            /*
> +             * We are either failed or done, caller uses nbd_client_connected()
> +             * to distinguish.
> +             */
> +            return;
>          }
> +    }
> +
> +    assert(s->receive_co == 0 || s->receive_co == qemu_coroutine_self());

s/0/NULL/ here

> +    s->receive_co = qemu_coroutine_self();
>
> +    while (nbd_client_connected(s)) {
>          assert(s->reply.handle == 0);
>          ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
>
> @@ -522,8 +380,21 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>              local_err = NULL;
>          }
>          if (ret <= 0) {
> -            nbd_channel_error(s, ret ? ret : -EIO);
> -            continue;
> +            ret = ret ? ret : -EIO;
> +            nbd_channel_error(s, ret);
> +            goto out;
> +        }
> +
> +        if (!nbd_client_connected(s)) {
> +            ret = -EIO;
> +            goto out;
> +        }
> +
> +        i = HANDLE_TO_INDEX(s, s->reply.handle);
> +
> +        if (s->reply.handle == handle) {
> +            ret = 0;
> +            goto out;
>          }
>
>          /*

I know your followup said there is more work to do before v4, but I
look forward to seeing it.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org