Create a coroutine in nbd_client_new() to run the negotiation, so that nbd_send_negotiate() no longer needs to put the socket into blocking mode with qemu_set_block().
A handler is needed for csock fd in case the coroutine yields during I/O. With this, if the other end disappears in the middle of the negotiation, we don't block the whole event loop. Signed-off-by: Fam Zheng <f...@redhat.com> --- nbd.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/nbd.c b/nbd.c index bcb79d4..4a75d2d 100644 --- a/nbd.c +++ b/nbd.c @@ -238,10 +238,9 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) static ssize_t read_sync(int fd, void *buffer, size_t size) { - /* Sockets are kept in blocking mode in the negotiation phase. After - * that, a non-readable socket simply means that another thread stole - * our request/reply. Synchronization is done with recv_coroutine, so - * that this is coroutine-safe. + /* A non-readable socket simply means that another thread stole our + * request/reply. Synchronization is done with recv_coroutine, so that + * this is coroutine-safe. */ return nbd_wr_sync(fd, buffer, size, true); } @@ -504,13 +503,26 @@ static int nbd_receive_options(NBDClient *client) } } -static int nbd_send_negotiate(NBDClient *client) +typedef struct { + NBDClient *client; + NBDClientNewCB cb; + Coroutine *co; +} NBDClientNewData; + +static void nbd_negotiate_continue(void *opaque) { + qemu_coroutine_enter(opaque, NULL); +} + +static coroutine_fn int nbd_send_negotiate(NBDClientNewData *data) +{ + NBDClient *client = data->client; int csock = client->sock; char buf[8 + 8 + 8 + 128]; int rc; const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA); + AioContext *ctx = client->exp ? client->exp->ctx : qemu_get_aio_context(); /* Negotiation header without options: [ 0 .. 7] passwd ("NBDMAGIC") @@ -531,9 +543,11 @@ static int nbd_send_negotiate(NBDClient *client) [28 .. 
151] reserved (0) */ - qemu_set_block(csock); rc = -EINVAL; + aio_set_fd_handler(ctx, client->sock, true, + nbd_negotiate_continue, + nbd_negotiate_continue, data->co); TRACE("Beginning negotiation."); memset(buf, 0, sizeof(buf)); memcpy(buf, "NBDMAGIC", 8); @@ -575,7 +589,8 @@ static int nbd_send_negotiate(NBDClient *client) TRACE("Negotiation succeeded."); rc = 0; fail: - qemu_set_nonblock(csock); + aio_set_fd_handler(ctx, client->sock, true, + NULL, NULL, NULL); return rc; } @@ -1475,36 +1490,56 @@ static void nbd_update_can_read(NBDClient *client) } } +static coroutine_fn void nbd_co_client_start(void *opaque) +{ + int ret = 0; + NBDClientNewData *data = opaque; + NBDClient *client = data->client; + NBDExport *exp = client->exp; + + if (exp) { + nbd_export_get(exp); + } + if (nbd_send_negotiate(data)) { + shutdown(client->sock, 2); + close(client->sock); + g_free(client); + ret = -1; + nbd_export_put(exp); + goto out; + } + qemu_co_mutex_init(&client->send_lock); + nbd_set_handlers(client); + + if (exp) { + QTAILQ_INSERT_TAIL(&exp->clients, client, next); + } +out: + if (data->cb) { + data->cb(exp, client->sock, ret); + } + g_free(data); +} + /* Create and initialize a new client. If it fails, @csock is closed. * cb will be called with the status code when done. 
*/ void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *), NBDClientNewCB cb) { - int ret = 0; NBDClient *client; + Coroutine *co; + NBDClientNewData *data = g_new(NBDClientNewData, 1); + client = g_malloc0(sizeof(NBDClient)); client->refcount = 1; client->exp = exp; client->sock = csock; client->can_read = true; - if (nbd_send_negotiate(client)) { - shutdown(csock, 2); - close(csock); - g_free(client); - ret = -1; - goto out; - } client->close = close_fn; - qemu_co_mutex_init(&client->send_lock); - nbd_set_handlers(client); - if (exp) { - QTAILQ_INSERT_TAIL(&exp->clients, client, next); - nbd_export_get(exp); - } -out: - if (cb) { - cb(exp, csock, ret); - } + data->client = client; + data->cb = cb; + co = qemu_coroutine_create(nbd_co_client_start); + qemu_coroutine_enter(co, data); } -- 2.4.3