Hi Neil,

On Sun 28-02-16 16:09:29, NeilBrown wrote:
> The least significant bit of an exception entry is used as a lock flag.
> A caller can:
>  - create a locked entry by simply adding an entry with this flag set
>  - lock an existing entry with radix_tree_lookup_lock().  This may return
>     NULL if the entry doesn't exists, or was deleted while waiting for
>     the lock.  It may return a non-exception entry if that is what is
>     found.  If it returns a locked entry then it has exclusive rights
>     to delete the entry.
>  - unlock an entry that is already locked.  This will wake any waiters.
>  - delete an entry that is locked.  This will wake waiters so that they
>    return NULL without looking at the slot in the radix tree.
> 
> These must all be called with the radix tree locked (i.e. a spinlock held).
> That spinlock is passed to radix_tree_lookup_lock() so that it can drop
> the lock while waiting.
> 
> This is a "demonstration of concept".  I haven't actually tested, only 
> compiled.
> A possible use case is for the exception entries used by DAX.
> 
> It is possible that some of the lookups can be optimised away in some
> cases by storing a slot pointer.  I wanted to keep it reasonable
> simple until it was determined if it might be useful.

Thanks for having a look! So the patch looks like it would do the work but
frankly the amount of hackiness in it has exceeded my personal threshold...
several times ;)

In particular I don't quite understand why have you decided to re-lookup
the exceptional entry in the wake function? That seems to be the source of
a lot of a hackiness? I was hoping for something simpler like what I've
attached (compile tested only). What do you think?

To avoid false wakeups and thundering herd issues which my simple version does
have, we could do something like what I outline in the second patch. Now
that I look at the result that is closer to your patch, just cleaner IMHO :).
But I wanted to have it separated to see how much complexity does this
additional functionality brings...

Now I'm going to have a look how to use this in DAX...

                                                                Honza


