On Tue, Aug 30, 2022 at 10:32:02AM +0800, Ming Lei wrote:
> Hi Jones,
> 
> On Thu, Aug 25, 2022 at 01:10:55PM +0100, Richard W.M. Jones wrote:
> > This patch adds simple support for a ublk-based NBD client.
> > It is also available here:
> > https://gitlab.com/rwmjones/libnbd/-/tree/nbdublk/ublk
> > 
> > ublk is a way to write Linux block device drivers in userspace:
> 
> Just looked at your nbdublk implementation a bit; basically it is good,
> and really nice work.
> 
> Here follow two suggestions:
> 
> 1) the io_uring context is multiplexed with ublk io command handling, so
> we should avoid blocking in both ->handle_io_async() and
> ->handle_event(), otherwise performance may be bad

The nbd_aio_* calls don't block.
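
They just queue the command on the handle and return a cookie
immediately; the actual I/O happens later, as the state machine is
driven from the poll loop.  A trivial sketch (buf, count and offset
stand in for the real arguments):

  int64_t cookie;

  cookie = nbd_aio_pread (h, buf, count, offset, cb, 0);
  if (cookie == -1)
    /* submission error */ ;
  /* Otherwise the call returned at once; cb.callback runs later, when
   * the server's reply arrives and the command is retired.
   */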

However I noticed that I made a mistake in the flush, trim and zero
paths: I am using the synchronous (blocking) nbd_flush / nbd_trim /
nbd_zero calls instead of nbd_aio_flush / nbd_aio_trim / nbd_aio_zero.
I will fix this soon.
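
The fix should be mechanical, since nbd_aio_flush / nbd_aio_trim /
nbd_aio_zero take the same completion callback as the existing
nbd_aio_pread / nbd_aio_pwrite paths.  An untested sketch of what the
flush case will look like (error handling copied from the read path):

  case UBLK_IO_OP_FLUSH:
    /* Submit asynchronously; the command is retired later in
     * command_completed, exactly like the read and write paths.
     */
    r = nbd_aio_flush (h, cb, 0);
    if (r == -1) {
      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
      return 0;
    }
    break;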

Nothing in handle_event should block except for the call to
pthread_mutex_lock.  This lock is necessary because new commands can
be retired on the nbd_work_thread while handle_event is being called
from the io_uring thread.
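
For reference, the two halves of that handoff look like this in the
current code (condensed from ublk/tgt.c):

  /* NBD thread, in command_completed: queue the completion and poke
   * the io_uring thread.
   */
  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
  completions_append (&thread_info.ptr[i].completed_commands, *completion);
  ublksrv_queue_send_event (q);
  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);

  /* io_uring thread, in handle_event: drain the list under the same
   * lock and complete the commands.
   */
  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
  for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) {
    struct completion *c = &thread_info.ptr[i].completed_commands.ptr[j];
    ublksrv_complete_io (c->q, c->tag, c->res);
  }
  completions_reset (&thread_info.ptr[i].completed_commands);
  ublksrv_queue_handled_event (q);
  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);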

> 2) in the implementation of the nbd worker thread, there are two sleep
> points (waiting for an incoming io command, and the network FD); I'd
> suggest using poll to wait on both of them
>
> Recently I have been working to add ublksrv io offloading / aio
> interfaces for this sort of case in which io_uring can't be used,
> which may simplify this area; please see the attached patch, which
> applies the above two points against your patch.  An obvious
> improvement can be observed in my simple fio test (randread, io, 4k
> bs, libaio) against a backend of 'nbdkit file'.
>
> These interfaces aren't merged into the ublksrv github tree yet; you
> can find them in the aio branch, and demo_event.c is one example of how
> to use them:
> 
>   https://github.com/ming1/ubdsrv/tree/aio
>
> Actually this interface can be improved further for the nbdublk case,
> since the request allocation isn't really needed for this direct
> offloading.  But it is there to cover IOs which don't come from the
> ublk driver, such as metadata, so 'struct ublksrv_aio' is allocated.
> I will try my best to finalize these interfaces and merge them to the
> master branch.

I didn't really understand what these patches to ubdsrv do when I
looked at them before.  Maybe add some diagrams?
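
The part I do follow is the nbd_poll2 idea: the NBD worker sleeps in a
single poll(2) on both the NBD socket and the aio context's eventfd, so
either a network event or a newly offloaded request wakes it up.  In
outline (a sketch; the real direction logic is in the lib/poll.c hunk
below, and error handling is elided):

  struct pollfd fds[2];

  fds[0].fd = nbd_aio_get_fd (h);  /* NBD socket */
  fds[0].events = POLLIN;          /* or POLLOUT, per the state machine */
  fds[1].fd = aio_ctx->efd;        /* eventfd: new requests / shutdown */
  fds[1].events = POLLIN;

  while (poll (fds, 2, -1) == -1 && errno == EINTR)
    ;  /* retry on signal */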

> BTW, IOPS on nbdublk (backend: nbdkit file) still has a big gap
> compared with ublk-loop, so I guess that in future io_uring should be
> tried, to see if a big improvement can be observed.

It's always going to be a bit slower because we're converting the
requests into a network protocol and passing them to another process.

Rich.

> 
> diff --git a/generator/API.ml b/generator/API.ml
> index 3e948aa..bdd0fb8 100644
> --- a/generator/API.ml
> +++ b/generator/API.ml
> @@ -2289,6 +2289,26 @@ that eventual action is actually expected - for example, if
>  the connection is established but there are no commands in
>  flight, using an infinite timeout will permanently block).
>  
> +This function is mainly useful as an example of how you might
> +integrate libnbd with your own main loop, rather than being
> +intended as something you would use.";
> +    example = Some "examples/aio-connect-read.c";
> +  };
> +
> +  "poll2", {
> +    default_call with
> +    args = [Int "evt"; Int "timeout" ]; ret = RInt;
> +    shortdesc = "poll the handle once with eventfd";
> +    longdesc = "\
> +This is a simple implementation of L<poll(2)> which is used
> +internally by synchronous API calls.  On success, it returns
> +C<0> if the C<timeout> (in milliseconds) occurs, or C<1> if
> +the poll completed and the state machine progressed. Set
> +C<timeout> to C<-1> to block indefinitely (but be careful
> +that eventual action is actually expected - for example, if
> +the connection is established but there are no commands in
> +flight, using an infinite timeout will permanently block).
> +
>  This function is mainly useful as an example of how you might
>  integrate libnbd with your own main loop, rather than being
>  intended as something you would use.";
> @@ -3153,6 +3173,7 @@ let first_version = [
>    "zero", (1, 0);
>    "block_status", (1, 0);
>    "poll", (1, 0);
> +  "poll2", (1, 0);
>    "aio_connect", (1, 0);
>    "aio_connect_uri", (1, 0);
>    "aio_connect_unix", (1, 0);
> diff --git a/lib/poll.c b/lib/poll.c
> index df01d94..e9d7924 100644
> --- a/lib/poll.c
> +++ b/lib/poll.c
> @@ -27,14 +27,21 @@
>  #include "internal.h"
>  
>  /* A simple main loop implementation using poll(2). */
> -int
> -nbd_unlocked_poll (struct nbd_handle *h, int timeout)
> +static int
> +__nbd_unlocked_poll (struct nbd_handle *h, int evt, int timeout)
>  {
> -  struct pollfd fds[1];
> -  int r;
> +  struct pollfd fds[2];
> +  int r, nr_fds = 1;
>  
>    /* fd might be negative, and poll will ignore it. */
>    fds[0].fd = nbd_unlocked_aio_get_fd (h);
> +  if (evt > 0) {
> +    fds[1].fd = evt;
> +    fds[1].events = POLLIN;
> +    fds[1].revents = 0;
> +    nr_fds = 2;
> +  }
> +
>    switch (nbd_internal_aio_get_direction (get_next_state (h))) {
>    case LIBNBD_AIO_DIRECTION_READ:
>      fds[0].events = POLLIN;
> @@ -58,7 +65,7 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
>     * passed to poll.
>     */
>    do {
> -    r = poll (fds, 1, timeout);
> +    r = poll (fds, nr_fds, timeout);
>      debug (h, "poll end: r=%d revents=%x", r, fds[0].revents);
>    } while (r == -1 && errno == EINTR);
>  
> @@ -91,3 +98,15 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
>  
>    return 1;
>  }
> +
> +int
> +nbd_unlocked_poll (struct nbd_handle *h, int timeout)
> +{
> +  return __nbd_unlocked_poll (h, -1, timeout);
> +}
> +
> +int
> +nbd_unlocked_poll2 (struct nbd_handle *h, int evt, int timeout)
> +{
> +  return __nbd_unlocked_poll (h, evt, timeout);
> +}
> diff --git a/ublk/tgt.c b/ublk/tgt.c
> index 4cdd42a..2ab995a 100644
> --- a/ublk/tgt.c
> +++ b/ublk/tgt.c
> @@ -35,6 +35,7 @@
>  #endif
>  
>  #include <ublksrv.h>
> +#include <ublksrv_aio.h>
>  
>  #include <libnbd.h>
>  
> @@ -46,14 +47,6 @@
>  /* Number of seconds to wait for commands to complete when closing the dev. */
>  #define RELEASE_TIMEOUT 5
>  
> -/* List of completed commands. */
> -struct completion {
> -  struct ublksrv_queue *q;
> -  int tag;
> -  int res;      /* The normal return value, if the command completes OK. */
> -};
> -DEFINE_VECTOR_TYPE(completions, struct completion)
> -
>  /* Thread model:
>   *
>   * There are two threads per NBD connection.  One thread
> @@ -69,32 +62,170 @@ struct thread_info {
>    pthread_t io_uring_thread;
>    pthread_t nbd_work_thread;
>  
> -  /* This counts the number of commands in flight.  The condition is
> -   * used to allow the operations thread to process commands when
> -   * in_flight goes from 0 -> 1.  This is roughly equivalent to
> -   * nbd_aio_in_flight, but we need to count it ourselves in order to
> -   * use the condition.
> -   */
> -  _Atomic size_t in_flight;
> -  pthread_mutex_t in_flight_mutex;
> -  pthread_cond_t in_flight_cond;
> -
> -  /* Commands have to be completed on the io_uring thread, but they
> -   * run on the NBD thread.  So when the NBD command completes we put
> -   * the command on this queue and they are passed to the io_uring
> -   * thread to call ublksrv_complete_io.
> -   */
> -  pthread_mutex_t completed_commands_lock;
> -  completions completed_commands;
> +  struct ublksrv_aio_list compl;
>  };
>  DEFINE_VECTOR_TYPE(thread_infos, struct thread_info)
>  static thread_infos thread_info;
>  
>  static pthread_barrier_t barrier;
> +static struct ublksrv_aio_ctx *aio_ctx = NULL;
>  
>  static char jbuf[4096];
>  static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER;
>  
> +/* Command completion callback (called on the NBD thread). */
> +static int
> +command_completed (void *vpdata, int *error)
> +{
> +  struct ublksrv_aio *req = vpdata;
> +  int q_id = ublksrv_aio_qid(req->id);
> +  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, q_id);
> +  struct ublksrv_aio_list *compl = &thread_info.ptr[q_id].compl;
> +
> +  if (verbose)
> +    fprintf (stderr,
> +             "%s: command_completed: tag=%d q_id=%zu error=%d\n",
> +             "nbdublk", ublksrv_aio_tag(req->id),
> +          ublksrv_aio_qid(req->id), *error);
> +
> +  /* If the command failed, override the normal result. */
> +  if (*error != 0)
> +    req->res = *error;
> +
> +  pthread_spin_lock(&compl->lock);
> +  aio_list_add(&compl->list, req);
> +  pthread_spin_unlock(&compl->lock);
> +
> +  return 1;
> +}
> +
> +
> +static int
> +aio_submitter (struct ublksrv_aio_ctx *ctx, struct ublksrv_aio *req)
> +{
> +  const struct ublksrv_io_desc *iod = &req->io;
> +  const unsigned op = ublksrv_get_op (iod);
> +  const unsigned flags = ublksrv_get_flags (iod);
> +  const bool fua = flags & UBLK_IO_F_FUA;
> +  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
> +  const size_t q_id = ublksrv_aio_qid(req->id); /* also the NBD handle number */
> +  struct nbd_handle *h = nbd.ptr[q_id];
> +  uint32_t nbd_flags = 0;
> +  int64_t r;
> +  nbd_completion_callback cb;
> +  bool sync = false;
> +
> +  if (verbose)
> +    fprintf (stderr, "%s: aio_submitter: tag = %d q_id = %zu\n",
> +             "nbdublk", ublksrv_aio_tag(req->id), q_id);
> +
> +  req->res = iod->nr_sectors << 9;
> +  cb.callback = command_completed;
> +  cb.user_data = req;
> +  cb.free = NULL;
> +
> +  switch (op) {
> +  case UBLK_IO_OP_READ:
> +    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
> +                       iod->start_sector << 9, cb, 0);
> +    if (r == -1) {
> +      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> +      return -EINVAL;
> +    }
> +    break;
> +
> +  case UBLK_IO_OP_WRITE:
> +    if (fua && can_fua)
> +      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> +
> +    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
> +                        iod->start_sector << 9, cb, nbd_flags);
> +    if (r == -1) {
> +      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> +      return -EINVAL;
> +    }
> +    break;
> +
> +  case UBLK_IO_OP_FLUSH:
> +    r = nbd_flush (h, 0);
> +    if (r == -1) {
> +      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> +      return -EINVAL;
> +    }
> +    sync = true;
> +    break;
> +
> +  case UBLK_IO_OP_DISCARD:
> +    if (fua && can_fua)
> +      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> +
> +    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
> +    if (r == -1) {
> +      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> +      return -EINVAL;
> +    }
> +    sync = true;
> +    break;
> +
> +  case UBLK_IO_OP_WRITE_ZEROES:
> +    if (fua && can_fua)
> +      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> +
> +    if (alloc_zero)
> +      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
> +
> +    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
> +    if (r == -1) {
> +      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> +      return -EINVAL;
> +    }
> +    sync = true;
> +    break;
> +
> +  default:
> +    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
> +    return -ENOTSUP;
> +  }
> +
> +  /* Return 1 if the request was already completed synchronously. */
> +  if (sync)
> +    return 1;
> +  return 0;
> +}
> +
> +static void *
> +nbd_work_thread (void *vpinfo)
> +{
> +  struct thread_info *ti = vpinfo;
> +  struct nbd_handle *h = nbd.ptr[ti->i];
> +  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, ti->i);
> +  struct ublksrv_aio_list *c = &thread_info.ptr[ti->i].compl;
> +
> +  /* Signal to the main thread that we have initialized. */
> +  pthread_barrier_wait (&barrier);
> +
> +  while (!ublksrv_aio_ctx_dead(aio_ctx)) {
> +    struct aio_list compl;
> +
> +    aio_list_init(&compl);
> +    ublksrv_aio_submit_worker(aio_ctx, aio_submitter, &compl);
> +
> +    pthread_spin_lock(&c->lock);
> +    aio_list_splice(&c->list, &compl);
> +    pthread_spin_unlock(&c->lock);
> +
> +    ublksrv_aio_complete_worker(aio_ctx, &compl);
> +
> +    if (nbd_poll2 (h, aio_ctx->efd, -1) == -1) {
> +      fprintf (stderr, "%s\n", nbd_get_error ());
> +      exit (EXIT_FAILURE);
> +    }
> +  }
> +
> +  /* Reached only after the aio context has been shut down. */
> +  return NULL;
> +}
> +
>  static void *
>  io_uring_thread (void *vpinfo)
>  {
> @@ -139,37 +270,6 @@ io_uring_thread (void *vpinfo)
>    return NULL;
>  }
>  
> -static void *
> -nbd_work_thread (void *vpinfo)
> -{
> -  struct thread_info *thread_info = vpinfo;
> -  const size_t i = thread_info->i;
> -  struct nbd_handle *h = nbd.ptr[i];
> -
> -  /* Signal to the main thread that we have initialized. */
> -  pthread_barrier_wait (&barrier);
> -
> -  while (1) {
> -    /* Sleep until at least one command is in flight. */
> -    pthread_mutex_lock (&thread_info->in_flight_mutex);
> -    while (thread_info->in_flight == 0)
> -      pthread_cond_wait (&thread_info->in_flight_cond,
> -                         &thread_info->in_flight_mutex);
> -    pthread_mutex_unlock (&thread_info->in_flight_mutex);
> -
> -    /* Dispatch work while there are commands in flight. */
> -    while (thread_info->in_flight > 0) {
> -      if (nbd_poll (h, -1) == -1) {
> -        fprintf (stderr, "%s\n", nbd_get_error ());
> -        exit (EXIT_FAILURE);
> -      }
> -    }
> -  }
> -
> -  /*NOTREACHED*/
> -  return NULL;
> -}
> -
>  static int
>  set_parameters (struct ublksrv_ctrl_dev *ctrl_dev,
>                  const struct ublksrv_dev *dev)
> @@ -215,6 +315,7 @@ int
>  start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
>  {
>    const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info;
> +  const int dev_id = dinfo->dev_id;
>    struct ublksrv_dev *dev;
>    size_t i;
>    int r;
> @@ -260,22 +361,21 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
>      return -1;
>    }
>  
> +  aio_ctx = ublksrv_aio_ctx_init(dev, 0);
> +  if (!aio_ctx) {
> +    fprintf(stderr, "dev %d: ublksrv_aio_ctx_init failed\n", dev_id);
> +    return -ENOMEM;
> +  }
> +
>    /* Create the threads. */
>    for (i = 0; i < nbd.len; ++i) {
>      /* Note this cannot fail because of previous reserve. */
>      thread_infos_append (&thread_info,
>                           (struct thread_info)
> -                         { .dev = dev, .i = i, .in_flight = 0 });
> +                         { .dev = dev, .i = i });
> +
> +    ublksrv_aio_init_list(&thread_info.ptr[i].compl);
>  
> -    r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL);
> -    if (r != 0)
> -      goto bad_pthread;
> -    r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL);
> -    if (r != 0)
> -      goto bad_pthread;
> -    r = pthread_mutex_init (&thread_info.ptr[i].completed_commands_lock, NULL);
> -    if (r != 0)
> -      goto bad_pthread;
>      r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL,
>                          io_uring_thread, &thread_info.ptr[i]);
>      if (r != 0)
> @@ -316,25 +416,11 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
>    for (i = 0; i < nbd.len; ++i)
>      pthread_join (thread_info.ptr[i].io_uring_thread, NULL);
>  
> -  /* Wait until a timeout while there are NBD commands in flight. */
> -  time (&st);
> -  while (time (NULL) - st <= RELEASE_TIMEOUT) {
> -    for (i = 0; i < nbd.len; ++i) {
> -      if (thread_info.ptr[i].in_flight > 0)
> -        break;
> -    }
> -    if (i == nbd.len) /* no commands in flight */
> -      break;
> -
> -    /* Signal to the operations threads to work. */
> -    for (i = 0; i < nbd.len; ++i) {
> -      pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex);
> -      pthread_cond_signal (&thread_info.ptr[i].in_flight_cond);
> -      pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex);
> -    }
> -
> -    sleep (1);
> +  ublksrv_aio_ctx_shutdown(aio_ctx);
> +  for (i = 0; i < nbd.len; ++i) {
> +    pthread_join (thread_info.ptr[i].nbd_work_thread, NULL);
>    }
> +  ublksrv_aio_ctx_deinit(aio_ctx);
>  
>    ublksrv_dev_deinit (dev);
>    //thread_infos_reset (&thread_info);
> @@ -367,176 +453,31 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[])
>    return 0;
>  }
>  
> -/* Command completion callback (called on the NBD thread). */
> -static int
> -command_completed (void *vpdata, int *error)
> -{
> -  struct completion *completion = vpdata;
> -  struct ublksrv_queue *q = completion->q;
> -  const size_t i = q->q_id;
> -
> -  if (verbose)
> -    fprintf (stderr,
> -             "%s: command_completed: tag=%d q_id=%zu res=%d error=%d\n",
> -             "nbdublk", completion->tag, i, completion->res, *error);
> -
> -  /* If the command failed, override the normal result. */
> -  if (*error != 0)
> -    completion->res = *error;
> -
> -  assert (thread_info.ptr[i].in_flight >= 1);
> -  thread_info.ptr[i].in_flight--;
> -
> -  /* Copy the command to the list of completed commands.
> -   *
> -   * Note *completion is freed by the .free handler that we added to
> -   * this completion callback.
> -   */
> -  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
> -  completions_append (&thread_info.ptr[i].completed_commands, *completion);
> -
> -  /* Signal io_uring thread that the command has been completed.
> -   * It will call us back in a different thread on ->handle_event
> -   * and we can finally complete the command(s) there.
> -   */
> -  ublksrv_queue_send_event (q);
> -  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
> -
> -  /* Retire the NBD command. */
> -  return 1;
> -}
> -
>  static void
> -handle_event (struct ublksrv_queue *q)
> +nbd_handle_event (struct ublksrv_queue *q)
>  {
> -  const size_t i = q->q_id;
> -  size_t j;
> -
>    if (verbose)
> -    fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
> +    fprintf (stderr, "%s: nbd_handle_event: q_id = %d\n", "nbdublk", q->q_id);
>  
> -  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
> -
> -  for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) {
> -    struct completion *completion =
> -      &thread_info.ptr[i].completed_commands.ptr[j];
> -    ublksrv_complete_io (completion->q, completion->tag, completion->res);
> -  }
> -  completions_reset (&thread_info.ptr[i].completed_commands);
> -  ublksrv_queue_handled_event (q);
> -
> -  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
> +  ublksrv_aio_handle_event(aio_ctx, q);
>  }
>  
> -/* Start a single command. */
> -static int
> -handle_io_async (struct ublksrv_queue *q, int tag)
> +static int
> +nbd_handle_io_async (struct ublksrv_queue *q, int tag)
>  {
> -  const struct ublksrv_io_desc *iod = ublksrv_get_iod (q, tag);
> -  const unsigned op = ublksrv_get_op (iod);
> -  const unsigned flags = ublksrv_get_flags (iod);
> -  const bool fua = flags & UBLK_IO_F_FUA;
> -  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
> -  const size_t q_id = q->q_id; /* also the NBD handle number */
> -  struct nbd_handle *h = nbd.ptr[q_id];
> -  uint32_t nbd_flags = 0;
> -  int64_t r;
> -  nbd_completion_callback cb;
> -  struct completion *completion;
> +  const struct ublksrv_io_desc *iod = ublksrv_get_iod(q, tag);
> +  struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0);
>  
> -  if (verbose)
> -    fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
> -             "nbdublk", tag, q_id);
> -
> -  /* Set up a completion callback and its user data. */
> -  completion = malloc (sizeof *completion);
> -  if (completion == NULL) abort ();
> -  completion->q = q;
> -  completion->tag = tag;
> -  completion->res = iod->nr_sectors << 9;
> -  cb.callback = command_completed;
> -  cb.user_data = completion;
> -  cb.free = free;
> -
> -  switch (op) {
> -  case UBLK_IO_OP_READ:
> -    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
> -                       iod->start_sector << 9, cb, 0);
> -    if (r == -1) {
> -      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> -      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
> -      return 0;
> -    }
> -    break;
> -
> -  case UBLK_IO_OP_WRITE:
> -    if (fua && can_fua)
> -      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> -
> -    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
> -                        iod->start_sector << 9, cb, nbd_flags);
> -    if (r == -1) {
> -      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> -      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
> -      return 0;
> -    }
> -    break;
> -
> -  case UBLK_IO_OP_FLUSH:
> -    r = nbd_flush (h, 0);
> -    if (r == -1) {
> -      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> -      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
> -      return 0;
> -    }
> -    break;
> +  req->io = *iod;
> +  req->id = ublksrv_aio_pid_tag(q->q_id, tag);
> +  ublksrv_aio_submit_req(aio_ctx, req);
>  
> -  case UBLK_IO_OP_DISCARD:
> -    if (fua && can_fua)
> -      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> -
> -    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
> -    if (r == -1) {
> -      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> -      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
> -      return 0;
> -    }
> -    break;
> -
> -  case UBLK_IO_OP_WRITE_ZEROES:
> -    if (fua && can_fua)
> -      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
> -
> -    if (alloc_zero)
> -      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
> -
> -    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
> -    if (r == -1) {
> -      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
> -      ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL));
> -      return 0;
> -    }
> -    break;
> -
> -  default:
> -    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
> -    ublksrv_complete_io (q, tag, -ENOTSUP);
> -    return 0;
> -  }
> -
> -  /* Make sure the corresponding NBD worker sees the command. */
> -  pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex);
> -  thread_info.ptr[q_id].in_flight++;
> -  pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond);
> -  pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex);
> -
> -  return 0;
> +  return 0;
>  }
>  
>  struct ublksrv_tgt_type tgt_type = {
>    .type = UBLKSRV_TGT_TYPE_NBD,
>    .name = "nbd",
>    .init_tgt = init_tgt,
> -  .handle_io_async = handle_io_async,
> -  .handle_event = handle_event,
> +  .handle_io_async = nbd_handle_io_async,
> +  .handle_event = nbd_handle_event,
>  };
> 
> Thanks,
> Ming

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
nbdkit - Flexible, fast NBD server with plugins
https://gitlab.com/nbdkit/nbdkit