On 8/21/19 11:52 AM, Vladimir Sementsov-Ogievskiy wrote: > Implement reconnect. To achieve this: > > 1. add new modes: > connecting-wait: means, that reconnecting is in progress, and there > were small number of reconnect attempts, so all requests are > waiting for the connection. > connecting-nowait: reconnecting is in progress, there were a lot of > attempts of reconnect, all requests will return errors. > > two old modes are used too: > connected: normal state > quit: exiting after fatal error or on close > > Possible transitions are: > > * -> quit > connecting-* -> connected > connecting-wait -> connecting-nowait (transition is done after > reconnect-delay seconds in connecting-wait mode) > connected -> connecting-wait > > 2. Implement reconnect in connection_co. So, in connecting-* mode, > connection_co, tries to reconnect unlimited times. > > 3. Retry nbd queries on channel error, if we are in connecting-wait > state. > > Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> > ---
> +static bool nbd_client_connecting(BDRVNBDState *s) > +{ > + return s->state == NBD_CLIENT_CONNECTING_WAIT || > + s->state == NBD_CLIENT_CONNECTING_NOWAIT; Indentation looks unusual. I might have done: return (s->state == NBD_CLIENT_CONNECTING_WAIT || s->state == NBD_CLIENT_CONNECTING_NOWAIT); Or even exploit the enum encoding: return s->state <= NBD_CLIENT_CONNECTING_NOWAIT; Is s->state updated atomically, or do we risk the case where we might see two different values of s->state across the || sequence point? Does that matter? > +} > + > +static bool nbd_client_connecting_wait(BDRVNBDState *s) > +{ > + return s->state == NBD_CLIENT_CONNECTING_WAIT; > +} > + > +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s) > +{ > + Error *local_err = NULL; > + > + if (!nbd_client_connecting(s)) { > + return; > + } > + assert(nbd_client_connecting(s)); This assert adds nothing given the condition beforehand. > + > + /* Wait for completion of all in-flight requests */ > + > + qemu_co_mutex_lock(&s->send_mutex); > + > + while (s->in_flight > 0) { > + qemu_co_mutex_unlock(&s->send_mutex); > + nbd_recv_coroutines_wake_all(s); > + s->wait_in_flight = true; > + qemu_coroutine_yield(); > + s->wait_in_flight = false; > + qemu_co_mutex_lock(&s->send_mutex); > + } > + > + qemu_co_mutex_unlock(&s->send_mutex); > + > + if (!nbd_client_connecting(s)) { > + return; > + } > + > + /* > + * Now we are sure that nobody is accessing the channel, and no one will > + * try until we set the state to CONNECTED. 
> + */ > + > + /* Finalize previous connection if any */ > + if (s->ioc) { > + nbd_client_detach_aio_context(s->bs); > + object_unref(OBJECT(s->sioc)); > + s->sioc = NULL; > + object_unref(OBJECT(s->ioc)); > + s->ioc = NULL; > + } > + > + s->connect_status = nbd_client_connect(s->bs, &local_err); > + error_free(s->connect_err); > + s->connect_err = NULL; > + error_propagate(&s->connect_err, local_err); > + local_err = NULL; > + > + if (s->connect_status < 0) { > + /* failed attempt */ > + return; > + } > + > + /* successfully connected */ > + s->state = NBD_CLIENT_CONNECTED; > + qemu_co_queue_restart_all(&s->free_sema); > +} > + > +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s) > +{ > + uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME); > + uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND; > + uint64_t timeout = 1 * NANOSECONDS_PER_SECOND; > + uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND; > + > + nbd_reconnect_attempt(s); > + > + while (nbd_client_connecting(s)) { > + if (s->state == NBD_CLIENT_CONNECTING_WAIT && > + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > > delay_ns) > + { > + s->state = NBD_CLIENT_CONNECTING_NOWAIT; > + qemu_co_queue_restart_all(&s->free_sema); > + } > + > + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, timeout, > + &s->connection_co_sleep_ns_state); > + if (s->drained) { > + bdrv_dec_in_flight(s->bs); > + s->wait_drained_end = true; > + while (s->drained) { > + /* > + * We may be entered once from > nbd_client_attach_aio_context_bh > + * and then from nbd_client_co_drain_end. So here is a loop. > + */ > + qemu_coroutine_yield(); > + } > + bdrv_inc_in_flight(s->bs); > + } > + if (timeout < max_timeout) { > + timeout *= 2; > + } > + > + nbd_reconnect_attempt(s); > + } > } > > static coroutine_fn void nbd_connection_entry(void *opaque) > { > - BDRVNBDState *s = opaque; > + BDRVNBDState *s = (BDRVNBDState *)opaque; The cast is not necessary. 
> uint64_t i; > int ret = 0; > Error *local_err = NULL; > @@ -177,16 +331,26 @@ static coroutine_fn void nbd_connection_entry(void > *opaque) > * Therefore we keep an additional in_flight reference all the time > and > * only drop it temporarily here. > */ > + > + if (nbd_client_connecting(s)) { > + nbd_reconnect_loop(s); > + } > + > + if (s->state != NBD_CLIENT_CONNECTED) { > + continue; > + } > + > assert(s->reply.handle == 0); > ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err); > > if (local_err) { > trace_nbd_read_reply_entry_fail(ret, > error_get_pretty(local_err)); > error_free(local_err); > + local_err = NULL; > } > if (ret <= 0) { > nbd_channel_error(s, ret ? ret : -EIO); > - break; > + continue; > } > > /* > @@ -201,7 +365,7 @@ static coroutine_fn void nbd_connection_entry(void > *opaque) > (nbd_reply_is_structured(&s->reply) && > !s->info.structured_reply)) > { > nbd_channel_error(s, -EINVAL); > - break; > + continue; > } > The commit message says you re-attempt the request after reconnection if you have not yet timed out from the previous connection; but do you also need to clear out any partial reply received to make sure the new request isn't operating on stale assumptions left over if the server died between two structured chunks? 
> @@ -927,20 +1113,26 @@ static int nbd_co_request(BlockDriverState *bs, > NBDRequest *request, > } else { > assert(request->type != NBD_CMD_WRITE); > } > - ret = nbd_co_send_request(bs, request, write_qiov); > - if (ret < 0) { > - return ret; > - } > > - ret = nbd_co_receive_return_code(s, request->handle, > - &request_ret, &local_err); > - if (local_err) { > - trace_nbd_co_request_fail(request->from, request->len, > request->handle, > - request->flags, request->type, > - nbd_cmd_lookup(request->type), > - ret, error_get_pretty(local_err)); > - error_free(local_err); > - } > + do { > + ret = nbd_co_send_request(bs, request, write_qiov); > + if (ret < 0) { > + continue; > + } > + > + ret = nbd_co_receive_return_code(s, request->handle, > + &request_ret, &local_err); > + if (local_err) { > + trace_nbd_co_request_fail(request->from, request->len, > + request->handle, request->flags, > + request->type, > + nbd_cmd_lookup(request->type), > + ret, error_get_pretty(local_err)); > + error_free(local_err); > + local_err = NULL; > + } > + } while (ret < 0 && nbd_client_connecting_wait(s)); I ask because nothing seems to reset request_ret here in the new loop. -- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org
signature.asc
Description: OpenPGP digital signature