On Wed 12-09-18 15:28:52, Waiman Long wrote:
> From: Davidlohr Bueso <d...@stgolabs.net>
> 
> Instead of the current O(N) implementation, at the cost
> of adding an atomic counter, we can convert the call to
> an atomic_read(). The counter only serves for accounting
> empty to non-empty transitions, and vice versa; therefore
> only modified twice for each of the lists during the
> lifetime of the dlock (while used).
> 
> In addition, to be able to unaccount a list_del(), we
> add a dlist pointer to each head, thus minimizing the
> overall memory footprint.
> 
> Signed-off-by: Davidlohr Bueso <dbu...@suse.de>
> Acked-by: Waiman Long <long...@redhat.com>

So I was wondering: Is this really worth it? AFAICS we have a single call
site for dlock_lists_empty() and that happens during umount where we don't
really care about this optimization. So it seems like an unnecessary
complication to me at this point. If someone comes up with a use case that
needs a fast dlock_lists_empty(), then sure, we can do this...

                                                                Honza

> ---
>  include/linux/dlock-list.h |  8 ++++++
>  lib/dlock-list.c           | 67 +++++++++++++++++++++++++++++++++++++++-------
>  2 files changed, 65 insertions(+), 10 deletions(-)
> 
> diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
> index 327cb9e..ac1a2e3 100644
> --- a/include/linux/dlock-list.h
> +++ b/include/linux/dlock-list.h
> @@ -32,10 +32,18 @@
>  struct dlock_list_head {
>       struct list_head list;
>       spinlock_t lock;
> +     struct dlock_list_heads *dlist;
>  } ____cacheline_aligned_in_smp;
>  
> +/*
> + * This is the main dlist data structure, with the array of heads
> + * and a counter that atomically tracks if any of the lists are
> + * being used. That is, empty to non-empty (and vice versa)
> + * head->list transitions.
> + */
>  struct dlock_list_heads {
>       struct dlock_list_head *heads;
> +     atomic_t used_lists;
>  };
>  
>  /*
> diff --git a/lib/dlock-list.c b/lib/dlock-list.c
> index e286094..04da20d 100644
> --- a/lib/dlock-list.c
> +++ b/lib/dlock-list.c
> @@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,
>  
>               INIT_LIST_HEAD(&head->list);
>               head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
> +             head->dlist = dlist;
>               lockdep_set_class(&head->lock, key);
>       }
> +
> +     atomic_set(&dlist->used_lists, 0);
>       return 0;
>  }
>  EXPORT_SYMBOL(__alloc_dlock_list_heads);
> @@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
>  {
>       kfree(dlist->heads);
>       dlist->heads = NULL;
> +     atomic_set(&dlist->used_lists, 0);
>  }
>  EXPORT_SYMBOL(free_dlock_list_heads);
>  
>  /**
>   * dlock_lists_empty - Check if all the dlock lists are empty
>   * @dlist: Pointer to the dlock_list_heads structure
> - * Return: true if list is empty, false otherwise.
>   *
> - * This can be a pretty expensive function call. If this function is required
> - * in a performance critical path, we may have to maintain a global count
> - * of the list entries in the global dlock_list_heads structure instead.
> + * Return: true if all dlock lists are empty, false otherwise.
>   */
>  bool dlock_lists_empty(struct dlock_list_heads *dlist)
>  {
> -     int idx;
> -
>       /* Shouldn't be called before nr_dlock_lists is initialized */
>       WARN_ON_ONCE(!nr_dlock_lists);
>  
> -     for (idx = 0; idx < nr_dlock_lists; idx++)
> -             if (!list_empty(&dlist->heads[idx].list))
> -                     return false;
> -     return true;
> +     /*
> +      * Serialize dlist->used_lists such that a 0->1 transition is not
> +      * missed by another thread checking if any of the dlock lists are
> +      * used.
> +      *
> +      * CPU0                             CPU1
> +      * dlock_list_add()                 dlock_lists_empty()
> +      *   [S] atomic_inc(used_lists);
> +      *       smp_mb__after_atomic();
> +      *                                        smp_mb__before_atomic();
> +      *                                    [L] atomic_read(used_lists)
> +      *       list_add()
> +      */
> +     smp_mb__before_atomic();
> +     return !atomic_read(&dlist->used_lists);
>  }
>  EXPORT_SYMBOL(dlock_lists_empty);
>  
> @@ -177,11 +187,39 @@ void dlock_lists_add(struct dlock_list_node *node,
>                    struct dlock_list_heads *dlist)
>  {
>       struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
> +     bool list_empty_before_lock = false;
> +
> +     /*
> +      * Optimistically bump the used_lists counter _before_ taking
> +      * the head->lock such that we don't miss a thread adding itself
> +      * to a list while spinning for the lock.
> +      *
> +      * Then, after taking the lock, recheck if the empty to non-empty
> +      * transition changed and (un)account for ourselves, accordingly.
> +      * Note that all these scenarios are corner cases, and not the
> +      * common scenario, where the lists are actually populated most
> +      * of the time.
> +      */
> +     if (unlikely(list_empty_careful(&head->list))) {
> +             list_empty_before_lock = true;
> +             atomic_inc(&dlist->used_lists);
> +             smp_mb__after_atomic();
> +     }
>  
>       /*
>        * There is no need to disable preemption
>        */
>       spin_lock(&head->lock);
> +
> +     if (unlikely(!list_empty_before_lock && list_empty(&head->list))) {
> +             atomic_inc(&dlist->used_lists);
> +             smp_mb__after_atomic();
> +     }
> +     if (unlikely(list_empty_before_lock && !list_empty(&head->list))) {
> +             atomic_dec(&dlist->used_lists);
> +             smp_mb__after_atomic();
> +     }
> +
>       WRITE_ONCE(node->head, head);
>       list_add(&node->list, &head->list);
>       spin_unlock(&head->lock);
> @@ -212,6 +250,15 @@ void dlock_lists_del(struct dlock_list_node *node)
>               spin_lock(&head->lock);
>               if (likely(head == READ_ONCE(node->head))) {
>                       list_del_init(&node->list);
> +
> +                     if (unlikely(list_empty(&head->list))) {
> +                             struct dlock_list_heads *dlist;
> +                             dlist = node->head->dlist;
> +
> +                             atomic_dec(&dlist->used_lists);
> +                             smp_mb__after_atomic();
> +                     }
> +
>                       WRITE_ONCE(node->head, NULL);
>                       retry = false;
>               } else {
> -- 
> 1.8.3.1
> 
> 
-- 
Jan Kara <j...@suse.com>
SUSE Labs, CR

Reply via email to