Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-27 Thread Pankaj Gupta
> > > > > > Implement asynchronous flush for virtio pmem using work queue
> > > > > > to solve the preflush ordering issue. Also, coalesce the flush
> > > > > > requests when a flush is already in process.
> > > > > >
> > > > > > Signed-off-by: Pankaj Gupta 
> > > > > > ---
> > > > > >  drivers/nvdimm/nd_virtio.c   | 72
> > > > > >  drivers/nvdimm/virtio_pmem.c | 10 -
> > > > > >  drivers/nvdimm/virtio_pmem.h | 14 +++
> > > > > >  3 files changed, 79 insertions(+), 17 deletions(-)
> > > > > >
> > > > > > diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> > > > > > index 10351d5b49fa..61b655b583be 100644
> > > > > > --- a/drivers/nvdimm/nd_virtio.c
> > > > > > +++ b/drivers/nvdimm/nd_virtio.c
> > > > > > @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> > > > > > return err;
> > > > > >  };
> > > > > >
> > > > > > +static void submit_async_flush(struct work_struct *ws);
> > > > > > +
> > > > > >  /* The asynchronous flush callback function */
> > > > > >  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
> > > > > >  {
> > > > > > -   /*
> > > > > > -* Create child bio for asynchronous flush and chain with
> > > > > > -* parent bio. Otherwise directly call nd_region flush.
> > > > > > +   /* queue asynchronous flush and coalesce the flush requests */
> > > > > > +   struct virtio_device *vdev = nd_region->provider_data;
> > > > > > +   struct virtio_pmem *vpmem  = vdev->priv;
> > > > > > +   ktime_t req_start = ktime_get_boottime();
> > > > > > +
> > > > > > +   spin_lock_irq(&vpmem->lock);
> > > > > > +   /* flush requests wait until ongoing flush completes,
> > > > > > +* hence coalescing all the pending requests.
> > > > > >  */
> > > > > > -   if (bio && bio->bi_iter.bi_sector != -1) {
> > > > > > -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> > > > > > -
> > > > > > -   if (!child)
> > > > > > -   return -ENOMEM;
> > > > > > -   bio_copy_dev(child, bio);
> > > > > > -   child->bi_opf = REQ_PREFLUSH;
> > > > > > -   child->bi_iter.bi_sector = -1;
> > > > > > -   bio_chain(child, bio);
> > > > > > -   submit_bio(child);
> > > > > > -   return 0;
> > > > > > +   wait_event_lock_irq(vpmem->sb_wait,
> > > > > > +   !vpmem->flush_bio ||
> > > > > > +   ktime_before(req_start, vpmem->prev_flush_start),
> > > > > > +   vpmem->lock);
> > > > > > +   /* new request after previous flush is completed */
> > > > > > +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> > > > > > +   WARN_ON(vpmem->flush_bio);
> > > > > > +   vpmem->flush_bio = bio;
> > > > > > +   bio = NULL;
> > > > > > +   }
> > > > >
> > > > > Why the dance with ->prev_flush_start vs just calling queue_work()
> > > > > again. queue_work() is naturally coalescing in that if the last work
> > > > > request has not started execution another queue attempt will be
> > > > > dropped.
> > > >
> > > > How will the parent flush request know when the corresponding flush has completed?
> > >
> > > The eventual bio_endio() is what signals upper layers that the flush
> > > completed...
> > >
> > >
> > > Hold on... it's been so long that I forgot that you are copying
> > > md_flush_request() here. It would help immensely if that was mentioned
> > > in the changelog and at a minimum have a comment in the code that this
> > > was copied from md. In fact it would be extra helpful if you
> >
> > My bad. I only mentioned this in the cover letter.
>
> Yeah, sorry about that. Having come back to this after so long I just
> decided to jump straight into the patches, but even if I had read that
> cover I still would have given the feedback that md_flush_request()
> heritage should also be noted with a comment in the code.

Sure.

Thanks,
Pankaj



Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-25 Thread Dan Williams
On Wed, Aug 25, 2021 at 3:01 PM Pankaj Gupta  wrote:
>
> > > Hi Dan,
> > >
> > > Thank you for the review. Please see my reply inline.
> > >
> > > > > Implement asynchronous flush for virtio pmem using work queue
> > > > > to solve the preflush ordering issue. Also, coalesce the flush
> > > > > requests when a flush is already in process.
> > > > >
> > > > > Signed-off-by: Pankaj Gupta 
> > > > > ---
> > > > >  drivers/nvdimm/nd_virtio.c   | 72
> > > > >  drivers/nvdimm/virtio_pmem.c | 10 -
> > > > >  drivers/nvdimm/virtio_pmem.h | 14 +++
> > > > >  3 files changed, 79 insertions(+), 17 deletions(-)
> > > > >
> > > > > diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> > > > > index 10351d5b49fa..61b655b583be 100644
> > > > > --- a/drivers/nvdimm/nd_virtio.c
> > > > > +++ b/drivers/nvdimm/nd_virtio.c
> > > > > @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> > > > > return err;
> > > > >  };
> > > > >
> > > > > +static void submit_async_flush(struct work_struct *ws);
> > > > > +
> > > > >  /* The asynchronous flush callback function */
> > > > >  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
> > > > >  {
> > > > > -   /*
> > > > > -* Create child bio for asynchronous flush and chain with
> > > > > -* parent bio. Otherwise directly call nd_region flush.
> > > > > +   /* queue asynchronous flush and coalesce the flush requests */
> > > > > +   struct virtio_device *vdev = nd_region->provider_data;
> > > > > +   struct virtio_pmem *vpmem  = vdev->priv;
> > > > > +   ktime_t req_start = ktime_get_boottime();
> > > > > +
> > > > > +   spin_lock_irq(&vpmem->lock);
> > > > > +   /* flush requests wait until ongoing flush completes,
> > > > > +* hence coalescing all the pending requests.
> > > > >  */
> > > > > -   if (bio && bio->bi_iter.bi_sector != -1) {
> > > > > -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> > > > > -
> > > > > -   if (!child)
> > > > > -   return -ENOMEM;
> > > > > -   bio_copy_dev(child, bio);
> > > > > -   child->bi_opf = REQ_PREFLUSH;
> > > > > -   child->bi_iter.bi_sector = -1;
> > > > > -   bio_chain(child, bio);
> > > > > -   submit_bio(child);
> > > > > -   return 0;
> > > > > +   wait_event_lock_irq(vpmem->sb_wait,
> > > > > +   !vpmem->flush_bio ||
> > > > > +   ktime_before(req_start, vpmem->prev_flush_start),
> > > > > +   vpmem->lock);
> > > > > +   /* new request after previous flush is completed */
> > > > > +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> > > > > +   WARN_ON(vpmem->flush_bio);
> > > > > +   vpmem->flush_bio = bio;
> > > > > +   bio = NULL;
> > > > > +   }
> > > >
> > > > Why the dance with ->prev_flush_start vs just calling queue_work()
> > > > again. queue_work() is naturally coalescing in that if the last work
> > > > request has not started execution another queue attempt will be
> > > > dropped.
> > >
> > > How will the parent flush request know when the corresponding flush has completed?
> >
> > The eventual bio_endio() is what signals upper layers that the flush
> > completed...
> >
> >
> > Hold on... it's been so long that I forgot that you are copying
> > md_flush_request() here. It would help immensely if that was mentioned
> > in the changelog and at a minimum have a comment in the code that this
> > was copied from md. In fact it would be extra helpful if you
>
> My bad. I only mentioned this in the cover letter.

Yeah, sorry about that. Having come back to this after so long I just
decided to jump straight into the patches, but even if I had read that
cover I still would have given the feedback that md_flush_request()
heritage should also be noted with a comment in the code.



Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-25 Thread Pankaj Gupta
> > Hi Dan,
> >
> > Thank you for the review. Please see my reply inline.
> >
> > > > Implement asynchronous flush for virtio pmem using work queue
> > > > to solve the preflush ordering issue. Also, coalesce the flush
> > > > requests when a flush is already in process.
> > > >
> > > > Signed-off-by: Pankaj Gupta 
> > > > ---
> > > >  drivers/nvdimm/nd_virtio.c   | 72 
> > > >  drivers/nvdimm/virtio_pmem.c | 10 -
> > > >  drivers/nvdimm/virtio_pmem.h | 14 +++
> > > >  3 files changed, 79 insertions(+), 17 deletions(-)
> > > >
> > > > diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> > > > index 10351d5b49fa..61b655b583be 100644
> > > > --- a/drivers/nvdimm/nd_virtio.c
> > > > +++ b/drivers/nvdimm/nd_virtio.c
> > > > @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> > > > return err;
> > > >  };
> > > >
> > > > +static void submit_async_flush(struct work_struct *ws);
> > > > +
> > > >  /* The asynchronous flush callback function */
> > > >  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
> > > >  {
> > > > -   /*
> > > > -* Create child bio for asynchronous flush and chain with
> > > > -* parent bio. Otherwise directly call nd_region flush.
> > > > +   /* queue asynchronous flush and coalesce the flush requests */
> > > > +   struct virtio_device *vdev = nd_region->provider_data;
> > > > +   struct virtio_pmem *vpmem  = vdev->priv;
> > > > +   ktime_t req_start = ktime_get_boottime();
> > > > +
> > > > +   spin_lock_irq(&vpmem->lock);
> > > > +   /* flush requests wait until ongoing flush completes,
> > > > +* hence coalescing all the pending requests.
> > > >  */
> > > > -   if (bio && bio->bi_iter.bi_sector != -1) {
> > > > -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> > > > -
> > > > -   if (!child)
> > > > -   return -ENOMEM;
> > > > -   bio_copy_dev(child, bio);
> > > > -   child->bi_opf = REQ_PREFLUSH;
> > > > -   child->bi_iter.bi_sector = -1;
> > > > -   bio_chain(child, bio);
> > > > -   submit_bio(child);
> > > > -   return 0;
> > > > +   wait_event_lock_irq(vpmem->sb_wait,
> > > > +   !vpmem->flush_bio ||
> > > > +   ktime_before(req_start, vpmem->prev_flush_start),
> > > > +   vpmem->lock);
> > > > +   /* new request after previous flush is completed */
> > > > +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> > > > +   WARN_ON(vpmem->flush_bio);
> > > > +   vpmem->flush_bio = bio;
> > > > +   bio = NULL;
> > > > +   }
> > >
> > > Why the dance with ->prev_flush_start vs just calling queue_work()
> > > again. queue_work() is naturally coalescing in that if the last work
> > > request has not started execution another queue attempt will be
> > > dropped.
> >
> > How will the parent flush request know when the corresponding flush has completed?
>
> The eventual bio_endio() is what signals upper layers that the flush
> completed...
>
>
> Hold on... it's been so long that I forgot that you are copying
> md_flush_request() here. It would help immensely if that was mentioned
> in the changelog and at a minimum have a comment in the code that this
> was copied from md. In fact it would be extra helpful if you

My bad. I only mentioned this in the cover letter.

> refactored a common helper that bio based block drivers could share
> for implementing flush handling, but that can come later.

Sure.
>
> Let me go re-review this with respect to whether the md case is fully
> applicable here.

o.k.

Best regards,
Pankaj



Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-25 Thread Dan Williams
On Wed, Aug 25, 2021 at 1:02 PM Pankaj Gupta  wrote:
>
> Hi Dan,
>
> Thank you for the review. Please see my reply inline.
>
> > > Implement asynchronous flush for virtio pmem using work queue
> > > to solve the preflush ordering issue. Also, coalesce the flush
> > > requests when a flush is already in process.
> > >
> > > Signed-off-by: Pankaj Gupta 
> > > ---
> > >  drivers/nvdimm/nd_virtio.c   | 72 
> > >  drivers/nvdimm/virtio_pmem.c | 10 -
> > >  drivers/nvdimm/virtio_pmem.h | 14 +++
> > >  3 files changed, 79 insertions(+), 17 deletions(-)
> > >
> > > diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> > > index 10351d5b49fa..61b655b583be 100644
> > > --- a/drivers/nvdimm/nd_virtio.c
> > > +++ b/drivers/nvdimm/nd_virtio.c
> > > @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> > > return err;
> > >  };
> > >
> > > +static void submit_async_flush(struct work_struct *ws);
> > > +
> > >  /* The asynchronous flush callback function */
> > >  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
> > >  {
> > > -   /*
> > > -* Create child bio for asynchronous flush and chain with
> > > -* parent bio. Otherwise directly call nd_region flush.
> > > +   /* queue asynchronous flush and coalesce the flush requests */
> > > +   struct virtio_device *vdev = nd_region->provider_data;
> > > +   struct virtio_pmem *vpmem  = vdev->priv;
> > > +   ktime_t req_start = ktime_get_boottime();
> > > +
> > > +   spin_lock_irq(&vpmem->lock);
> > > +   /* flush requests wait until ongoing flush completes,
> > > +* hence coalescing all the pending requests.
> > >  */
> > > -   if (bio && bio->bi_iter.bi_sector != -1) {
> > > -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> > > -
> > > -   if (!child)
> > > -   return -ENOMEM;
> > > -   bio_copy_dev(child, bio);
> > > -   child->bi_opf = REQ_PREFLUSH;
> > > -   child->bi_iter.bi_sector = -1;
> > > -   bio_chain(child, bio);
> > > -   submit_bio(child);
> > > -   return 0;
> > > +   wait_event_lock_irq(vpmem->sb_wait,
> > > +   !vpmem->flush_bio ||
> > > +   ktime_before(req_start, vpmem->prev_flush_start),
> > > +   vpmem->lock);
> > > +   /* new request after previous flush is completed */
> > > +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> > > +   WARN_ON(vpmem->flush_bio);
> > > +   vpmem->flush_bio = bio;
> > > +   bio = NULL;
> > > +   }
> >
> > Why the dance with ->prev_flush_start vs just calling queue_work()
> > again. queue_work() is naturally coalescing in that if the last work
> > request has not started execution another queue attempt will be
> > dropped.
>
> How will the parent flush request know when the corresponding flush has completed?

The eventual bio_endio() is what signals upper layers that the flush
completed...
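
A minimal sketch of that signaling path (illustrative, not the patch's
exact code; 'err' stands in for the host flush result):

    /* kernel context; errno_to_blk_status() from <linux/blkdev.h>.
     * The flush worker records the result and ends the parent bio;
     * bio_endio() is what unblocks upper layers waiting on it. */
    static void complete_flush(struct bio *bio, int err)
    {
            bio->bi_status = errno_to_blk_status(err);
            bio_endio(bio);
    }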


Hold on... it's been so long that I forgot that you are copying
md_flush_request() here. It would help immensely if that was mentioned
in the changelog and at a minimum have a comment in the code that this
was copied from md. In fact it would be extra helpful if you
refactored a common helper that bio based block drivers could share
for implementing flush handling, but that can come later.

Let me go re-review this with respect to whether the md case is fully
applicable here.
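
For reference, the timestamp logic being borrowed reduces to this
distilled sketch (paraphrased from the md_flush_request() idea, not
verbatim kernel code):

    /* prev_flush_start records when the last completed flush *started*.
     * A flush that started after our request arrived has already made
     * everything we care about persistent, so we can ride on it rather
     * than issue a flush of our own. */
    static bool covered_by_previous_flush(ktime_t req_start,
                                          ktime_t prev_flush_start)
    {
            return ktime_before(req_start, prev_flush_start);
    }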



Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-25 Thread Pankaj Gupta
Hi Dan,

Thank you for the review. Please see my reply inline.

> > Implement asynchronous flush for virtio pmem using work queue
> > to solve the preflush ordering issue. Also, coalesce the flush
> > requests when a flush is already in process.
> >
> > Signed-off-by: Pankaj Gupta 
> > ---
> >  drivers/nvdimm/nd_virtio.c   | 72 
> >  drivers/nvdimm/virtio_pmem.c | 10 -
> >  drivers/nvdimm/virtio_pmem.h | 14 +++
> >  3 files changed, 79 insertions(+), 17 deletions(-)
> >
> > diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> > index 10351d5b49fa..61b655b583be 100644
> > --- a/drivers/nvdimm/nd_virtio.c
> > +++ b/drivers/nvdimm/nd_virtio.c
> > @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> > return err;
> >  };
> >
> > +static void submit_async_flush(struct work_struct *ws);
> > +
> >  /* The asynchronous flush callback function */
> >  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
> >  {
> > -   /*
> > -* Create child bio for asynchronous flush and chain with
> > -* parent bio. Otherwise directly call nd_region flush.
> > +   /* queue asynchronous flush and coalesce the flush requests */
> > +   struct virtio_device *vdev = nd_region->provider_data;
> > +   struct virtio_pmem *vpmem  = vdev->priv;
> > +   ktime_t req_start = ktime_get_boottime();
> > +
> > +   spin_lock_irq(&vpmem->lock);
> > +   /* flush requests wait until ongoing flush completes,
> > +* hence coalescing all the pending requests.
> >  */
> > -   if (bio && bio->bi_iter.bi_sector != -1) {
> > -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> > -
> > -   if (!child)
> > -   return -ENOMEM;
> > -   bio_copy_dev(child, bio);
> > -   child->bi_opf = REQ_PREFLUSH;
> > -   child->bi_iter.bi_sector = -1;
> > -   bio_chain(child, bio);
> > -   submit_bio(child);
> > -   return 0;
> > +   wait_event_lock_irq(vpmem->sb_wait,
> > +   !vpmem->flush_bio ||
> > +   ktime_before(req_start, vpmem->prev_flush_start),
> > +   vpmem->lock);
> > +   /* new request after previous flush is completed */
> > +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> > +   WARN_ON(vpmem->flush_bio);
> > +   vpmem->flush_bio = bio;
> > +   bio = NULL;
> > +   }
>
> Why the dance with ->prev_flush_start vs just calling queue_work()
> again. queue_work() is naturally coalescing in that if the last work
> request has not started execution another queue attempt will be
> dropped.

How will the parent flush request know when the corresponding flush has completed?

>
> > +   spin_unlock_irq(&vpmem->lock);
> > +
> > +   if (!bio) {
> > +   INIT_WORK(&vpmem->flush_work, submit_async_flush);
>
> I expect this only needs to be initialized once at driver init time.

yes, will fix this.
>
> > +   queue_work(vpmem->pmem_wq, &vpmem->flush_work);
> > +   return 1;
> > +   }
> > +
> > +   /* flush completed in other context while we waited */
> > +   if (bio && (bio->bi_opf & REQ_PREFLUSH)) {
> > +   bio->bi_opf &= ~REQ_PREFLUSH;
> > +   submit_bio(bio);
> > +   } else if (bio && (bio->bi_opf & REQ_FUA)) {
> > +   bio->bi_opf &= ~REQ_FUA;
> > +   bio_endio(bio);
>
> It's not clear to me how this happens, shouldn't all flush completions
> be driven from the work completion?

Requests should progress once they are notified that the ongoing flush
has completed.
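
To make that concrete, a sketch of the interleaving being described
(timeline in a comment; the thread names are invented):

    /*
     * T1: takes vpmem->lock, becomes the owner of flush_bio,
     *     queues the flush work
     * T2: arrives next, sleeps in wait_event_lock_irq()
     * worker: runs the host flush, sets prev_flush_start, wakes T2
     * T2: now req_start < prev_flush_start, i.e. a flush that started
     *     after T2's request has completed, so T2's data is already
     *     persistent; T2 finishes its own bio directly (the
     *     REQ_PREFLUSH/REQ_FUA branch) without queueing another flush.
     */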

>
> > }
> > -   if (virtio_pmem_flush(nd_region))
> > -   return -EIO;
> >
> > return 0;
> >  };
> >  EXPORT_SYMBOL_GPL(async_pmem_flush);
> > +
> > +static void submit_async_flush(struct work_struct *ws)
> > +{
> > +   struct virtio_pmem *vpmem = container_of(ws, struct virtio_pmem, flush_work);
> > +   struct bio *bio = vpmem->flush_bio;
> > +
> > +   vpmem->start_flush = ktime_get_boottime();
> > +   bio->bi_status = errno_to_blk_status(virtio_pmem_flush(vpmem->nd_region));
> > +   vpmem->prev_flush_start = vpmem->start_flush;
> > +   vpmem->flush_bio = NULL;
> > +   wake_up(&vpmem->sb_wait);
> > +
> > +   /* Submit parent bio only for PREFLUSH */
> > +   if (bio && (bio->bi_opf & REQ_PREFLUSH)) {
> > +   bio->bi_opf &= ~REQ_PREFLUSH;
> > +   submit_bio(bio);
> > +   } else if (bio && (bio->bi_opf & REQ_FUA)) {
> > +   bio->bi_opf &= ~REQ_FUA;
> > +   bio_endio(bio);
> > +   }
>
> Shouldn't the wait_event_lock_irq() be here rather than in
> async_pmem_flush()? That will cause the workqueue to back up and flush
> requests to coalesce.

But this is already the coalesced flush request.

Re: [RFC v2 1/2] virtio-pmem: Async virtio-pmem flush

2021-08-25 Thread Dan Williams
On Sun, Jul 25, 2021 at 11:09 PM Pankaj Gupta  wrote:
>
> From: Pankaj Gupta 
>
> Implement asynchronous flush for virtio pmem using work queue
> to solve the preflush ordering issue. Also, coalesce the flush
> requests when a flush is already in process.
>
> Signed-off-by: Pankaj Gupta 
> ---
>  drivers/nvdimm/nd_virtio.c   | 72 
>  drivers/nvdimm/virtio_pmem.c | 10 -
>  drivers/nvdimm/virtio_pmem.h | 14 +++
>  3 files changed, 79 insertions(+), 17 deletions(-)
>
> diff --git a/drivers/nvdimm/nd_virtio.c b/drivers/nvdimm/nd_virtio.c
> index 10351d5b49fa..61b655b583be 100644
> --- a/drivers/nvdimm/nd_virtio.c
> +++ b/drivers/nvdimm/nd_virtio.c
> @@ -97,29 +97,69 @@ static int virtio_pmem_flush(struct nd_region *nd_region)
> return err;
>  };
>
> +static void submit_async_flush(struct work_struct *ws);
> +
>  /* The asynchronous flush callback function */
>  int async_pmem_flush(struct nd_region *nd_region, struct bio *bio)
>  {
> -   /*
> -* Create child bio for asynchronous flush and chain with
> -* parent bio. Otherwise directly call nd_region flush.
> +   /* queue asynchronous flush and coalesce the flush requests */
> +   struct virtio_device *vdev = nd_region->provider_data;
> +   struct virtio_pmem *vpmem  = vdev->priv;
> +   ktime_t req_start = ktime_get_boottime();
> +
> +   spin_lock_irq(&vpmem->lock);
> +   /* flush requests wait until ongoing flush completes,
> +* hence coalescing all the pending requests.
>  */
> -   if (bio && bio->bi_iter.bi_sector != -1) {
> -   struct bio *child = bio_alloc(GFP_ATOMIC, 0);
> -
> -   if (!child)
> -   return -ENOMEM;
> -   bio_copy_dev(child, bio);
> -   child->bi_opf = REQ_PREFLUSH;
> -   child->bi_iter.bi_sector = -1;
> -   bio_chain(child, bio);
> -   submit_bio(child);
> -   return 0;
> +   wait_event_lock_irq(vpmem->sb_wait,
> +   !vpmem->flush_bio ||
> +   ktime_before(req_start, vpmem->prev_flush_start),
> +   vpmem->lock);
> +   /* new request after previous flush is completed */
> +   if (ktime_after(req_start, vpmem->prev_flush_start)) {
> +   WARN_ON(vpmem->flush_bio);
> +   vpmem->flush_bio = bio;
> +   bio = NULL;
> +   }

Why the dance with ->prev_flush_start vs just calling queue_work()
again. queue_work() is naturally coalescing in that if the last work
request has not started execution another queue attempt will be
dropped.
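
As a minimal sketch of that coalescing behavior (illustrative only; the
work item and handler names here are invented):

    /* kernel context: <linux/workqueue.h> */
    static void flush_fn(struct work_struct *w)
    {
            /* one host flush covers every request queued before it ran */
    }
    static DECLARE_WORK(example_flush_work, flush_fn);

    static void submit_two_flushes(void)
    {
            queue_work(system_wq, &example_flush_work);
            /* still pending, so this returns false: the second request
             * is coalesced into the single pending execution */
            queue_work(system_wq, &example_flush_work);
    }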

> +   spin_unlock_irq(&vpmem->lock);
> +
> +   if (!bio) {
> +   INIT_WORK(&vpmem->flush_work, submit_async_flush);

I expect this only needs to be initialized once at driver init time.
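
A sketch of that suggestion (assuming the driver's existing
virtio_pmem_probe() setup path; not part of the posted patch):

    /* in virtio_pmem_probe(), once vpmem is set up: */
    INIT_WORK(&vpmem->flush_work, submit_async_flush);

async_pmem_flush() would then drop its INIT_WORK() and only call
queue_work().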

> +   queue_work(vpmem->pmem_wq, &vpmem->flush_work);
> +   return 1;
> +   }
> +
> +   /* flush completed in other context while we waited */
> +   if (bio && (bio->bi_opf & REQ_PREFLUSH)) {
> +   bio->bi_opf &= ~REQ_PREFLUSH;
> +   submit_bio(bio);
> +   } else if (bio && (bio->bi_opf & REQ_FUA)) {
> +   bio->bi_opf &= ~REQ_FUA;
> +   bio_endio(bio);

It's not clear to me how this happens, shouldn't all flush completions
be driven from the work completion?

> }
> -   if (virtio_pmem_flush(nd_region))
> -   return -EIO;
>
> return 0;
>  };
>  EXPORT_SYMBOL_GPL(async_pmem_flush);
> +
> +static void submit_async_flush(struct work_struct *ws)
> +{
> +   struct virtio_pmem *vpmem = container_of(ws, struct virtio_pmem, flush_work);
> +   struct bio *bio = vpmem->flush_bio;
> +
> +   vpmem->start_flush = ktime_get_boottime();
> +   bio->bi_status = errno_to_blk_status(virtio_pmem_flush(vpmem->nd_region));
> +   vpmem->prev_flush_start = vpmem->start_flush;
> +   vpmem->flush_bio = NULL;
> +   wake_up(&vpmem->sb_wait);
> +
> +   /* Submit parent bio only for PREFLUSH */
> +   if (bio && (bio->bi_opf & REQ_PREFLUSH)) {
> +   bio->bi_opf &= ~REQ_PREFLUSH;
> +   submit_bio(bio);
> +   } else if (bio && (bio->bi_opf & REQ_FUA)) {
> +   bio->bi_opf &= ~REQ_FUA;
> +   bio_endio(bio);
> +   }

Shouldn't the wait_event_lock_irq() be here rather than in
async_pmem_flush()? That will cause the workqueue to back up and flush
requests to coalesce.

> +}
>  MODULE_LICENSE("GPL");
> diff --git a/drivers/nvdimm/virtio_pmem.c b/drivers/nvdimm/virtio_pmem.c
> index 726c7354d465..56780a6140c7 100644
> --- a/drivers/nvdimm/virtio_pmem.c
> +++ b/drivers/nvdimm/virtio_pmem.c
> @@ -24,6 +24,7 @@ static int init_vq(struct virtio_pmem *vpmem)
> return PTR_ERR(vpmem->req_vq);
>
> spin_lock_init(&vpmem->pmem_lock);
> +   spin_lock_init(&vpmem->lock);

Why 2 locks?