> Signed-off-by: NeilBrown <ne...@suse.com>
> ---
>  include/linux/radix-tree.h |    8 ++
>  lib/radix-tree.c           |  158 
> ++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 166 insertions(+)
> 
> diff --git a/include/linux/radix-tree.h b/include/linux/radix-tree.h
> index 450c12b546b7..8f579f66574b 100644
> --- a/include/linux/radix-tree.h
> +++ b/include/linux/radix-tree.h
> @@ -308,6 +308,14 @@ unsigned long radix_tree_range_tag_if_tagged(struct 
> radix_tree_root *root,
>  int radix_tree_tagged(struct radix_tree_root *root, unsigned int tag);
>  unsigned long radix_tree_locate_item(struct radix_tree_root *root, void 
> *item);
>  
> +void *radix_tree_lookup_lock(struct radix_tree_root *root, wait_queue_head_t 
> *wq,
> +                          unsigned long index, spinlock_t *lock);
> +void radix_tree_unlock(struct radix_tree_root *root, wait_queue_head_t *wq,
> +                    unsigned long index);
> +void radix_tree_delete_unlock(struct radix_tree_root *root, 
> wait_queue_head_t *wq,
> +                           unsigned long index);
> +
> +
>  static inline void radix_tree_preload_end(void)
>  {
>       preempt_enable();
> diff --git a/lib/radix-tree.c b/lib/radix-tree.c
> index 37d4643ab5c0..a24ea002f3eb 100644
> --- a/lib/radix-tree.c
> +++ b/lib/radix-tree.c
> @@ -1500,3 +1500,161 @@ void __init radix_tree_init(void)
>       radix_tree_init_maxindex();
>       hotcpu_notifier(radix_tree_callback, 0);
>  }
> +
> +/* Exception entry locking.
> + * The least significant bit of an exception entry can be used as a
> + * "locked" flag.  Supported locking operations are:
> + * radix_tree_lookup_lock() - if the indexed entry exists, lock it and
> + *         return the value, else return NULL.  If the indexed entry is not
> + *         exceptional it is returned without locking.
> + * radix_tree_unlock() - release the lock on the indexed entry
> + * radix_tree_delete_unlock() - the entry must be locked.  It will be 
> atomically
> + *     unlocked and removed.  Any threads sleeping in lookup_lock() will 
> return.
> + * Each of these take a radix_tree_root, a wait_queue_head_t, and an index.
> + * The '*lock' function also takes a spinlock_t which must be held when any
> + * of the functions is called.  *lock will drop the spinlock while waiting 
> for
> + * the entry lock.
> + *
> + * As delete_unlock could free the radix_tree_node, waiters much not touch it
> + * when woken.  We provide a wake function for the waitq which records when 
> the
> + * item has been deleted.
> + *
> + * The wait_queue_head passed should be one that is used for bit_wait, such
> + * as zone->wait_table.  We re-use the 'flags' and 'timeout' fields of the
> + * wait_bit_key to store the root and index that we are waiting for.
> + * __wake_up may only be called on one of these keys while the radix tree
> + * is locked.  The wakeup function will take the lock itself if appropriate, 
> or
> + * may record that the radix tree entry has been deleted.  In either case
> + * the waiting function just looks at the status reported by the wakeup 
> function
> + * and doesn't look at the radix tree itself.
> + *
> + * There is no function for locking an entry while inserting it.  Simply
> + * insert an entry that is already marked as 'locked' - lsb set.
> + *
> + */
> +
> +struct wait_slot_queue {
> +     struct radix_tree_root  *root;
> +     unsigned long           index;
> +     wait_queue_t            wait;
> +     enum {SLOT_WAITING, SLOT_LOCKED, SLOT_GONE} state;
> +     void                    *ret;
> +};
> +
> +static inline int slot_locked(void *v)
> +{
> +     unsigned long l = (unsigned long)v;
> +     return l & 1;
> +}
> +
> +static inline void *lock_slot(void **v)
> +{
> +     unsigned long *l = (unsigned long *)v;
> +     return (void*)(*l |= 1);
> +}
> +
> +static inline void * unlock_slot(void **v)
> +{
> +     unsigned long *l = (unsigned long *)v;
> +     return (void*)(*l &= ~1UL);
> +}
> +
> +static int wake_slot_function(wait_queue_t *wait, unsigned mode, int sync,
> +                           void *arg)
> +{
> +     struct wait_bit_key *key = arg;
> +     struct wait_slot_queue *wait_slot =
> +             container_of(wait, struct wait_slot_queue, wait);
> +     void **slot;
> +
> +     if (wait_slot->root != key->flags ||
> +         wait_slot->index != key->timeout)
> +             /* Not waking this waiter */
> +             return 0;
> +     if (wait_slot->state != SLOT_WAITING)
> +             /* Should be impossible.... */
> +             return 1;
> +     if (key->bit_nr == -3)
> +             /* Was just deleted, no point in doing a lookup */
> +             wait_slot = NULL;
> +     else
> +             wait_slot->ret = __radix_tree_lookup(
> +                     wait_slot->root, wait_slot->index, NULL, &slot);
> +     if (!wait_slot->ret || !radix_tree_exceptional_entry(wait_slot->ret)) {
> +             wait_slot->state = SLOT_GONE;
> +             return 1;
> +     }
> +     if (slot_locked(slot))
> +             /* still locked */
> +             return 0;
> +     wait_slot->ret = lock_slot(slot);
> +     wait_slot->state = SLOT_LOCKED;
> +     return 1;
> +}
> +
> +void *radix_tree_lookup_lock(struct radix_tree_root *root, wait_queue_head_t 
> *wq,
> +                          unsigned long index, spinlock_t *lock)
> +{
> +     void *ret, **slot;
> +     struct wait_slot_queue wait;
> +
> +     ret = __radix_tree_lookup(root, index, NULL, &slot);
> +     if (!ret || !radix_tree_exceptional_entry(ret))
> +             return ret;
> +     if (!slot_locked(slot))
> +             return lock_slot(slot);
> +
> +     wait.wait.private = current;
> +     wait.wait.func = wake_slot_function;
> +     INIT_LIST_HEAD(&wait.wait.task_list);
> +     wait.state = SLOT_WAITING;
> +     wait.root = root;
> +     wait.index = index;
> +     wait.ret = NULL;
> +     for (;;) {
> +             prepare_to_wait(wq, &wait.wait,
> +                             TASK_UNINTERRUPTIBLE);
> +             if (wait.state != SLOT_WAITING)
> +                     break;
> +
> +             spin_unlock(lock);
> +             schedule();
> +             spin_lock(lock);
> +     }
> +     finish_wait(wq, &wait.wait);
> +     return wait.ret;
> +}
> +EXPORT_SYMBOL(radix_tree_lookup_lock);
> +
> +void radix_tree_unlock(struct radix_tree_root *root, wait_queue_head_t *wq,
> +                     unsigned long index)
> +{
> +     void *ret, **slot;
> +
> +     ret = __radix_tree_lookup(root, index, NULL, &slot);
> +     if (WARN_ON_ONCE(!ret || !radix_tree_exceptional_entry(ret)))
> +             return;
> +     if (WARN_ON_ONCE(!slot_locked(slot)))
> +             return;
> +     unlock_slot(slot);
> +
> +     if (waitqueue_active(wq)) {
> +             struct wait_bit_key key = {.flags = root, .bit_nr = -2,
> +                                        .timeout = index};
> +             __wake_up(wq, TASK_NORMAL, 1, &key);
> +     }
> +}
> +EXPORT_SYMBOL(radix_tree_unlock);
> +
> +void radix_tree_delete_unlock(struct radix_tree_root *root, 
> wait_queue_head_t *wq,
> +                           unsigned long index)
> +{
> +     radix_tree_delete(root, index);
> +     if (waitqueue_active(wq)) {
> +             /* -3 here indicates deletion */
> +             struct wait_bit_key key = {.flags = root, .bit_nr = -3,
> +                                        .timeout = index};
> +             __wake_up(wq, TASK_NORMAL, 1, &key);
> +     }
> +}
> +EXPORT_SYMBOL(radix_tree_delete_unlock);
> 
> 
-- 
Jan Kara <j...@suse.com>
SUSE Labs, CR
>From c30e48c69b3390250f60b780c64ebeb2da2fcf75 Mon Sep 17 00:00:00 2001
From: NeilBrown <ne...@suse.com>
Date: Sun, 28 Feb 2016 16:09:29 +1100
Subject: [PATCH] radix-tree: support locking of individual exception entries.

The least significant bit of an exception entry is used as a lock flag.
A caller can:
 - create a locked entry by simply adding an entry with this flag set
 - lock an existing entry with radix_tree_lookup_lock().  This may return
    NULL if the entry doesn't exists, or was deleted while waiting for
    the lock.  It may return a non-exception entry if that is what is
    found.  If it returns a locked entry then it has exclusive rights
    to delete the entry.
 - unlock an entry that is already locked.  This will wake any waiters.

These must all be called with the radix tree locked (i.e. a spinlock held).
That spinlock is passed to radix_tree_lookup_lock() so that it can drop
the lock while waiting.

This is a "demonstration of concept".  I haven't actually tested, only compiled.
A possible use case is for the exception entries used by DAX.

It is possible that some of the lookups can be optimised away in some
cases by storing a slot pointer.  I wanted to keep it reasonable
simple until it was determined if it might be useful.

Signed-off-by: NeilBrown <ne...@suse.com>
Signed-off-by: Jan Kara <j...@suse.cz>
---
 include/linux/radix-tree.h |  5 +++
 lib/radix-tree.c           | 83 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+)

