21.08.2019 20:35, Eric Blake wrote: > On 8/21/19 11:52 AM, Vladimir Sementsov-Ogievskiy wrote: >> Implement reconnect. To achieve this: >> >> 1. add new modes: >> connecting-wait: means, that reconnecting is in progress, and there >> were small number of reconnect attempts, so all requests are >> waiting for the connection. >> connecting-nowait: reconnecting is in progress, there were a lot of >> attempts of reconnect, all requests will return errors. >> >> two old modes are used too: >> connected: normal state >> quit: exiting after fatal error or on close >> >> Possible transitions are: >> >> * -> quit >> connecting-* -> connected >> connecting-wait -> connecting-nowait (transition is done after >> reconnect-delay seconds in connecting-wait mode) >> connected -> connecting-wait >> >> 2. Implement reconnect in connection_co. So, in connecting-* mode, >> connection_co, tries to reconnect unlimited times. >> >> 3. Retry nbd queries on channel error, if we are in connecting-wait >> state. >> >> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> >> --- > >> +static bool nbd_client_connecting(BDRVNBDState *s) >> +{ >> + return s->state == NBD_CLIENT_CONNECTING_WAIT || >> + s->state == NBD_CLIENT_CONNECTING_NOWAIT; > > > Indentation looks unusual. I might have done: > > return (s->state == NBD_CLIENT_CONNECTING_WAIT || > s->state == NBD_CLIENT_CONNECTING_NOWAIT); > > Or even exploit the enum encoding: > > return s->state <= NBD_CLIENT_CONNECTING_NOWAIT > > Is s->state updated atomically, or do we risk the case where we might > see two different values of s->state across the || sequence point? Does > that matter?
I hope it all happens in one aio context so state change should not intersects with this function as it doesn't yield. > >> +} >> + >> +static bool nbd_client_connecting_wait(BDRVNBDState *s) >> +{ >> + return s->state == NBD_CLIENT_CONNECTING_WAIT; >> +} >> + >> +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) >> +{ >> + Error *local_err = NULL; >> + >> + if (!nbd_client_connecting(s)) { >> + return; >> + } >> + assert(nbd_client_connecting(s)); > > This assert adds nothing given the condition beforehand. > >> + >> + /* Wait for completion of all in-flight requests */ >> + >> + qemu_co_mutex_lock(&s->send_mutex); >> + >> + while (s->in_flight > 0) { >> + qemu_co_mutex_unlock(&s->send_mutex); >> + nbd_recv_coroutines_wake_all(s); >> + s->wait_in_flight = true; >> + qemu_coroutine_yield(); >> + s->wait_in_flight = false; >> + qemu_co_mutex_lock(&s->send_mutex); >> + } >> + >> + qemu_co_mutex_unlock(&s->send_mutex); >> + >> + if (!nbd_client_connecting(s)) { >> + return; >> + } >> + >> + /* >> + * Now we are sure that nobody is accessing the channel, and no one will >> + * try until we set the state to CONNECTED. >> + */ >> + >> + /* Finalize previous connection if any */ >> + if (s->ioc) { >> + nbd_client_detach_aio_context(s->bs); >> + object_unref(OBJECT(s->sioc)); >> + s->sioc = NULL; >> + object_unref(OBJECT(s->ioc)); >> + s->ioc = NULL; >> + } >> + >> + s->connect_status = nbd_client_connect(s->bs, &local_err); >> + error_free(s->connect_err); >> + s->connect_err = NULL; >> + error_propagate(&s->connect_err, local_err); >> + local_err = NULL; >> + >> + if (s->connect_status < 0) { >> + /* failed attempt */ >> + return; >> + } >> + >> + /* successfully connected */ >> + s->state = NBD_CLIENT_CONNECTED; >> + qemu_co_queue_restart_all(&s->free_sema); >> +} >> + >> +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s) >> +{ >> + uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); >> + uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND; >> + uint64_t timeout = 1 * NANOSECONDS_PER_SECOND; >> + uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND; >> + >> + nbd_reconnect_attempt(s); >> + >> + while (nbd_client_connecting(s)) { >> + if (s->state == NBD_CLIENT_CONNECTING_WAIT && >> + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > >> delay_ns) >> + { >> + s->state = NBD_CLIENT_CONNECTING_NOWAIT; >> + qemu_co_queue_restart_all(&s->free_sema); >> + } >> + >> + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, timeout, >> + &s->connection_co_sleep_ns_state); >> + if (s->drained) { >> + bdrv_dec_in_flight(s->bs); >> + s->wait_drained_end = true; >> + while (s->drained) { >> + /* >> + * We may be entered once from >> nbd_client_attach_aio_context_bh >> + * and then from nbd_client_co_drain_end. So here is a loop. >> + */ >> + qemu_coroutine_yield(); >> + } >> + bdrv_inc_in_flight(s->bs); >> + } >> + if (timeout < max_timeout) { >> + timeout *= 2; >> + } >> + >> + nbd_reconnect_attempt(s); >> + } >> } >> >> static coroutine_fn void nbd_connection_entry(void *opaque) >> { >> - BDRVNBDState *s = opaque; >> + BDRVNBDState *s = (BDRVNBDState *)opaque; > > The cast is not necessary. > >> uint64_t i; >> int ret = 0; >> Error *local_err = NULL; >> @@ -177,16 +331,26 @@ static coroutine_fn void nbd_connection_entry(void >> *opaque) >> * Therefore we keep an additional in_flight reference all the >> time and >> * only drop it temporarily here. >> */ >> + >> + if (nbd_client_connecting(s)) { >> + nbd_reconnect_loop(s); >> + } >> + >> + if (s->state != NBD_CLIENT_CONNECTED) { >> + continue; >> + } >> + >> assert(s->reply.handle == 0); >> ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err); >> >> if (local_err) { >> trace_nbd_read_reply_entry_fail(ret, >> error_get_pretty(local_err)); >> error_free(local_err); >> + local_err = NULL; >> } >> if (ret <= 0) { >> nbd_channel_error(s, ret ? ret : -EIO); >> - break; >> + continue; >> } >> >> /* >> @@ -201,7 +365,7 @@ static coroutine_fn void nbd_connection_entry(void >> *opaque) >> (nbd_reply_is_structured(&s->reply) && >> !s->info.structured_reply)) >> { >> nbd_channel_error(s, -EINVAL); >> - break; >> + continue; >> } >> > > The commit message says you re-attempt the request after reconnection if > you have not yet timed out from the previous connection; but do you also > need to clear out any partial reply received to make sure the new > request isn't operating on stale assumptions left over if the server > died between two structured chunks? In nbd_reconnect_attempt we "Wait for completion of all in-flight requests", so all in-flight requests are failed, and no partial progress appears at reconnect point. > > >> @@ -927,20 +1113,26 @@ static int nbd_co_request(BlockDriverState *bs, >> NBDRequest *request, >> } else { >> assert(request->type != NBD_CMD_WRITE); >> } >> - ret = nbd_co_send_request(bs, request, write_qiov); >> - if (ret < 0) { >> - return ret; >> - } >> >> - ret = nbd_co_receive_return_code(s, request->handle, >> - &request_ret, &local_err); >> - if (local_err) { >> - trace_nbd_co_request_fail(request->from, request->len, >> request->handle, >> - request->flags, request->type, >> - nbd_cmd_lookup(request->type), >> - ret, error_get_pretty(local_err)); >> - error_free(local_err); >> - } >> + do { >> + ret = nbd_co_send_request(bs, request, write_qiov); >> + if (ret < 0) { >> + continue; >> + } >> + >> + ret = nbd_co_receive_return_code(s, request->handle, >> + &request_ret, &local_err); >> + if (local_err) { >> + trace_nbd_co_request_fail(request->from, request->len, >> + request->handle, request->flags, >> + request->type, >> + nbd_cmd_lookup(request->type), >> + ret, error_get_pretty(local_err)); >> + error_free(local_err); >> + local_err = NULL; >> + } >> + } while (ret < 0 && nbd_client_connecting_wait(s)); > > I ask because nothing seems to reset request_ret here in the new loop. > We don't need to reset it. It is set only on the last iterations, as if it is set ret must be 0. -- Best regards, Vladimir