Will Deacon <w...@kernel.org> wrote:

> If I'm understanding your code correctly (big 'if'), then you have things
> like this in pipe_read():
> 
> 
>       unsigned int head = READ_ONCE(pipe->head);
>       unsigned int tail = pipe->tail;
>       unsigned int mask = pipe->buffers - 1;
> 
>       if (tail != head) {
>               struct pipe_buffer *buf = &pipe->bufs[tail & mask];
> 
>               [...]
> 
>               written = copy_page_to_iter(buf->page, buf->offset, chars, to);
> 
> 
> where you want to make sure you don't read from 'buf->page' until after
> you've read the updated head index. Is that right? If so, then READ_ONCE()
> will not give you that guarantee on architectures such as Power and Arm,
> because the 'if (tail != head)' branch can be speculated and the buffer
> can be read before we've got around to looking at the head index.
> 
> So I reckon you need smp_load_acquire() in this case. pipe_write() might be
> ok with the control dependency because CPUs don't tend to make speculative
> writes visible, but I didn't check it carefully and the compiler can do
> crazy stuff in this area, so I'd be inclined to use smp_load_acquire() here
> too unless you really need the last ounce of performance.

Yeah, I probably do.

Documentation/core-api/circular-buffers.rst might be wrong, then, I think.

It mandates using smp_store_release() to update buffer->head in the producer
and buffer->tail in the consumer, but each of those releases needs to pair with
a memory barrier on the other side, where buffer->head or buffer->tail is read.
Currently, for the producer we have:

        spin_lock(&producer_lock);

        unsigned long head = buffer->head;
        /* The spin_unlock() and next spin_lock() provide needed ordering. */
        unsigned long tail = READ_ONCE(buffer->tail);

        if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
                /* insert one item into the buffer */
                struct item *item = buffer[head];

                produce_item(item);

                smp_store_release(buffer->head,
                                  (head + 1) & (buffer->size - 1));

                /* wake_up() will make sure that the head is committed before
                 * waking anyone up */
                wake_up(consumer);
        }

        spin_unlock(&producer_lock);

I think the ordering comment about spin_unlock() and spin_lock() is wrong.
There's no requirement to have a spinlock on either side, and in any case the
producer and consumer take different locks, so both sides could be inside their
respective locked sections at the same time when accessing the buffer.  The
READ_ONCE() would theoretically provide the smp_read_barrier_depends() to pair
with the smp_store_release() in the consumer.  Maybe I should change this to:

        spin_lock(&producer_lock);

        /* Barrier paired with consumer-side store-release on tail */
        unsigned long tail = smp_load_acquire(buffer->tail);
        unsigned long head = buffer->head;

        if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
                /* insert one item into the buffer */
                struct item *item = buffer[head];

                produce_item(item);

                smp_store_release(buffer->head,
                                  (head + 1) & (buffer->size - 1));

                /* wake_up() will make sure that the head is committed before
                 * waking anyone up */
                wake_up(consumer);
        }

        spin_unlock(&producer_lock);

The consumer is currently:

        spin_lock(&consumer_lock);

        /* Read index before reading contents at that index. */
        unsigned long head = smp_load_acquire(buffer->head);
        unsigned long tail = buffer->tail;

        if (CIRC_CNT(head, tail, buffer->size) >= 1) {

                /* extract one item from the buffer */
                struct item *item = buffer[tail];

                consume_item(item);

                /* Finish reading descriptor before incrementing tail. */
                smp_store_release(buffer->tail,
                                  (tail + 1) & (buffer->size - 1));
        }

        spin_unlock(&consumer_lock);

which I think is okay.

And yes, I note that this does use smp_load_acquire(buffer->head) in the
consumer - which I should also be doing.
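
To make the pairing concrete, here is a rough userspace sketch using C11
atomics in place of the kernel primitives.  The names (struct ring,
ring_produce(), ring_consume(), ring_demo(), RING_SIZE) are made up for
illustration and are not from pipe.c or circular-buffers.rst;
memory_order_acquire and memory_order_release stand in for smp_load_acquire()
and smp_store_release().  ring_demo() only exercises the logic from a single
thread, so it shows where the barriers go rather than proving the ordering.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define RING_SIZE 8u    /* hypothetical size; must be a power of two */

struct ring {
        _Atomic unsigned long head;     /* written only by the producer */
        _Atomic unsigned long tail;     /* written only by the consumer */
        int slots[RING_SIZE];
};

/* Producer side: returns false if the ring is full. */
static bool ring_produce(struct ring *r, int value)
{
        /* Only the producer writes head, so a relaxed load is enough. */
        unsigned long head = atomic_load_explicit(&r->head,
                                                  memory_order_relaxed);
        /* Acquire pairs with the consumer's release store to tail. */
        unsigned long tail = atomic_load_explicit(&r->tail,
                                                  memory_order_acquire);

        /* One slot is always kept free, as with CIRC_SPACE(). */
        if (((head + 1) & (RING_SIZE - 1)) == tail)
                return false;

        r->slots[head] = value;

        /* Release: the slot contents are visible before the new head. */
        atomic_store_explicit(&r->head, (head + 1) & (RING_SIZE - 1),
                              memory_order_release);
        return true;
}

/* Consumer side: returns false if the ring is empty. */
static bool ring_consume(struct ring *r, int *value)
{
        /* Only the consumer writes tail, so a relaxed load is enough. */
        unsigned long tail = atomic_load_explicit(&r->tail,
                                                  memory_order_relaxed);
        /* Acquire: read the index before reading contents at that index. */
        unsigned long head = atomic_load_explicit(&r->head,
                                                  memory_order_acquire);

        if (head == tail)
                return false;

        *value = r->slots[tail];

        /* Release: finish reading the slot before handing it back. */
        atomic_store_explicit(&r->tail, (tail + 1) & (RING_SIZE - 1),
                              memory_order_release);
        return true;
}

/* Single-threaded smoke test: fill the ring, then drain it and sum. */
int ring_demo(void)
{
        struct ring r;
        int sum = 0, v;

        atomic_init(&r.head, 0);
        atomic_init(&r.tail, 0);

        for (int i = 1; i < (int)RING_SIZE; i++)
                assert(ring_produce(&r, i));
        assert(!ring_produce(&r, 99));  /* full: RING_SIZE - 1 usable slots */

        while (ring_consume(&r, &v))
                sum += v;
        return sum;
}
```

Each side does a plain (relaxed) load of the index it owns, an acquire load of
the index the other side owns, and a release store when publishing its own
index, which is the shape I'm proposing for the producer above.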

David