diff --git a/include/linux/radix-tree.h b/include/linux/radix-tree.h
index 450c12b546b7..92138d30b95d 100644
--- a/include/linux/radix-tree.h
+++ b/include/linux/radix-tree.h
@@ -308,6 +308,11 @@ unsigned long radix_tree_range_tag_if_tagged(struct radix_tree_root *root,
 int radix_tree_tagged(struct radix_tree_root *root, unsigned int tag);
 unsigned long radix_tree_locate_item(struct radix_tree_root *root, void *item);
 
+void *radix_tree_lookup_lock(struct radix_tree_root *root, unsigned long index,
+			     wait_queue_head_t *wq, spinlock_t *lock);
+void radix_tree_unlock(struct radix_tree_root *root, unsigned long index,
+		       wait_queue_head_t *wq);
+
 static inline void radix_tree_preload_end(void)
 {
 	preempt_enable();
diff --git a/lib/radix-tree.c b/lib/radix-tree.c
index 37d4643ab5c0..a97231ab66f0 100644
--- a/lib/radix-tree.c
+++ b/lib/radix-tree.c
@@ -1500,3 +1500,86 @@ void __init radix_tree_init(void)
 	radix_tree_init_maxindex();
 	hotcpu_notifier(radix_tree_callback, 0);
 }
+
+/*
+ * Exception entry locking
+ */
+static inline int slot_locked(void **v)
+{
+	unsigned long l = *(unsigned long *)v;
+	return l & 1;
+}
+
+static inline void *lock_slot(void **v)
+{
+	unsigned long *l = (unsigned long *)v;
+	return (void*)(*l |= 1);
+}
+
+static inline void * unlock_slot(void **v)
+{
+	unsigned long *l = (unsigned long *)v;
+	return (void*)(*l &= ~1UL);
+}
+
+/**
+ *	radix_tree_lookup_lock - lookup and lock exceptional entry if found
+ *	@root:		radix tree root
+ *	@index:		index key
+ *	@wq:		waitqueue to wait for exceptional entry lock
+ *	@lock:		spinlock protecting the radix tree
+ *
+ *	Lookup @index in the radix tree @root and if there is an exceptional
+ *	entry at that location, lock it. If entry at @index is not exceptional,
+ *	we just return it. We use @wq as a wait queue to wait for exceptional
+ *	entry lock to be released. @lock is a spinlock protecting the radix
+ *	tree which we assume is locked when entering this function and released
+ *	while waiting for the exceptional entry lock.
+ */
+void *radix_tree_lookup_lock(struct radix_tree_root *root, unsigned long index,
+			     wait_queue_head_t *wq, spinlock_t *lock)
+{
+	void *ret, **slot;
+	DEFINE_WAIT(wait);
+
+	for (;;) {
+		ret = __radix_tree_lookup(root, index, NULL, &slot);
+		if (!ret || !radix_tree_exceptional_entry(ret))
+			return ret;
+		if (!slot_locked(slot))
+			return lock_slot(slot);
+		prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
+		spin_unlock(lock);
+		schedule();
+		finish_wait(wq, &wait);
+		spin_lock(lock);
+	}
+}
+EXPORT_SYMBOL(radix_tree_lookup_lock);
+
+/**
+ *	radix_tree_unlock - unlock exceptional radix tree entry
+ *	@root:		radix tree root
+ *	@index:		index key
+ *	@wq:		waitqueue to wake waiters for exceptional entry lock
+ *
+ *	Unlock exceptional entry at @index in a radix tree @root and wake up
+ *	waiters for it waiting in wait queue @wq. We expect the radix tree is
+ *	locked against concurrent modifications.
+ */
+void radix_tree_unlock(struct radix_tree_root *root, unsigned long index,
+		       wait_queue_head_t *wq)
+{
+	void *ret, **slot;
+
+	ret = __radix_tree_lookup(root, index, NULL, &slot);
+	if (WARN_ON_ONCE(!ret || !radix_tree_exceptional_entry(ret)))
+		return;
+	if (WARN_ON_ONCE(!slot_locked(slot)))
+		return;
+	unlock_slot(slot);
+
+	if (waitqueue_active(wq))
+		wake_up(wq);
+}
+EXPORT_SYMBOL(radix_tree_unlock);
-- 
2.6.2

>From 982c02870b262da742f593e695b4050aa99fabd3 Mon Sep 17 00:00:00 2001
From: Jan Kara <j...@suse.cz>
Date: Thu, 3 Mar 2016 14:06:42 +0100
Subject: [PATCH] radix-tree: Avoid false wakeups when waiting for exceptional
 entry lock

Signed-off-by: Jan Kara <j...@suse.cz>
---
 lib/radix-tree.c | 42 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git a/lib/radix-tree.c b/lib/radix-tree.c
index a97231ab66f0..be9763dd9de5 100644
--- a/lib/radix-tree.c
+++ b/lib/radix-tree.c
@@ -1522,6 +1522,28 @@ static inline void * unlock_slot(void **v)
 	return (void*)(*l &= ~1UL);
 }
 
