On 11/13/25 23:36, Tomas Vondra wrote:
> ...
> 
> What I think we should do is much simpler - make the threshold in shm_mq
> dynamic, start with a very low value and gradually ramp up (up to 1/4).
> So we'd have
> 
>    if (mqh->mqh_consume_pending > threshold)
> 
> We might start with
> 
>   threshold = (mq->mq_ring_size / 1024)
> 
> or maybe some fixed value, like
> 
>   threshold = 128
> 
> And on every signal we'd double it, capping it to 1/4 of mq_ring_size.
> 
>   threshold = Min(threshold * 2, mq->mq_ring_size / 4);
> 
> This is very similar to other places doing this gradual ramp up, like in
> the prefetching / read_stream, etc. It allows fast termination for low
> LIMIT values, but quickly amortizes the cost for high LIMIT values.
> 
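
To illustrate the ramp-up schedule, here's a minimal standalone sketch
(not part of the patch; it assumes the 64kB default parallel tuple
queue, i.e. PARALLEL_TUPLE_QUEUE_SIZE):

#include <stdio.h>

#define Min(x, y) ((x) < (y) ? (x) : (y))

int
main(void)
{
	size_t		ring_size = 65536;	/* assumed 64kB ring */
	size_t		threshold = Min(64, ring_size / 4);
	int			nsignals = 0;

	/* double the threshold on every signal, capping at 1/4 of the ring */
	while (threshold < ring_size / 4)
	{
		printf("signal %d: threshold %zu\n", ++nsignals, threshold);
		threshold = Min(threshold * 2, ring_size / 4);
	}
	printf("signal %d: threshold %zu (capped)\n", ++nsignals, threshold);

	return 0;
}

The threshold hits the 1/4 cap (16kB) after 8 doublings, i.e. after
only ~16kB has passed through the queue, so the low start costs very
little for larger transfers.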

I gave this a try today, to see whether it actually solves the regression.
Attached is a WIP patch and a set of benchmarking scripts. On my Ryzen
machine I got these query timings (in milliseconds):

    fill    dataset  |    14      15      16      17      18  patched
    -----------------------------------------------------------------
    10       random  |  64.1   319.3   328.7   340.5   344.3     79.5
         sequential  |  54.6   323.4   347.5   350.5   399.2     78.3
    100      random  |  11.8    42.9    42.3    42.3    68.5     18.6
         sequential  |  10.0    44.3    45.0    44.3    60.6     20.0

Clearly 15 is a significant regression, with timings ~4x higher, and the
patch improves that quite a bit. It's not all the way back down to the
14 numbers, though; there's still a ~10ms regression, for some reason.

Also, I haven't measured whether this patch causes regressions for other
queries. I don't think it does, but maybe there's some weird corner case
that's affected.
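
FWIW the ramp-up itself can only add a bounded amount of extra
signaling: doubling from 64 bytes up to the 1/4 cap takes
log2((65536/4) / 64) = 8 steps with the default 64kB parallel tuple
queue, so a query pushing a lot of data through the queue pays at most
~8 extra SetLatch() calls in each direction before the threshold
reaches the old fixed value. That should be noise for anything but the
shortest queries.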


regards

-- 
Tomas Vondra

Attachment: scripts.tgz

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 2c79a649f46..71733612fb5 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -142,7 +142,9 @@ struct shm_mq_handle
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
 	Size		mqh_consume_pending;
+	Size		mqh_consume_threshold;
 	Size		mqh_send_pending;
+	Size		mqh_send_threshold;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -305,6 +307,10 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_counterparty_attached = false;
 	mqh->mqh_context = CurrentMemoryContext;
 
+	/* start at 64B, then gradually ramp up */
+	mqh->mqh_consume_threshold = Min(64, mq->mq_ring_size / 4);
+	mqh->mqh_send_threshold = Min(64, mq->mq_ring_size / 4);
+
 	if (seg != NULL)
 		on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
 
@@ -535,12 +541,16 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
-	 * 1/4 of the ring size, mark it as written in shared memory and notify
+	 * the current threshold, mark it as written in shared memory and notify
 	 * the receiver.
 	 */
-	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+	if (force_flush || mqh->mqh_send_pending > mqh->mqh_send_threshold)
 	{
 		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
 		if (receiver != NULL)
 			SetLatch(&receiver->procLatch);
 		mqh->mqh_send_pending = 0;
+
+	/* double the threshold, capping at (mq_ring_size / 4) */
+		mqh->mqh_send_threshold = Min(mqh->mqh_send_threshold * 2,
+									  mq->mq_ring_size / 4);
 	}
 
 	return SHM_MQ_SUCCESS;
@@ -622,10 +632,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 	 * because SetLatch() is fairly expensive and we don't want to do it too
 	 * often.
 	 */
-	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
+	if (mqh->mqh_consume_pending > mqh->mqh_consume_threshold)
 	{
 		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
 		mqh->mqh_consume_pending = 0;
+
+	/* double the threshold, capping at (mq_ring_size / 4) */
+		mqh->mqh_consume_threshold = Min(mqh->mqh_consume_threshold * 2,
+										 mq->mq_ring_size / 4);
 	}
 
 	/* Try to read, or finish reading, the length word from the buffer. */
