On 01/27/2016 11:34 AM, Peter Zijlstra wrote:
On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:
+static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
+ struct list_head *head,
+ struct list_head *entry)
+{
+ if (cmd == lb_cmd_add)
+ list_add(entry, head);
+ else if (cmd == lb_cmd_del)
+ list_del(entry);
+ else /* cmd == lb_cmd_del_init */
+ list_del_init(entry);
Maybe use switch(), GCC has fancy warns with enums and switch().
OK, I will look at the generated code to see if there is any difference.
+}
+static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
+ struct list_batch *batch,
+ struct list_head *entry)
+{
+ /*
+ * Fast path
+ */
+ if (spin_trylock(lock)) {
+ _list_batch_cmd(cmd, batch->list, entry);
+ spin_unlock(lock);
_list_batch_cmd
This is still quite a lot of code for an inline function
I expect the callers will call it with a constant cmd, thus optimizing
out all the if conditional checks in _list_batch_cmd(). Taking the
inline out will probably stop that optimization.
+ return;
+ }
+ do_list_batch_slowpath(cmd, lock, batch, entry);
+}
+void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
+ struct list_batch *batch, struct list_head *entry)
+{
+ struct list_batch_qnode node, *prev, *next, *nptr;
+ int loop;
+
+ /*
+ * Put itself into the list_batch queue
+ */
+ node.next = NULL;
+ node.entry = entry;
+ node.cmd = cmd;
+ node.state = lb_state_waiting;
+
Here we rely on the release barrier implied by xchg() to ensure the node
initialization is complete before the xchg() publishes the thing.
But do we also need the acquire part of this barrier? From what I could
tell, the primitive as a whole does not imply any ordering.
I think we probably won't need the acquire part, but I don't have a
non-x86 machine that can really test out the more relaxed versions of
the atomic ops. That is why I use the strict versions. We can always
relax it later on with additional patches.
+ prev = xchg(&batch->tail,&node);
+
+ if (prev) {
+ WRITE_ONCE(prev->next,&node);
+ while (READ_ONCE(node.state) == lb_state_waiting)
+ cpu_relax();
+ if (node.state == lb_state_done)
+ return;
+ WARN_ON(node.state != lb_state_batch);
+ }
+
+ /*
+ * We are now the queue head, we shold now acquire the lock and
+ * process a batch of qnodes.
+ */
+ loop = LB_BATCH_SIZE;
Have you tried different sizes?
I have tried 64 and 128. Using 128 seems to give a bit better
performance number.
+ next =&node;
+ spin_lock(lock);
+
+do_list_again:
+ do {
+ nptr = next;
+ _list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
+ next = READ_ONCE(nptr->next);
+ /*
+ * As soon as the state is marked lb_state_done, we
+ * can no longer assume the content of *nptr as valid.
+ * So we have to hold off marking it done until we no
+ * longer need its content.
+ *
+ * The release barrier here is to make sure that we
+ * won't access its content after marking it done.
+ */
+ if (next)
+ smp_store_release(&nptr->state, lb_state_done);
+ } while (--loop&& next);
+ if (!next) {
+ /*
+ * The queue tail should equal to nptr, so clear it to
+ * mark the queue as empty.
+ */
+ if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
+ /*
+ * Queue not empty, wait until the next pointer is
+ * initialized.
+ */
+ while (!(next = READ_ONCE(nptr->next)))
+ cpu_relax();
+ }
+ /* The above cmpxchg acts as a memory barrier */
for what? :-)
Also, if that cmpxchg() fails, it very much does _not_ act as one.
I suspect you want smp_store_release() setting the state_done, just as
above, and then use cmpxchg_relaxed().
You are right. I did forgot about there was no memory barrier guarantee
when cmpxchg() fails. However, in that case, the READ_ONCE() and
WRITE_ONCE() macros should still provide the necessary ordering, IMO. I
can certainly change it to use cmpxchg_relaxed() and smp_store_release()
instead.
+ WRITE_ONCE(nptr->state, lb_state_done);
+ }
+ if (next) {
+ if (loop)
+ goto do_list_again; /* More qnodes to process */
+ /*
+ * Mark the next qnode as head to process the next batch
+ * of qnodes. The new queue head cannot proceed until we
+ * release the lock.
+ */
+ WRITE_ONCE(next->state, lb_state_batch);
+ }
+ spin_unlock(lock);
+}
+EXPORT_SYMBOL_GPL(do_list_batch_slowpath);
Cheers,
Longman