On Mon, Jul 15, 2019 at 03:25:34PM -0400, Alex Kogan wrote:
> +static struct cna_node *find_successor(struct mcs_spinlock *me)
> +{
> +     struct cna_node *me_cna = CNA_NODE(me);
> +     struct cna_node *head_other, *tail_other, *cur;
> +     struct cna_node *next = CNA_NODE(READ_ONCE(me->next));
> +     int my_node;
> +
> +     /* @next should be set, else we would not be calling this function. */
> +     WARN_ON_ONCE(next == NULL);
> +
> +     my_node = me_cna->numa_node;
> +
> +     /*
> +      * Fast path - check whether the immediate successor runs on
> +      * the same node.
> +      */
> +     if (next->numa_node == my_node)
> +             return next;
> +
> +     head_other = next;
> +     tail_other = next;
> +
> +     /*
> +      * Traverse the main waiting queue starting from the successor of my
> +      * successor, and look for a thread running on the same node.
> +      */
> +     cur = CNA_NODE(READ_ONCE(next->mcs.next));
> +     while (cur) {
> +             if (cur->numa_node == my_node) {
> +                     /*
> +                      * Found a thread on the same node. Move threads
> +                      * between me and that node into the secondary queue.
> +                      */
> +                     if (me->locked > 1)
> +                             CNA_NODE(me->locked)->tail->mcs.next =
> +                                     (struct mcs_spinlock *)head_other;
> +                     else
> +                             me->locked = (uintptr_t)head_other;
> +                     tail_other->mcs.next = NULL;
> +                     CNA_NODE(me->locked)->tail = tail_other;
> +                     return cur;
> +             }
> +             tail_other = cur;
> +             cur = CNA_NODE(READ_ONCE(cur->mcs.next));
> +     }
> +     return NULL;
> +}

static void cna_move(struct cna_node *cn, struct cna_node *cni)
{
        struct cna_node *head, *tail;

        /* remove @cni */
        WRITE_ONCE(cn->mcs.next, cni->mcs.next);

        /* stick @cni on the 'other' list tail */
        cni->mcs.next = NULL;

        if (cn->mcs.locked <= 1) {
                /* head = tail = cni */
                head = cni;
                head->tail = cni;
                cn->mcs.locked = head->encoded_tail;
        } else {
                /* add to tail */
                head = (struct cna_node *)decode_tail(cn->mcs.locked);
                tail = tail->tail;
                tail->next = cni;
        }
}

static struct cna_node *cna_find_next(struct mcs_spinlock *node)
{
        struct cna_node *cni, *cn = (struct cna_node *)node;

        while ((cni = (struct cna_node *)READ_ONCE(cn->mcs.next))) {
                if (likely(cni->node == cn->node))
                        break;

                cna_move(cn, cni);
        }

        return cni;
}

> +static inline bool cna_set_locked_empty_mcs(struct qspinlock *lock, u32 val,
> +                                     struct mcs_spinlock *node)
> +{
> +     /* Check whether the secondary queue is empty. */
> +     if (node->locked <= 1) {
> +             if (atomic_try_cmpxchg_relaxed(&lock->val, &val,
> +                             _Q_LOCKED_VAL))
> +                     return true; /* No contention */
> +     } else {
> +             /*
> +              * Pass the lock to the first thread in the secondary
> +              * queue, but first try to update the queue's tail to
> +              * point to the last node in the secondary queue.


That comment doesn't make sense; there's at least one conditional
missing.

> +              */
> +             struct cna_node *succ = CNA_NODE(node->locked);
> +             u32 new = succ->tail->encoded_tail + _Q_LOCKED_VAL;
> +
> +             if (atomic_try_cmpxchg_relaxed(&lock->val, &val, new)) {
> +                     arch_mcs_spin_unlock_contended(&succ->mcs.locked, 1);
> +                     return true;
> +             }
> +     }
> +
> +     return false;
> +}

static cna_try_clear_tail(struct qspinlock *lock, u32 val, struct mcs_spinlock 
*node)
{
        if (node->locked <= 1)
                return __try_clear_tail(lock, val, node);

        /* the other case */
}

> +static inline void cna_pass_mcs_lock(struct mcs_spinlock *node,
> +                                  struct mcs_spinlock *next)
> +{
> +     struct cna_node *succ = NULL;
> +     u64 *var = &next->locked;
> +     u64 val = 1;
> +
> +     succ = find_successor(node);

This makes unlock O(n), which is 'funneh' and undocumented.

> +
> +     if (succ) {
> +             var = &succ->mcs.locked;
> +             /*
> +              * We unlock a successor by passing a non-zero value,
> +              * so set @val to 1 iff @locked is 0, which will happen
> +              * if we acquired the MCS lock when its queue was empty
> +              */
> +             val = node->locked + (node->locked == 0);
> +     } else if (node->locked > 1) { /* if the secondary queue is not empty */
> +             /* pass the lock to the first node in that queue */
> +             succ = CNA_NODE(node->locked);
> +             succ->tail->mcs.next = next;
> +             var = &succ->mcs.locked;

> +     }       /*
> +              * Otherwise, pass the lock to the immediate successor
> +              * in the main queue.
> +              */

I don't think this mis-indented comment can happen. The call-site
guarantees @next is non-null.

Therefore, cna_find_next() will either return it, or place it on the
secondary list. If it (cna_find_next) returns NULL, we must have a
non-empty secondary list.

In no case do I see this tertiary condition being possible.

> +
> +     arch_mcs_spin_unlock_contended(var, val);
> +}

This also renders this @next argument superfluous.

static cna_mcs_pass_lock(struct mcs_spinlock *node, struct mcs_spinlock *next)
{
        next = cna_find_next(node);
        if (!next) {
                BUG_ON(node->locked <= 1);
                next = (struct cna_node *)decode_tail(node->locked);
                node->locked = 1;
        }

        arch_mcs_pass_lock(&next->mcs.locked, node->locked);
}

Reply via email to