Maxim Patlasov <mpatla...@virtuozzo.com> writes:

> The ioctl(PLOOP_IOC_PUSH_BACKUP_IO) has two modes of operation:
>
Ack. See minor issue below.
> 1) ctl.direction=PLOOP_READ tells userspace which cluster-blocks to
> push out-of-band; it moves the processed preq-s from pending_tree to reported_tree
>
> 2) ctl.direction=PLOOP_WRITE tells the kernel which cluster-blocks were pushed --
> they are either ordinarily processed preq-s or out-of-band ones; the kernel
> matches the blocks to preq-s in reported_tree and re-schedules them.
>
> Signed-off-by: Maxim Patlasov <mpatla...@virtuozzo.com>
> ---
>  drivers/block/ploop/dev.c         |  105 ++++++++++++++++++++
>  drivers/block/ploop/push_backup.c |  197 +++++++++++++++++++++++++++++++++++++
>  drivers/block/ploop/push_backup.h |    5 +
>  include/linux/ploop/ploop_if.h    |   23 ++++
>  4 files changed, 330 insertions(+)
>
> diff --git a/drivers/block/ploop/dev.c b/drivers/block/ploop/dev.c
> index 23da9f5..2a77d2e 100644
> --- a/drivers/block/ploop/dev.c
> +++ b/drivers/block/ploop/dev.c
> @@ -4529,6 +4529,108 @@ pb_init_done:
>       return rc;
>  }
>  
> +static int ploop_push_backup_io_read(struct ploop_device *plo, unsigned long arg,
> +                                  struct ploop_push_backup_io_ctl *ctl)
> +{
> +     struct ploop_push_backup_ctl_extent *e;
> +     unsigned n_extents = 0;
> +     int rc = 0;
> +
> +     e = kmalloc(sizeof(*e) * ctl->n_extents, GFP_KERNEL);
> +     if (!e)
> +             return -ENOMEM;
> +
> +     while (n_extents < ctl->n_extents) {
> +             cluster_t clu, len;
> +             rc = ploop_pb_get_pending(plo->pbd, &clu, &len, n_extents);
> +             if (rc)
> +                     goto io_read_done;
> +
> +             e[n_extents].clu = clu;
> +             e[n_extents].len = len;
> +             n_extents++;
> +     }
> +
> +     rc = -EFAULT;
> +     ctl->n_extents = n_extents;
> +     if (copy_to_user((void*)arg, ctl, sizeof(*ctl)))
> +             goto io_read_done;
> +     if (n_extents &&
> +         copy_to_user((void*)(arg + sizeof(*ctl)), e,
> +                      n_extents * sizeof(*e)))
> +                     goto io_read_done;
> +     rc = 0;
> +
> +io_read_done:
> +     kfree(e);
> +     return rc;
> +}
> +
> +static int ploop_push_backup_io_write(struct ploop_device *plo, unsigned long arg,
> +                                   struct ploop_push_backup_io_ctl *ctl)
> +{
> +     struct ploop_push_backup_ctl_extent *e;
> +     unsigned i;
> +     int rc = 0;
> +
> +     e = kmalloc(sizeof(*e) * ctl->n_extents, GFP_KERNEL);
> +     if (!e)
> +             return -ENOMEM;
> +
> +     rc = -EFAULT;
> +     if (copy_from_user(e, (void*)(arg + sizeof(*ctl)),
> +                        ctl->n_extents * sizeof(*e)))
> +             goto io_write_done;
> +
> +     rc = 0;
> +     for (i = 0; i < ctl->n_extents; i++) {
> +             cluster_t j;
> +             for (j = e[i].clu; j < e[i].clu + e[i].len; j++)
> +                     ploop_pb_put_reported(plo->pbd, j, 1);
> +             /* OPTIMIZE ME LATER: like this:
> +              * ploop_pb_put_reported(plo->pbd, e[i].clu, e[i].len); */
> +     }
> +
> +io_write_done:
> +     kfree(e);
> +     return rc;
> +}
> +
> +static int ploop_push_backup_io(struct ploop_device *plo, unsigned long arg)
> +{
> +     struct ploop_push_backup_io_ctl ctl;
> +     struct ploop_pushbackup_desc *pbd = plo->pbd;
> +
> +     if (list_empty(&plo->map.delta_list))
> +             return -ENOENT;
> +
> +     if (plo->maintenance_type != PLOOP_MNTN_PUSH_BACKUP)
> +             return -EINVAL;
> +
> +     BUG_ON (!pbd);
> +
> +     if (copy_from_user(&ctl, (void*)arg, sizeof(ctl)))
> +             return -EFAULT;
> +
> +     if (!ctl.n_extents)
> +             return -EINVAL;
> +
> +     if (ploop_pb_check_uuid(pbd, ctl.cbt_uuid)) {
> +             printk("ploop(%d): PUSH_BACKUP_IO uuid mismatch\n",
> +                    plo->index);
> +             return -EINVAL;
> +     }
> +
> +     switch(ctl.direction) {
> +     case PLOOP_READ:
> +             return ploop_push_backup_io_read(plo, arg, &ctl);
> +     case PLOOP_WRITE:
> +             return ploop_push_backup_io_write(plo, arg, &ctl);
> +     }
> +
> +     return -EINVAL;
> +}
> +
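Btw, for anyone testing this: a minimal userspace round-trip over the new
ioctl might look like the sketch below. Purely illustrative -- error handling
is trimmed, pb_io_cycle/devfd/n_max are made-up names, the header path is a
guess, and the uuid is assumed to come from the preceding
PLOOP_IOC_PUSH_BACKUP_INIT call:

	#include <stdlib.h>
	#include <string.h>
	#include <sys/ioctl.h>
	#include <linux/types.h>
	#include <linux/ploop/ploop_if.h>

	/* One READ/WRITE cycle: fetch pending extents, push them out of
	 * band, then ACK the same extents back to the kernel. */
	static int pb_io_cycle(int devfd, const __u8 uuid[16], __u32 n_max)
	{
		struct ploop_push_backup_io_ctl *ctl;
		__u32 i;
		int rc;

		/* ctl header plus room for n_max extents, contiguous,
		 * as the kernel expects extents right after the header */
		ctl = calloc(1, sizeof(*ctl) + n_max * sizeof(ctl->extents[0]));
		if (!ctl)
			return -1;

		memcpy(ctl->cbt_uuid, uuid, 16);
		ctl->direction = PLOOP_READ;
		ctl->n_extents = n_max;	/* capacity in, actual count out */

		/* blocks until at least one pending cluster shows up */
		rc = ioctl(devfd, PLOOP_IOC_PUSH_BACKUP_IO, ctl);
		if (rc)
			goto out;

		for (i = 0; i < ctl->n_extents; i++) {
			/* push clusters [clu, clu + len) to the backup here */
			(void)ctl->extents[i];
		}

		/* ACK: n_extents and extents[] are reused as returned */
		ctl->direction = PLOOP_WRITE;
		rc = ioctl(devfd, PLOOP_IOC_PUSH_BACKUP_IO, ctl);
	out:
		free(ctl);
		return rc;
	}
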
>  static int ploop_push_backup_stop(struct ploop_device *plo, unsigned long arg)
>  {
>       struct ploop_pushbackup_desc *pbd = plo->pbd;
> @@ -4667,6 +4769,9 @@ static int ploop_ioctl(struct block_device *bdev, fmode_t fmode, unsigned int cm
>       case PLOOP_IOC_PUSH_BACKUP_INIT:
>               err = ploop_push_backup_init(plo, arg);
>               break;
> +     case PLOOP_IOC_PUSH_BACKUP_IO:
> +             err = ploop_push_backup_io(plo, arg);
> +             break;
>       case PLOOP_IOC_PUSH_BACKUP_STOP:
>               err = ploop_push_backup_stop(plo, arg);
>               break;
> diff --git a/drivers/block/ploop/push_backup.c b/drivers/block/ploop/push_backup.c
> index ecc9862..477caf7 100644
> --- a/drivers/block/ploop/push_backup.c
> +++ b/drivers/block/ploop/push_backup.c
> @@ -256,6 +256,89 @@ int ploop_pb_copy_cbt_to_user(struct ploop_pushbackup_desc *pbd, char *user_addr
>       return 0;
>  }
>  
> +static void ploop_pb_add_req_to_tree(struct ploop_request *preq,
> +                                  struct rb_root *tree)
> +{
> +     struct rb_node ** p = &tree->rb_node;
> +     struct rb_node *parent = NULL;
> +     struct ploop_request * pr;
> +
> +     while (*p) {
> +             parent = *p;
> +             pr = rb_entry(parent, struct ploop_request, reloc_link);
> +             BUG_ON (preq->req_cluster == pr->req_cluster);
> +
> +             if (preq->req_cluster < pr->req_cluster)
> +                     p = &(*p)->rb_left;
> +             else
> +                     p = &(*p)->rb_right;
> +     }
> +
> +     rb_link_node(&preq->reloc_link, parent, p);
> +     rb_insert_color(&preq->reloc_link, tree);
> +}
> +
> +static void ploop_pb_add_req_to_reported(struct ploop_pushbackup_desc *pbd,
> +                                      struct ploop_request *preq)
> +{
> +     ploop_pb_add_req_to_tree(preq, &pbd->reported_tree);
> +}
> +
> +static struct ploop_request *ploop_pb_get_req_from_tree(struct rb_root *tree,
> +                                                     cluster_t clu)
> +{
> +     struct rb_node *n = tree->rb_node;
> +     struct ploop_request *p;
> +
> +     while (n) {
> +             p = rb_entry(n, struct ploop_request, reloc_link);
> +
> +             if (clu < p->req_cluster)
> +                     n = n->rb_left;
> +             else if (clu > p->req_cluster)
> +                     n = n->rb_right;
> +             else {
> +                     rb_erase(&p->reloc_link, tree);
> +                     return p;
> +             }
> +     }
> +     return NULL;
> +}
> +
> +static struct ploop_request *
> +ploop_pb_get_first_req_from_tree(struct rb_root *tree)
> +{
> +     struct ploop_request *p;
> +     struct rb_node *n = rb_first(tree);
> +
> +     if (!n)
> +             return NULL;
> +
> +     p = rb_entry(n, struct ploop_request, reloc_link);
> +     rb_erase(&p->reloc_link, tree);
> +     return p;
> +}
> +
> +static struct ploop_request *
> +ploop_pb_get_first_req_from_pending(struct ploop_pushbackup_desc *pbd)
> +{
> +     return ploop_pb_get_first_req_from_tree(&pbd->pending_tree);
> +}
> +
> +static struct ploop_request *
> +ploop_pb_get_req_from_pending(struct ploop_pushbackup_desc *pbd,
> +                           cluster_t clu)
> +{
> +     return ploop_pb_get_req_from_tree(&pbd->pending_tree, clu);
> +}
> +
> +static struct ploop_request *
> +ploop_pb_get_req_from_reported(struct ploop_pushbackup_desc *pbd,
> +                            cluster_t clu)
> +{
> +     return ploop_pb_get_req_from_tree(&pbd->reported_tree, clu);
> +}
> +
>  unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd)
>  {
>       if (pbd == NULL)
> @@ -269,3 +352,117 @@ unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd)
>  
>       return 0;
>  }
> +
> +int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
> +                      cluster_t *clu_p, cluster_t *len_p, unsigned n_done)
> +{
> +     bool blocking  = !n_done;
> +     struct ploop_request *preq;
> +     int err = 0;
> +
> +     spin_lock(&pbd->ppb_lock);
> +
> +     /* OPTIMIZE ME LATER: rb_first() once, then rb_next() */
> +     preq = ploop_pb_get_first_req_from_pending(pbd);
> +     if (!preq) {
> +             struct ploop_device *plo = pbd->plo;
> +
> +             if (!blocking) {
> +                     err = -ENOENT;
> +                     goto get_pending_unlock;
> +             }
> +
> +             /* blocking case */
> +             pbd->ppb_waiting = true;
> +             spin_unlock(&pbd->ppb_lock);
> +
> +             mutex_unlock(&plo->ctl_mutex);
> +             err = wait_for_completion_interruptible(&pbd->ppb_comp);
> +             mutex_lock(&plo->ctl_mutex);
AFAIU you have a re-entrance issue if several tasks perform this ioctl concurrently:
task1: ioctl -> wait
                  task2: ioctl -> wait

Just change the wait sequence like this and you are safe:

                /* blocking case */
                if (unlikely(pbd->ppb_waiting)) {
                        /* Another task is already waiting for the event */
                        err = -EBUSY;
                        goto get_pending_unlock;
                }
                pbd->ppb_waiting = true;
                spin_unlock(&pbd->ppb_lock);
                mutex_unlock(&plo->ctl_mutex);

With ppb_waiting doubling as a busy flag, only one caller at a time sleeps on
ppb_comp; a concurrent PLOOP_READ fails fast with -EBUSY instead of racing
over the single completion.
> +
> +             if (plo->pbd != pbd)
> +                     return -EINTR;
> +
> +             spin_lock(&pbd->ppb_lock);
> +             pbd->ppb_waiting = false;
> +             init_completion(&pbd->ppb_comp);
> +
> +             preq = ploop_pb_get_first_req_from_pending(pbd);
> +             if (!preq) {
> +                     if (!test_bit(PLOOP_S_PUSH_BACKUP, &plo->state))
> +                             err = -EINTR;
> +                     else if (signal_pending(current))
> +                             err = -ERESTARTSYS;
> +                     else err = -ENOENT;
> +
> +                     goto get_pending_unlock;
> +             }
> +     }
> +
> +     ploop_pb_add_req_to_reported(pbd, preq);
> +
> +     *clu_p = preq->req_cluster;
> +     *len_p = 1;
> +
> +get_pending_unlock:
> +     spin_unlock(&pbd->ppb_lock);
> +     return err;
> +}
> +
> +void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
> +                        cluster_t clu, cluster_t len)
> +{
> +     struct ploop_request *preq;
> +     int n_found = 0;
> +
> +     /* OPTIMIZE ME LATER: find leftmost item for [clu, clu+len),
> +      * then rb_next() while req_cluster < clu+len.
> +      * Do this firstly for reported, then for pending */
> +     BUG_ON(len != 1);
> +
> +     spin_lock(&pbd->ppb_lock);
> +
> +     preq = ploop_pb_get_req_from_reported(pbd, clu);
> +     if (!preq)
> +             preq = ploop_pb_get_req_from_pending(pbd, clu);
> +     else
> +             n_found++;
> +
> +     /*
> +      * If preq not found above, it's unsolicited report. Then it's
> +      * enough to have corresponding bit set in reported_map because if
> +      * any WRITE-request comes afterwards, ploop_pb_preq_add_pending()
> +      * fails and ploop_thread will clear corresponding bit in ppb_map
> +      * -- see "push_backup special processing" in ploop_entry_request()
> +      * for details.
> +      */
> +
> +     /*
> +      * "If .. else if .." below will be fully reworked when switching
> +      * from pbd->ppb_offset to pbd->reported_map. All we need here is
> +      * actually simply to set bits corresponding to [clu, clu+len) in
> +      * pbd->reported_map.
> +      */
> +     if (pbd->ppb_offset >= clu) { /* lucky strike */
> +             if (clu + len > pbd->ppb_offset) {
> +                     pbd->ppb_offset = clu + len;
> +             }
> +     } else if (n_found != len) { /* a hole, bad luck */
> +             printk("ploop: push_backup ERR: off=%u ext=[%u, %u) found %d\n",
> +                    pbd->ppb_offset, clu, clu + len, n_found);
> +     }
> +
> +     spin_unlock(&pbd->ppb_lock);
> +
> +     if (preq) {
> +             struct ploop_device *plo = preq->plo;
> +             BUG_ON(preq->req_cluster != clu);
> +             BUG_ON(plo != pbd->plo);
> +
> +             spin_lock_irq(&plo->lock);
> +             list_add_tail(&preq->list, &plo->ready_queue);
> +             if (test_bit(PLOOP_S_WAIT_PROCESS, &plo->state))
> +                     wake_up_interruptible(&plo->waitq);
> +             spin_unlock_irq(&plo->lock);
> +     }
> +}
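Regarding the TODO above: once pbd->reported_map materializes, the whole
"if .. else if .." should indeed collapse into a single bitmap update under
ppb_lock. Roughly (just a sketch, assuming reported_map becomes a standard
kernel bitmap indexed by cluster):

	spin_lock(&pbd->ppb_lock);
	/* mark [clu, clu + len) as reported; replaces the ppb_offset logic */
	bitmap_set(pbd->reported_map, clu, len);
	spin_unlock(&pbd->ppb_lock);
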
> diff --git a/drivers/block/ploop/push_backup.h b/drivers/block/ploop/push_backup.h
> index 40d23f5..482e070 100644
> --- a/drivers/block/ploop/push_backup.h
> +++ b/drivers/block/ploop/push_backup.h
> @@ -6,3 +6,8 @@ void ploop_pb_fini(struct ploop_pushbackup_desc *pbd);
>  int ploop_pb_copy_cbt_to_user(struct ploop_pushbackup_desc *pbd, char *user_addr);
>  unsigned long ploop_pb_stop(struct ploop_pushbackup_desc *pbd);
>  int ploop_pb_check_uuid(struct ploop_pushbackup_desc *pbd, __u8 *uuid);
> +
> +int ploop_pb_get_pending(struct ploop_pushbackup_desc *pbd,
> +                      cluster_t *clu_p, cluster_t *len_p, unsigned n_done);
> +void ploop_pb_put_reported(struct ploop_pushbackup_desc *pbd,
> +                        cluster_t clu, cluster_t len);
> diff --git a/include/linux/ploop/ploop_if.h b/include/linux/ploop/ploop_if.h
> index 83a68e5..81cc8d1 100644
> --- a/include/linux/ploop/ploop_if.h
> +++ b/include/linux/ploop/ploop_if.h
> @@ -192,6 +192,26 @@ struct ploop_push_backup_init_ctl
>       __u64   cbt_mask_addr; /* page-aligned space for CBT mask */
>  } __attribute__ ((aligned (8)));
>  
> +struct ploop_push_backup_ctl_extent
> +{
> +     __u32 clu;
> +     __u32 len;
> +} __attribute__ ((aligned (8)));
> +
> +/* ploop_push_backup_io_ctl.direction */
> +enum {
> +     PLOOP_READ = 0, /* wait for requests */
> +     PLOOP_WRITE,    /* ACK requests */
> +};
> +
> +struct ploop_push_backup_io_ctl
> +{
> +     __u8    cbt_uuid[16];
> +     __u32   direction;
> +     __u32   n_extents;
> +     struct ploop_push_backup_ctl_extent extents[0];
> +} __attribute__ ((aligned (8)));
> +
>  struct ploop_push_backup_stop_ctl
>  {
>       __u8    cbt_uuid[16];
> @@ -318,6 +338,9 @@ struct ploop_track_extent
>  /* Start push backup */
>  #define PLOOP_IOC_PUSH_BACKUP_INIT _IOR(PLOOPCTLTYPE, 29, struct ploop_push_backup_init_ctl)
>  
> +/* Wait for push backup out-of-order requests; or ACK them */
> +#define PLOOP_IOC_PUSH_BACKUP_IO _IOR(PLOOPCTLTYPE, 30, struct ploop_push_backup_io_ctl)
> +
>  /* Stop push backup */
>  #define PLOOP_IOC_PUSH_BACKUP_STOP _IOR(PLOOPCTLTYPE, 31, struct ploop_push_backup_stop_ctl)
>  
