Instead of the current O(N) implementation of dlock_lists_empty(),
we can, at the cost of adding an atomic counter, convert the call
to an atomic_read(). The counter only accounts for empty to
non-empty transitions, and vice versa; it is therefore only
modified twice for each of the lists during the lifetime of the
dlock (while used).

In addition, to be able to unaccount a list on the list_del()
that empties it, we add a dlist pointer to each head (rather
than to each node), thus minimizing the overall memory footprint.

Signed-off-by: Davidlohr Bueso <dbu...@suse.de>
---
Changes from v3:
- s/waiters/used_lists, more doc around the counter.
- fixed racy scenario when the list empty/non-empty
  condition changes after taking the lock.
- sprinkled unlikely() around all checks, these are
  only corner cases in the lifetime of the lock.

include/linux/dlock-list.h |  8 ++++++
lib/dlock-list.c           | 67 +++++++++++++++++++++++++++++++++++++++-------
2 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h
index c00c7f92ada4..e18690a9bba6 100644
--- a/include/linux/dlock-list.h
+++ b/include/linux/dlock-list.h
@@ -32,10 +32,18 @@
struct dlock_list_head {
        struct list_head list;
        spinlock_t lock;
+       struct dlock_list_heads *dlist;
} ____cacheline_aligned_in_smp;

+/*
+ * This is the main dlist data structure, with the array of heads
+ * and a counter that atomically tracks if any of the lists are
+ * being used. That is, empty to non-empty (and vice versa)
+ * head->list transitions.
+ */
struct dlock_list_heads {
        struct dlock_list_head *heads;
+       atomic_t used_lists;
};

/*
diff --git a/lib/dlock-list.c b/lib/dlock-list.c
index a4ddecc01b12..a9c855d492b8 100644
--- a/lib/dlock-list.c
+++ b/lib/dlock-list.c
@@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist,

                INIT_LIST_HEAD(&head->list);
                head->lock = __SPIN_LOCK_UNLOCKED(&head->lock);
+               head->dlist = dlist;
                lockdep_set_class(&head->lock, key);
        }
+
+       atomic_set(&dlist->used_lists, 0);
        return 0;
}
EXPORT_SYMBOL(__alloc_dlock_list_heads);
@@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist)
{
        kfree(dlist->heads);
        dlist->heads = NULL;
+       atomic_set(&dlist->used_lists, 0);
}
EXPORT_SYMBOL(free_dlock_list_heads);

/**
 * dlock_lists_empty - Check if all the dlock lists are empty
 * @dlist: Pointer to the dlock_list_heads structure
- * Return: true if list is empty, false otherwise.
 *
- * This can be a pretty expensive function call. If this function is required
- * in a performance critical path, we may have to maintain a global count
- * of the list entries in the global dlock_list_heads structure instead.
+ * Return: true if all dlock lists are empty, false otherwise.
 */
bool dlock_lists_empty(struct dlock_list_heads *dlist)
{
-       int idx;
-
        /* Shouldn't be called before nr_dlock_lists is initialized */
        WARN_ON_ONCE(!nr_dlock_lists);

-       for (idx = 0; idx < nr_dlock_lists; idx++)
-               if (!list_empty(&dlist->heads[idx].list))
-                       return false;
-       return true;
+       /*
+        * Serialize dlist->used_lists such that a 0->1 transition is not
+        * missed by another thread checking if any of the dlock lists are
+        * used.
+        *
+        * CPU0                             CPU1
+        * dlock_list_add()                 dlock_lists_empty()
+        *   [S] atomic_inc(used_lists);
+        *       smp_mb__after_atomic();
+        *                                        smp_mb__before_atomic();
+        *                                    [L] atomic_read(used_lists)
+        *       list_add()
+        */
+       smp_mb__before_atomic();
+       return !atomic_read(&dlist->used_lists);
}
EXPORT_SYMBOL(dlock_lists_empty);

@@ -177,11 +187,39 @@ void dlock_lists_add(struct dlock_list_node *node,
                     struct dlock_list_heads *dlist)
{
        struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)];
+       bool list_empty_before_lock = false;
+
+       /*
+        * Optimistically bump the used_lists counter _before_ taking
+        * the head->lock such that we don't miss a thread adding itself
+        * to a list while spinning for the lock.
+        *
+        * Then, after taking the lock, recheck if the empty to non-empty
+        * transition changed and (un)account for ourselves, accordingly.
+        * Note that all these scenarios are corner cases, and not the
+        * common scenario, where the lists are actually populated most
+        * of the time.
+        */
+       if (unlikely(list_empty_careful(&head->list))) {
+               list_empty_before_lock = true;
+               atomic_inc(&dlist->used_lists);
+               smp_mb__after_atomic();
+       }

        /*
         * There is no need to disable preemption
         */
        spin_lock(&head->lock);
+
+       if (unlikely(!list_empty_before_lock && list_empty(&head->list))) {
+               atomic_inc(&dlist->used_lists);
+               smp_mb__after_atomic();
+       }
+       if (unlikely(list_empty_before_lock && !list_empty(&head->list))) {
+               atomic_dec(&dlist->used_lists);
+               smp_mb__after_atomic();
+       }
+
        node->head = head;
        list_add(&node->list, &head->list);
        spin_unlock(&head->lock);
@@ -212,6 +250,15 @@ void dlock_lists_del(struct dlock_list_node *node)
                spin_lock(&head->lock);
                if (likely(head == node->head)) {
                        list_del_init(&node->list);
+
+                       if (unlikely(list_empty(&head->list))) {
+                               struct dlock_list_heads *dlist;
+                               dlist = node->head->dlist;
+
+                               atomic_dec(&dlist->used_lists);
+                               smp_mb__after_atomic();
+                       }
+
                        node->head = NULL;
                        retry = false;
                } else {
--
2.13.6

Reply via email to