On 11/13/25 23:36, Tomas Vondra wrote:
> ...
>
> What I think we should do is much simpler - make the threshold in shm_mq
> dynamic, start with a very low value and gradually ramp up (up to 1/4).
> So we'd have
>
>     if (mqh->mqh_consume_pending > threshold)
>
> We might start with
>
>     threshold = (mq->mq_ring_size / 1024)
>
> or maybe some fixed value, like
>
>     threshold = 128
>
> And on every signal we'd double it, capping it to 1/4 of mq_ring_size.
>
>     threshold = Min(threshold * 2, mq->mq_ring_size / 4);
>
> This is very similar to other places doing this gradual ramp up, like in
> the prefetching / read_stream, etc. It allows fast termination for low
> LIMIT values, but quickly amortizes the cost for high LIMIT values.
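For clarity, the proposed scheme in isolation looks roughly like this (a
standalone sketch only - the struct and names are made up, the real thing
is in the WIP patch below, which starts at 64B rather than 128B):

    #include <stdbool.h>
    #include <stddef.h>

    #define Min(a, b) ((a) < (b) ? (a) : (b))

    /* made-up stand-ins for the relevant shm_mq_handle fields */
    typedef struct ramp_queue
    {
        size_t      ring_size;   /* size of the shared ring buffer */
        size_t      pending;     /* bytes processed, not yet acknowledged */
        size_t      threshold;   /* current threshold, doubles per signal */
    } ramp_queue;

    static void
    ramp_queue_init(ramp_queue *q, size_t ring_size)
    {
        q->ring_size = ring_size;
        q->pending = 0;
        /* start small, so low LIMIT queries can terminate quickly */
        q->threshold = Min(128, ring_size / 4);
    }

    /*
     * Account for nbytes of progress. Returns true when enough has
     * accumulated that we'd update the shared counters and signal the
     * counterparty.
     */
    static bool
    ramp_queue_advance(ramp_queue *q, size_t nbytes)
    {
        q->pending += nbytes;
        if (q->pending <= q->threshold)
            return false;

        q->pending = 0;
        /* double the threshold on every signal, capped at 1/4 of the ring */
        q->threshold = Min(q->threshold * 2, q->ring_size / 4);
        return true;
    }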
I gave this a try today, to see if it can actually solve the regression.
Attached is a WIP patch and a set of benchmarking scripts. On my Ryzen
machine I got the following query timings (in milliseconds):
 fill   dataset     |      14      15      16      17      18   patched
--------------------|---------------------------------------------------
   10   random      |    64.1   319.3   328.7   340.5   344.3      79.5
   10   sequential  |    54.6   323.4   347.5   350.5   399.2      78.3
  100   random      |    11.8    42.9    42.3    42.3    68.5      18.6
  100   sequential  |    10.0    44.3    45.0    44.3    60.6      20.0
Clearly, 15 introduced a significant regression, with timings roughly
4-6x higher. The patch improves that quite a bit. It's not all the way
back down to 14 - there's still a ~10ms regression, for some reason.
Also, I haven't measured whether this patch causes regressions for other
queries. I don't think it does, but maybe there's some weird corner case
that's affected.
regards
--
Tomas Vondra
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 2c79a649f46..71733612fb5 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -142,7 +142,9 @@ struct shm_mq_handle
     char       *mqh_buffer;
     Size        mqh_buflen;
     Size        mqh_consume_pending;
+    Size        mqh_consume_threshold;
     Size        mqh_send_pending;
+    Size        mqh_send_threshold;
     Size        mqh_partial_bytes;
     Size        mqh_expected_bytes;
     bool        mqh_length_word_complete;
@@ -305,6 +307,10 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     mqh->mqh_counterparty_attached = false;
     mqh->mqh_context = CurrentMemoryContext;
 
+    /* start at 64B, then gradually ramp up */
+    mqh->mqh_consume_threshold = Min(64, mq->mq_ring_size / 4);
+    mqh->mqh_send_threshold = Min(64, mq->mq_ring_size / 4);
+
     if (seg != NULL)
         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
@@ -535,12 +541,16 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
-     * 1/4 of the ring size, mark it as written in shared memory and notify
+     * the current send threshold, mark it as written in shared memory and notify
      * the receiver.
      */
-    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+    if (force_flush || mqh->mqh_send_pending > mqh->mqh_send_threshold)
     {
         shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
         if (receiver != NULL)
             SetLatch(&receiver->procLatch);
         mqh->mqh_send_pending = 0;
+
+        /* ramp up, up to (mq_ring_size / 4) */
+        mqh->mqh_send_threshold = Min(mqh->mqh_send_threshold * 2,
+                                      mq->mq_ring_size / 4);
     }
 
     return SHM_MQ_SUCCESS;
@@ -622,10 +632,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
      * because SetLatch() is fairly expensive and we don't want to do it too
      * often.
      */
-    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
+    if (mqh->mqh_consume_pending > mqh->mqh_consume_threshold)
     {
         shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
         mqh->mqh_consume_pending = 0;
+
+        /* ramp up, up to (mq_ring_size / 4) */
+        mqh->mqh_consume_threshold = Min(mqh->mqh_consume_threshold * 2,
+                                         mq->mq_ring_size / 4);
     }
 
     /* Try to read, or finish reading, the length word from the buffer. */
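
To get a feel for how quickly this amortizes: assuming the 64kB default
parallel tuple queue (PARALLEL_TUPLE_QUEUE_SIZE in execParallel.c, if I'm
reading it right), the threshold saturates after only a handful of
signals. A throwaway check:

    #include <stdio.h>
    #include <stddef.h>

    #define Min(a, b) ((a) < (b) ? (a) : (b))

    int
    main(void)
    {
        size_t  ring_size = 65536;  /* default parallel tuple queue size */
        size_t  threshold = Min(64, ring_size / 4);
        int     signals = 0;

        /* double on every signal until we hit the 1/4 cap, as in the patch */
        while (threshold < ring_size / 4)
        {
            threshold = Min(threshold * 2, ring_size / 4);
            signals++;
        }
        printf("cap of %zu bytes reached after %d signals\n",
               threshold, signals);
        return 0;
    }

This prints "cap of 16384 bytes reached after 8 signals", i.e. after eight
signals we're back to signaling once per 16kB, which is why the high-LIMIT
numbers in the table above are barely affected.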