+struct exceptional_entry_key {
+	struct radix_tree_root *root;
+	unsigned long index;
+};
+
+struct wait_exceptional_entry_queue {
+	wait_queue_t wait;
+	struct exceptional_entry_key key;
+};
+
+static int wake_exceptional_entry_func(wait_queue_t *wait, unsigned mode,
+				       int sync, void *keyp)
+{
+	struct exceptional_entry_key *key = keyp;
+	struct wait_exceptional_entry_queue *ewait =
+		container_of(wait, struct wait_exceptional_entry_queue, wait);
+
+	if (key->root != ewait->key.root || key->index != ewait->key.index)
+		return 0;
+	return autoremove_wake_function(wait, mode, sync, NULL);
+}
+
 /**
  *	radix_tree_lookup_lock - lookup and lock exceptional entry if found
  *	@root:		radix tree root
@@ -1540,7 +1562,12 @@ void *radix_tree_lookup_lock(struct radix_tree_root *root, unsigned long index,
 			     wait_queue_head_t *wq, spinlock_t *lock)
 {
 	void *ret, **slot;
-	DEFINE_WAIT(wait);
+	struct wait_exceptional_entry_queue wait;
+
+	init_wait(&wait.wait);
+	wait.wait.func = wake_exceptional_entry_func;
+	wait.key.root = root;
+	wait.key.index = index;
 
 	for (;;) {
 		ret = __radix_tree_lookup(root, index, NULL, &slot);
@@ -1548,10 +1575,10 @@ void *radix_tree_lookup_lock(struct radix_tree_root *root, unsigned long index,
 			return ret;
 		if (!slot_locked(slot))
 			return lock_slot(slot);
-		prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
+		prepare_to_wait_exclusive(wq, &wait.wait, TASK_UNINTERRUPTIBLE);
 		spin_unlock(lock);
 		schedule();
-		finish_wait(wq, &wait);
+		finish_wait(wq, &wait.wait);
 		spin_lock(lock);
 	}
 }
@@ -1579,7 +1606,12 @@ void radix_tree_unlock(struct radix_tree_root *root, unsigned long index,
 		return;
 	unlock_slot(slot);
 
-	if (waitqueue_active(wq))
-		wake_up(wq);
+	if (waitqueue_active(wq)) {
+		struct exceptional_entry_key key;
+
+		key.root = root;
+		key.index = index;
+		__wake_up(wq, TASK_NORMAL, 1, &key);
+	}
 }
 EXPORT_SYMBOL(radix_tree_unlock);
-- 
2.6.2

Reply via email to