Will Deacon <w...@kernel.org> wrote:

> If I'm understanding your code correctly (big 'if'), then you have things
> like this in pipe_read():
>
>	unsigned int head = READ_ONCE(pipe->head);
>	unsigned int tail = pipe->tail;
>	unsigned int mask = pipe->buffers - 1;
>
>	if (tail != head) {
>		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
>
>		[...]
>
>		written = copy_page_to_iter(buf->page, buf->offset, chars, to);
>
> where you want to make sure you don't read from 'buf->page' until after
> you've read the updated head index.  Is that right?  If so, then READ_ONCE()
> will not give you that guarantee on architectures such as Power and Arm,
> because the 'if (tail != head)' branch can be speculated and the buffer
> can be read before we've got around to looking at the head index.
>
> So I reckon you need smp_load_acquire() in this case.  pipe_write() might be
> ok with the control dependency because CPUs don't tend to make speculative
> writes visible, but I didn't check it carefully and the compiler can do
> crazy stuff in this area, so I'd be inclined to use smp_load_acquire() here
> too unless you really need the last ounce of performance.
Yeah, I probably do.

Documentation/core-api/circular-buffers.rst might be wrong, then, I think.
It mandates using smp_store_release() to update buffer->head in the producer
and buffer->tail in the consumer - but these need pairing with appropriate
barriers when reading buffer->head and buffer->tail on the other side.

Currently, for the producer, we have:

	spin_lock(&producer_lock);

	unsigned long head = buffer->head;
	/* The spin_unlock() and next spin_lock() provide needed ordering. */
	unsigned long tail = READ_ONCE(buffer->tail);

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

I think the ordering comment about spin_unlock() and spin_lock() is wrong:
there's no requirement to have a spinlock on either side - and in any case,
both sides could be inside their respective locked sections when accessing
the buffer.

The READ_ONCE() would, in theory, provide the implicit
smp_read_barrier_depends() to pair with the smp_store_release() on
buffer->tail in the consumer.

Maybe I should change this to:

	spin_lock(&producer_lock);

	/* Barrier paired with consumer-side store-release on tail */
	unsigned long tail = smp_load_acquire(buffer->tail);
	unsigned long head = buffer->head;

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

The consumer is currently:

	spin_lock(&consumer_lock);

	/* Read index before reading contents at that index. */
	unsigned long head = smp_load_acquire(buffer->head);
	unsigned long tail = buffer->tail;

	if (CIRC_CNT(head, tail, buffer->size) >= 1) {
		/* extract one item from the buffer */
		struct item *item = buffer[tail];

		consume_item(item);

		/* Finish reading descriptor before incrementing tail. */
		smp_store_release(buffer->tail,
				  (tail + 1) & (buffer->size - 1));
	}

	spin_unlock(&consumer_lock);

which I think is okay.  And yes, I note that this does use
smp_load_acquire(buffer->head) in the consumer - which I should also be
doing.
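So I guess the read side wants to become something like the below - just a
sketch against the snippet you quoted, with the surrounding context elided:

	unsigned int head = smp_load_acquire(&pipe->head);
	unsigned int tail = pipe->tail;
	unsigned int mask = pipe->buffers - 1;

	if (tail != head) {
		struct pipe_buffer *buf = &pipe->bufs[tail & mask];

		[...]

		/* The acquire on pipe->head orders this read after the read
		 * of the head index, so we can't see the slot before the
		 * producer's fill of it is visible to us. */
		written = copy_page_to_iter(buf->page, buf->offset, chars, to);

and, as you suggest, pipe_write() could take an smp_load_acquire() on
pipe->tail likewise, rather than relying on the control dependency.

David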