On Tue, Sep 03, 2019 at 09:55:49AM -0400, Michael S. Tsirkin wrote:
[..]
> What's with all of the TODOs? Some of these are really scary,
> looks like they need to be figured out before this is merged.

Hi Michael,

One of the issue I noticed is races w.r.t device removal and super
block initialization. I am about to post a set of patches which
take care of these races and also get rid of some of the scary
TODOs. Other TODOs like suspend/restore, multiqueue support etc
are improvements which we can do over a period of time.

[..]
> > +/* Per-virtqueue state */
> > +struct virtio_fs_vq {
> > +   spinlock_t lock;
> > +   struct virtqueue *vq;     /* protected by ->lock */
> > +   struct work_struct done_work;
> > +   struct list_head queued_reqs;
> > +   struct delayed_work dispatch_work;
> > +   struct fuse_dev *fud;
> > +   bool connected;
> > +   long in_flight;
> > +   char name[24];
> 
> I'd keep names somewhere separate as they are not used on data path.

Ok, this sounds like a nice to have. Will take care of this once base
patch gets merged.

[..]
> > +struct virtio_fs_forget {
> > +   struct fuse_in_header ih;
> > +   struct fuse_forget_in arg;
> 
> These structures are all native endian.
> 
> Passing them to host will make cross-endian setups painful to support,
> and hardware implementations impossible.
> 
> How about converting everything to LE?

So looks like endianness issue is now resolved (going by the other
emails). So I will not worry about it.

[..]
> > +/* Add a new instance to the list or return -EEXIST if tag name exists*/
> > +static int virtio_fs_add_instance(struct virtio_fs *fs)
> > +{
> > +   struct virtio_fs *fs2;
> > +   bool duplicate = false;
> > +
> > +   mutex_lock(&virtio_fs_mutex);
> > +
> > +   list_for_each_entry(fs2, &virtio_fs_instances, list) {
> > +           if (strcmp(fs->tag, fs2->tag) == 0)
> > +                   duplicate = true;
> > +   }
> > +
> > +   if (!duplicate)
> > +           list_add_tail(&fs->list, &virtio_fs_instances);
> 
> 
> This is O(N^2) as it's presumably called for each istance.
> Doesn't scale - please switch to a tree or such.

This is O(N) and not O(N^2) right? Addition of device is O(N), search
during mount is O(N).

This is not a frequent event at all and number of virtiofs instances
per guest are expected to be fairly small (say less than 10). So I 
really don't think there is any value in converting this into a tree
(instead of a list).

[..]
> > +static void virtio_fs_free_devs(struct virtio_fs *fs)
> > +{
> > +   unsigned int i;
> > +
> > +   /* TODO lock */
> 
> Doesn't inspire confidence, does it?

Agreed. Getting rid of this in set of fixes I am about to post.

> 
> > +
> > +   for (i = 0; i < fs->nvqs; i++) {
> > +           struct virtio_fs_vq *fsvq = &fs->vqs[i];
> > +
> > +           if (!fsvq->fud)
> > +                   continue;
> > +
> > +           flush_work(&fsvq->done_work);
> > +           flush_delayed_work(&fsvq->dispatch_work);
> > +
> > +           /* TODO need to quiesce/end_requests/decrement dev_count */
> 
> Indeed. Won't this crash if we don't?

Took care of this as well.

[..]
> > +static void virtio_fs_hiprio_dispatch_work(struct work_struct *work)
> > +{
> > +   struct virtio_fs_forget *forget;
> > +   struct virtio_fs_vq *fsvq = container_of(work, struct virtio_fs_vq,
> > +                                            dispatch_work.work);
> > +   struct virtqueue *vq = fsvq->vq;
> > +   struct scatterlist sg;
> > +   struct scatterlist *sgs[] = {&sg};
> > +   bool notify;
> > +   int ret;
> > +
> > +   pr_debug("virtio-fs: worker %s called.\n", __func__);
> > +   while (1) {
> > +           spin_lock(&fsvq->lock);
> > +           forget = list_first_entry_or_null(&fsvq->queued_reqs,
> > +                                   struct virtio_fs_forget, list);
> > +           if (!forget) {
> > +                   spin_unlock(&fsvq->lock);
> > +                   return;
> > +           }
> > +
> > +           list_del(&forget->list);
> > +           if (!fsvq->connected) {
> > +                   spin_unlock(&fsvq->lock);
> > +                   kfree(forget);
> > +                   continue;
> > +           }
> > +
> > +           sg_init_one(&sg, forget, sizeof(*forget));
> 
> This passes to host a structure including "struct list_head list";
> 
> Not a good idea.

Ok, host does not have to see "struct list_head list". Its needed for
guest. Will look into getting rid of this.

> 
> 
> > +
> > +           /* Enqueue the request */
> > +           dev_dbg(&vq->vdev->dev, "%s\n", __func__);
> > +           ret = virtqueue_add_sgs(vq, sgs, 1, 0, forget, GFP_ATOMIC);
> 
> 
> This is easier as add_outbuf.

Will look into it.

> 
> Also - why GFP_ATOMIC?

Hmm..., may be it can be GFP_KERNEL. I don't see atomic context here. Will
look into it.

> 
> > +           if (ret < 0) {
> > +                   if (ret == -ENOMEM || ret == -ENOSPC) {
> > +                           pr_debug("virtio-fs: Could not queue FORGET: 
> > err=%d. Will try later\n",
> > +                                    ret);
> > +                           list_add_tail(&forget->list,
> > +                                           &fsvq->queued_reqs);
> > +                           schedule_delayed_work(&fsvq->dispatch_work,
> > +                                           msecs_to_jiffies(1));
> 
> Can't we we requeue after some buffers get consumed?

That's what dispatch work is doing. It tries to requeue the request after
a while.

[..]
> > +static int virtio_fs_probe(struct virtio_device *vdev)
> > +{
> > +   struct virtio_fs *fs;
> > +   int ret;
> > +
> > +   fs = devm_kzalloc(&vdev->dev, sizeof(*fs), GFP_KERNEL);
> > +   if (!fs)
> > +           return -ENOMEM;
> > +   vdev->priv = fs;
> > +
> > +   ret = virtio_fs_read_tag(vdev, fs);
> > +   if (ret < 0)
> > +           goto out;
> > +
> > +   ret = virtio_fs_setup_vqs(vdev, fs);
> > +   if (ret < 0)
> > +           goto out;
> > +
> > +   /* TODO vq affinity */
> > +   /* TODO populate notifications vq */
> 
> what's notifications vq?

It has not been implemented yet. At some point of time we want to have
a notion of notification queue so that host can send notifications to
guest. Will get rid of this comment for now.

[..]
> > +#ifdef CONFIG_PM_SLEEP
> > +static int virtio_fs_freeze(struct virtio_device *vdev)
> > +{
> > +   return 0; /* TODO */
> > +}
> > +
> > +static int virtio_fs_restore(struct virtio_device *vdev)
> > +{
> > +   return 0; /* TODO */
> > +}
> 
> Is this really a good idea? I'd rather it was implemented,
> but if not possible at all disabling PM seems better than just
> keep going.

I agree. Will look into disabling it.

> 
> > +#endif /* CONFIG_PM_SLEEP */
> > +
> > +const static struct virtio_device_id id_table[] = {
> > +   { VIRTIO_ID_FS, VIRTIO_DEV_ANY_ID },
> > +   {},
> > +};
> > +
> > +const static unsigned int feature_table[] = {};
> > +
> > +static struct virtio_driver virtio_fs_driver = {
> > +   .driver.name            = KBUILD_MODNAME,
> > +   .driver.owner           = THIS_MODULE,
> > +   .id_table               = id_table,
> > +   .feature_table          = feature_table,
> > +   .feature_table_size     = ARRAY_SIZE(feature_table),
> > +   /* TODO validate config_get != NULL */
> 
> Why?

Don't know. Stefan, do you remember why did you put this comment? If not,
I will get rid of it.

> 
> > +   .probe                  = virtio_fs_probe,
> > +   .remove                 = virtio_fs_remove,
> > +#ifdef CONFIG_PM_SLEEP
> > +   .freeze                 = virtio_fs_freeze,
> > +   .restore                = virtio_fs_restore,
> > +#endif
> > +};
> > +
> > +static void virtio_fs_wake_forget_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +   struct fuse_forget_link *link;
> > +   struct virtio_fs_forget *forget;
> > +   struct scatterlist sg;
> > +   struct scatterlist *sgs[] = {&sg};
> > +   struct virtio_fs *fs;
> > +   struct virtqueue *vq;
> > +   struct virtio_fs_vq *fsvq;
> > +   bool notify;
> > +   u64 unique;
> > +   int ret;
> > +
> > +   link = fuse_dequeue_forget(fiq, 1, NULL);
> > +   unique = fuse_get_unique(fiq);
> > +
> > +   fs = fiq->priv;
> > +   fsvq = &fs->vqs[VQ_HIPRIO];
> > +   spin_unlock(&fiq->waitq.lock);
> > +
> > +   /* Allocate a buffer for the request */
> > +   forget = kmalloc(sizeof(*forget), GFP_NOFS | __GFP_NOFAIL);
> > +
> > +   forget->ih = (struct fuse_in_header){
> > +           .opcode = FUSE_FORGET,
> > +           .nodeid = link->forget_one.nodeid,
> > +           .unique = unique,
> > +           .len = sizeof(*forget),
> > +   };
> > +   forget->arg = (struct fuse_forget_in){
> > +           .nlookup = link->forget_one.nlookup,
> > +   };
> > +
> > +   sg_init_one(&sg, forget, sizeof(*forget));
> > +
> > +   /* Enqueue the request */
> > +   vq = fsvq->vq;
> > +   dev_dbg(&vq->vdev->dev, "%s\n", __func__);
> > +   spin_lock(&fsvq->lock);
> > +
> > +   ret = virtqueue_add_sgs(vq, sgs, 1, 0, forget, GFP_ATOMIC);
> > +   if (ret < 0) {
> > +           if (ret == -ENOMEM || ret == -ENOSPC) {
> > +                   pr_debug("virtio-fs: Could not queue FORGET: err=%d. 
> > Will try later.\n",
> > +                            ret);
> > +                   list_add_tail(&forget->list, &fsvq->queued_reqs);
> > +                   schedule_delayed_work(&fsvq->dispatch_work,
> > +                                   msecs_to_jiffies(1));
> > +           } else {
> > +                   pr_debug("virtio-fs: Could not queue FORGET: err=%d. 
> > Dropping it.\n",
> > +                            ret);
> > +                   kfree(forget);
> > +           }
> > +           spin_unlock(&fsvq->lock);
> > +           goto out;
> > +   }
> 
> 
> code duplicated from virtio_fs_hiprio_dispatch_work

Will look into reusing the code.

> 
> > +
> > +   fsvq->in_flight++;
> > +   notify = virtqueue_kick_prepare(vq);
> > +
> > +   spin_unlock(&fsvq->lock);
> > +
> > +   if (notify)
> > +           virtqueue_notify(vq);
> > +out:
> > +   kfree(link);
> > +}
> > +
> > +static void virtio_fs_wake_interrupt_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +   /* TODO */
> 
> what's needed?

We have not implemented this interrupt thing. It interrupts the existing
pending requests. Its not a must to have.

[..]
> > +static int virtio_fs_enqueue_req(struct virtqueue *vq, struct fuse_req 
> > *req)
> > +{
> > +   /* requests need at least 4 elements */
> > +   struct scatterlist *stack_sgs[6];
> > +   struct scatterlist stack_sg[ARRAY_SIZE(stack_sgs)];
> > +   struct scatterlist **sgs = stack_sgs;
> > +   struct scatterlist *sg = stack_sg;
> > +   struct virtio_fs_vq *fsvq;
> > +   unsigned int argbuf_used = 0;
> > +   unsigned int out_sgs = 0;
> > +   unsigned int in_sgs = 0;
> > +   unsigned int total_sgs;
> > +   unsigned int i;
> > +   int ret;
> > +   bool notify;
> > +
> > +   /* Does the sglist fit on the stack? */
> > +   total_sgs = sg_count_fuse_req(req);
> > +   if (total_sgs > ARRAY_SIZE(stack_sgs)) {
> > +           sgs = kmalloc_array(total_sgs, sizeof(sgs[0]), GFP_ATOMIC);
> > +           sg = kmalloc_array(total_sgs, sizeof(sg[0]), GFP_ATOMIC);
> > +           if (!sgs || !sg) {
> > +                   ret = -ENOMEM;
> > +                   goto out;
> > +           }
> > +   }
> > +
> > +   /* Use a bounce buffer since stack args cannot be mapped */
> > +   ret = copy_args_to_argbuf(req);
> > +   if (ret < 0)
> > +           goto out;
> > +
> > +   /* Request elements */
> > +   sg_init_one(&sg[out_sgs++], &req->in.h, sizeof(req->in.h));
> > +   out_sgs += sg_init_fuse_args(&sg[out_sgs], req,
> > +                                (struct fuse_arg *)req->in.args,
> > +                                req->in.numargs, req->in.argpages,
> > +                                req->argbuf, &argbuf_used);
> > +
> > +   /* Reply elements */
> > +   if (test_bit(FR_ISREPLY, &req->flags)) {
> > +           sg_init_one(&sg[out_sgs + in_sgs++],
> > +                       &req->out.h, sizeof(req->out.h));
> > +           in_sgs += sg_init_fuse_args(&sg[out_sgs + in_sgs], req,
> > +                                       req->out.args, req->out.numargs,
> > +                                       req->out.argpages,
> > +                                       req->argbuf + argbuf_used, NULL);
> > +   }
> > +
> > +   WARN_ON(out_sgs + in_sgs != total_sgs);
> > +
> > +   for (i = 0; i < total_sgs; i++)
> > +           sgs[i] = &sg[i];
> > +
> > +   fsvq = vq_to_fsvq(vq);
> > +   spin_lock(&fsvq->lock);
> > +
> > +   ret = virtqueue_add_sgs(vq, sgs, out_sgs, in_sgs, req, GFP_ATOMIC);
> > +   if (ret < 0) {
> > +           /* TODO handle full virtqueue */
> 
> Indeed. What prevents this?

So for forget requests (VQ_HIPRIO), I already handled this by retrying 
submitting the request with the help of a worker.

For regular requests (VQ_REQUESTS), I have never run into virt queue
being full so far. That's why never worried about it.

So nothing prevents this. But we have not noticed it yet. So its a TODO
item. It will be nice to retry if virtuqueue gets full (instead of
returning error to caller).

[..]
> > +static void virtio_fs_wake_pending_and_unlock(struct fuse_iqueue *fiq)
> > +__releases(fiq->waitq.lock)
> > +{
> > +   unsigned int queue_id = VQ_REQUEST; /* TODO multiqueue */
> > +   struct virtio_fs *fs;
> > +   struct fuse_conn *fc;
> > +   struct fuse_req *req;
> > +   struct fuse_pqueue *fpq;
> > +   int ret;
> > +
> > +   WARN_ON(list_empty(&fiq->pending));
> > +   req = list_last_entry(&fiq->pending, struct fuse_req, list);
> > +   clear_bit(FR_PENDING, &req->flags);
> > +   list_del_init(&req->list);
> > +   WARN_ON(!list_empty(&fiq->pending));
> > +   spin_unlock(&fiq->waitq.lock);
> > +
> > +   fs = fiq->priv;
> > +   fc = fs->vqs[queue_id].fud->fc;
> > +
> > +   dev_dbg(&fs->vqs[queue_id].vq->vdev->dev,
> > +           "%s: opcode %u unique %#llx nodeid %#llx in.len %u out.len 
> > %u\n",
> > +           __func__, req->in.h.opcode, req->in.h.unique, req->in.h.nodeid,
> > +           req->in.h.len, fuse_len_args(req->out.numargs, req->out.args));
> > +
> > +   fpq = &fs->vqs[queue_id].fud->pq;
> > +   spin_lock(&fpq->lock);
> > +   if (!fpq->connected) {
> > +           spin_unlock(&fpq->lock);
> > +           req->out.h.error = -ENODEV;
> > +           pr_err("virtio-fs: %s disconnected\n", __func__);
> > +           fuse_request_end(fc, req);
> > +           return;
> > +   }
> > +   list_add_tail(&req->list, fpq->processing);
> > +   spin_unlock(&fpq->lock);
> > +   set_bit(FR_SENT, &req->flags);
> > +   /* matches barrier in request_wait_answer() */
> > +   smp_mb__after_atomic();
> > +   /* TODO check for FR_INTERRUPTED? */
> 
> 
> ?

hmm... we don't support FR_INTERRUPTED. Stefan, do you remember why
this TODO is here. If not, I will get rid of it.

> 
> > +
> > +retry:
> > +   ret = virtio_fs_enqueue_req(fs->vqs[queue_id].vq, req);
> > +   if (ret < 0) {
> > +           if (ret == -ENOMEM || ret == -ENOSPC) {
> > +                   /* Virtqueue full. Retry submission */
> > +                   usleep_range(20, 30);
> 
> again, why not respond to completion?

Using usleep() was a quick fix. Will look into using completion in second
round of cleanup. In first round I am taking care of more scary TODOs
which deal with races w.r.t device removal.

> 
> > +                   goto retry;
> > +           }
> > +           req->out.h.error = ret;
> > +           pr_err("virtio-fs: virtio_fs_enqueue_req() failed %d\n", ret);
> > +           fuse_request_end(fc, req);
> > +           return;
> > +   }
> > +}
> > +
> > +static void virtio_fs_flush_hiprio_queue(struct virtio_fs_vq *fsvq)
> > +{
> > +   struct virtio_fs_forget *forget;
> > +
> > +   WARN_ON(fsvq->in_flight < 0);
> > +
> > +   /* Go through pending forget requests and free them */
> > +   spin_lock(&fsvq->lock);
> > +   while (1) {
> > +           forget = list_first_entry_or_null(&fsvq->queued_reqs,
> > +                                   struct virtio_fs_forget, list);
> > +           if (!forget)
> > +                   break;
> > +           list_del(&forget->list);
> > +           kfree(forget);
> > +   }
> > +
> > +   spin_unlock(&fsvq->lock);
> > +
> > +   /* Wait for in flight requests to finish.*/
> > +   while (1) {
> > +           spin_lock(&fsvq->lock);
> > +           if (!fsvq->in_flight) {
> > +                   spin_unlock(&fsvq->lock);
> > +                   break;
> > +           }
> > +           spin_unlock(&fsvq->lock);
> > +           usleep_range(1000, 2000);
> 
> Same thing here. Can we use e.g. a completion and not usleep?

Second round cleanup.

> 
> > +   }
> > +}
> > +
> > +const static struct fuse_iqueue_ops virtio_fs_fiq_ops = {
> > +   .wake_forget_and_unlock         = virtio_fs_wake_forget_and_unlock,
> > +   .wake_interrupt_and_unlock      = virtio_fs_wake_interrupt_and_unlock,
> > +   .wake_pending_and_unlock        = virtio_fs_wake_pending_and_unlock,
> > +};
> > +
> > +static int virtio_fs_fill_super(struct super_block *sb)
> > +{
> > +   struct fuse_conn *fc = get_fuse_conn_super(sb);
> > +   struct virtio_fs *fs = fc->iq.priv;
> > +   unsigned int i;
> > +   int err;
> > +   struct fuse_req *init_req;
> > +   struct fuse_fs_context ctx = {
> > +           .rootmode = S_IFDIR,
> > +           .default_permissions = 1,
> > +           .allow_other = 1,
> > +           .max_read = UINT_MAX,
> > +           .blksize = 512,
> > +           .destroy = true,
> > +           .no_control = true,
> > +           .no_force_umount = true,
> > +   };
> > +
> > +   /* TODO lock */
> 
> what does this refer to?

Took care of in first round of cleanup.

> 
> > +   if (fs->vqs[VQ_REQUEST].fud) {
> > +           pr_err("virtio-fs: device already in use\n");
> > +           err = -EBUSY;
> > +           goto err;
> > +   }
> > +
> > +   err = -ENOMEM;
> > +   /* Allocate fuse_dev for hiprio and notification queues */
> > +   for (i = 0; i < VQ_REQUEST; i++) {
> > +           struct virtio_fs_vq *fsvq = &fs->vqs[i];
> > +
> > +           fsvq->fud = fuse_dev_alloc();
> > +           if (!fsvq->fud)
> > +                   goto err_free_fuse_devs;
> > +   }
> > +
> > +   init_req = fuse_request_alloc(0);
> > +   if (!init_req)
> > +           goto err_free_fuse_devs;
> > +   __set_bit(FR_BACKGROUND, &init_req->flags);
> > +
> > +   ctx.fudptr = (void **)&fs->vqs[VQ_REQUEST].fud;
> > +   err = fuse_fill_super_common(sb, &ctx);
> > +   if (err < 0)
> > +           goto err_free_init_req;
> > +
> > +   fc = fs->vqs[VQ_REQUEST].fud->fc;
> > +
> > +   /* TODO take fuse_mutex around this loop? */
> 
> Don't we need to figure this kind of thing out before this
> code lands upstream?

I think we don't need this TODO. fuse_connection object and associated
super block are still being formed. And my cleanup has taken care of
the additional locking.

Thanks
Vivek
_______________________________________________
Virtualization mailing list
Virtualization@lists.linux-foundation.org
https://lists.linuxfoundation.org/mailman/listinfo/virtualization

Reply via email to