From 6b8bcb44bb73811522416cc222a2e6d5eb8c6938 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Thu, 26 Jun 2025 21:23:08 +0800
Subject: [PATCH v7] Process sync requests incrementally in AbsorbSyncRequests
 and CompactCheckpointerRequestQueue

If the number of sync requests is big enough, the palloc() call in
AbsorbSyncRequests() will attempt to allocate more than 1 GB of memory,
resulting in failure. This can lead to an infinite loop in the checkpointer
process, as it repeatedly fails to absorb the pending requests.

Similarly, CompactCheckpointerRequestQueue() can hit the same allocation
limit when the request queue contains millions of entries, since its hash
table and skip array are sized to the whole queue.

To avoid this, both functions now process requests incrementally, in
batches of at most CKPT_REQ_BATCH_SIZE entries, so that allocations stay
bounded regardless of the total number of pending requests. To allow the
queue to be consumed incrementally, the request array becomes a ring
buffer with head and tail indexes, and its size is capped at
MAX_CHECKPOINT_REQUESTS entries. For CompactCheckpointerRequestQueue(),
this changes memory usage from O(num_requests) to O(batch_size) for both
the hash table and the skip array, at the cost that duplicates spanning
batch boundaries are no longer detected.

This keeps memory usage within a safe range and avoids excessive
allocations.
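
Both changes rely on the same pattern: treat the request array as a ring
buffer and drain it from the head in fixed-size batches, so that any
scratch memory is sized to the batch rather than to the whole queue. Below
is a minimal, self-contained sketch of that pattern (illustrative only,
not checkpointer code; Ring, BATCH_SIZE and QUEUE_CAPACITY are made-up
stand-ins for the shared-memory queue, CKPT_REQ_BATCH_SIZE and
max_requests):

    #include <stdio.h>
    #include <string.h>

    #define BATCH_SIZE 4        /* stand-in for CKPT_REQ_BATCH_SIZE */
    #define QUEUE_CAPACITY 16   /* stand-in for max_requests */

    /* Simplified ring-buffer queue; the real one lives in shared memory. */
    typedef struct
    {
        int     items[QUEUE_CAPACITY];
        int     head;           /* index of the oldest queued entry */
        int     tail;           /* index where the next entry is inserted */
        int     num_items;      /* current queue length */
    } Ring;

    int
    main(void)
    {
        Ring    ring;
        int     batch[BATCH_SIZE];  /* scratch space, bounded by BATCH_SIZE */

        memset(&ring, 0, sizeof(ring));

        /* Enqueue some entries, as ForwardSyncRequest() does at the tail. */
        for (int v = 1; v <= 10; v++)
        {
            ring.items[ring.tail] = v;
            ring.tail = (ring.tail + 1) % QUEUE_CAPACITY;
            ring.num_items++;
        }

        /*
         * Drain from the head in batches of at most BATCH_SIZE entries, so
         * the scratch buffer stays O(BATCH_SIZE) no matter how long the
         * queue is.  In the checkpointer, the copy-out step runs under
         * CheckpointerCommLock and the batch is processed after the lock
         * is released.
         */
        while (ring.num_items > 0)
        {
            int     n = ring.num_items < BATCH_SIZE ? ring.num_items : BATCH_SIZE;

            for (int i = 0; i < n; i++)
            {
                batch[i] = ring.items[ring.head];
                ring.head = (ring.head + 1) % QUEUE_CAPACITY;
            }
            ring.num_items -= n;

            for (int i = 0; i < n; i++) /* "absorb" the batch */
                printf("%d ", batch[i]);
            printf("\n");
        }

        return 0;
    }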
---
 src/backend/postmaster/checkpointer.c | 259 +++++++++++++++++---------
 1 file changed, 171 insertions(+), 88 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index fda91ffd1ce..4c0a7204c23 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -127,6 +127,11 @@ typedef struct
 
 	int			num_requests;	/* current # of requests */
 	int			max_requests;	/* allocated array size */
+
+	int			head;			/* Index of the first request in the ring
+								 * buffer */
+	int			tail;			/* Index of the slot where the next request
+								 * will be inserted (one past the last) */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -135,6 +140,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB		1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -970,7 +981,8 @@ CheckpointerShmemInit(void)
 		 */
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
+		CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+		CheckpointerShmem->head = CheckpointerShmem->tail = 0;
 		ConditionVariableInit(&CheckpointerShmem->start_cv);
 		ConditionVariableInit(&CheckpointerShmem->done_cv);
 	}
@@ -1148,6 +1160,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
 	CheckpointerRequest *request;
 	bool		too_full;
+	int			insert_pos;
 
 	if (!IsUnderPostmaster)
 		return false;			/* probably shouldn't even get here */
@@ -1171,10 +1184,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 	}
 
 	/* OK, insert request */
-	request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+	insert_pos = CheckpointerShmem->tail;
+	request = &CheckpointerShmem->requests[insert_pos];
 	request->ftag = *ftag;
 	request->type = type;
 
+	CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+	CheckpointerShmem->num_requests++;
+
 	/* If queue is more than half full, nudge the checkpointer to empty it */
 	too_full = (CheckpointerShmem->num_requests >=
 				CheckpointerShmem->max_requests / 2);
@@ -1209,6 +1226,12 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
  * Trying to do this every time the queue is full could lose if there
  * aren't any removable entries.  But that should be vanishingly rare in
  * practice: there's one queue entry per shared buffer.
+ *
+ * To avoid large memory allocations when the queue contains many entries,
+ * we process requests incrementally in batches of CKPT_REQ_BATCH_SIZE.
+ * This limits memory usage to O(batch_size) instead of O(num_requests).
+ * Note that duplicates spanning batch boundaries won't be detected, but
+ * this trade-off is acceptable for memory scalability.
  */
 static bool
 CompactCheckpointerRequestQueue(void)
@@ -1216,15 +1239,17 @@ CompactCheckpointerRequestQueue(void)
 	struct CheckpointerSlotMapping
 	{
 		CheckpointerRequest request;
-		int			slot;
+		int			ring_idx;
 	};
-
-	int			n,
-				preserve_count;
-	int			num_skipped = 0;
+	int			n;
+	int			total_num_skipped = 0;
+	int			head;
+	int			max_requests;
+	int			num_requests;
+	int			read_idx,
+				write_idx;
+	int			batch_start;
 	HASHCTL		ctl;
-	HTAB	   *htab;
-	bool	   *skip_slot;
 
 	/* must hold CheckpointerCommLock in exclusive mode */
 	Assert(LWLockHeldByMe(CheckpointerCommLock));
@@ -1233,81 +1258,118 @@ CompactCheckpointerRequestQueue(void)
 	if (CritSectionCount > 0)
 		return false;
 
-	/* Initialize skip_slot array */
-	skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+	max_requests = CheckpointerShmem->max_requests;
+	num_requests = CheckpointerShmem->num_requests;
+	head = CheckpointerShmem->head;
 
-	/* Initialize temporary hash table */
+	/* Set up the hash table control structure once, reused for all batches */
 	ctl.keysize = sizeof(CheckpointerRequest);
 	ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
 	ctl.hcxt = CurrentMemoryContext;
 
-	htab = hash_create("CompactCheckpointerRequestQueue",
-					   CheckpointerShmem->num_requests,
-					   &ctl,
-					   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	/* Process and compact in batches */
+	read_idx = head;
+	write_idx = head;
+	batch_start = 0;
 
-	/*
-	 * The basic idea here is that a request can be skipped if it's followed
-	 * by a later, identical request.  It might seem more sensible to work
-	 * backwards from the end of the queue and check whether a request is
-	 * *preceded* by an earlier, identical request, in the hopes of doing less
-	 * copying.  But that might change the semantics, if there's an
-	 * intervening SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it
-	 * this way.  It would be possible to be even smarter if we made the code
-	 * below understand the specific semantics of such requests (it could blow
-	 * away preceding entries that would end up being canceled anyhow), but
-	 * it's not clear that the extra complexity would buy us anything.
-	 */
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
+	while (batch_start < num_requests)
 	{
-		CheckpointerRequest *request;
-		struct CheckpointerSlotMapping *slotmap;
-		bool		found;
+		int			batch_size = Min(num_requests - batch_start, CKPT_REQ_BATCH_SIZE);
+		HTAB	   *htab;
+		bool	   *skip_slot;
+		int			batch_num_skipped = 0;
+		int			batch_read_idx;
+
+		/* Allocate skip array for this batch only */
+		skip_slot = palloc0(sizeof(bool) * batch_size);
+
+		htab = hash_create("CompactCheckpointerRequestQueue_Batch",
+						   batch_size,
+						   &ctl,
+						   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
 		/*
-		 * We use the request struct directly as a hashtable key.  This
-		 * assumes that any padding bytes in the structs are consistently the
-		 * same, which should be okay because we zeroed them in
-		 * CheckpointerShmemInit.  Note also that RelFileLocator had better
-		 * contain no pad bytes.
+		 * The basic idea here is that a request can be skipped if it's followed
+		 * by a later, identical request within the same batch.  It might seem more
+		 * sensible to work backwards from the end of the queue and check whether a
+		 * request is *preceded* by an earlier, identical request, in the hopes of
+		 * doing less copying.  But that might change the semantics, if there's an
+		 * intervening SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it
+		 * this way.  It would be possible to be even smarter if we made the code
+		 * below understand the specific semantics of such requests (it could blow
+		 * away preceding entries that would end up being canceled anyhow), but
+		 * it's not clear that the extra complexity would buy us anything.
 		 */
-		request = &CheckpointerShmem->requests[n];
-		slotmap = hash_search(htab, request, HASH_ENTER, &found);
-		if (found)
+		batch_read_idx = read_idx;
+		for (n = 0; n < batch_size; n++)
 		{
-			/* Duplicate, so mark the previous occurrence as skippable */
-			skip_slot[slotmap->slot] = true;
-			num_skipped++;
+			CheckpointerRequest *request;
+			struct CheckpointerSlotMapping *slotmap;
+			bool		found;
+
+			/*
+			 * We use the request struct directly as a hashtable key.  This
+			 * assumes that any padding bytes in the structs are consistently the
+			 * same, which should be okay because we zeroed them in
+			 * CheckpointerShmemInit.  Note also that RelFileLocator had better
+			 * contain no pad bytes.
+			 */
+			request = &CheckpointerShmem->requests[batch_read_idx];
+			slotmap = hash_search(htab, request, HASH_ENTER, &found);
+			if (found)
+			{
+				/* Duplicate, so mark the previous occurrence as skippable */
+				skip_slot[slotmap->ring_idx] = true;
+				batch_num_skipped++;
+			}
+			/* Remember slot containing latest occurrence of this request value */
+			slotmap->ring_idx = n;  /* Index within this batch */
+			batch_read_idx = (batch_read_idx + 1) % max_requests;
 		}
-		/* Remember slot containing latest occurrence of this request value */
-		slotmap->slot = n;
-	}
 
-	/* Done with the hash table. */
-	hash_destroy(htab);
+		/* Done with the hash table. */
+		hash_destroy(htab);
 
-	/* If no duplicates, we're out of luck. */
-	if (!num_skipped)
-	{
+		/* Compact this batch: copy non-skipped entries */
+		for (n = 0; n < batch_size; n++)
+		{
+			/* If this slot is NOT skipped, keep it */
+			if (!skip_slot[n])
+			{
+				/* If the read and write positions are different, copy the request */
+				if (write_idx != read_idx)
+					CheckpointerShmem->requests[write_idx] = CheckpointerShmem->requests[read_idx];
+
+				/* Advance the write position */
+				write_idx = (write_idx + 1) % max_requests;
+			}
+
+			read_idx = (read_idx + 1) % max_requests;
+		}
+
+		total_num_skipped += batch_num_skipped;
+
+		/* Cleanup batch resources */
 		pfree(skip_slot);
-		return false;
-	}
 
-	/* We found some duplicates; remove them. */
-	preserve_count = 0;
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
-	{
-		if (skip_slot[n])
-			continue;
-		CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+		batch_start += batch_size;
 	}
+
+	/* If no duplicates, we're out of luck. */
+	if (total_num_skipped == 0)
+		return false;
+
+	/*
+	 * Update ring buffer state: head remains the same, tail moves, count
+	 * decreases
+	 */
+	CheckpointerShmem->tail = write_idx;
+	CheckpointerShmem->num_requests -= total_num_skipped;
+
 	ereport(DEBUG1,
 			(errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-							 CheckpointerShmem->num_requests, preserve_count)));
-	CheckpointerShmem->num_requests = preserve_count;
+							 num_requests, CheckpointerShmem->num_requests)));
 
-	/* Cleanup. */
-	pfree(skip_slot);
 	return true;
 }
 
@@ -1325,40 +1387,61 @@ AbsorbSyncRequests(void)
 {
 	CheckpointerRequest *requests = NULL;
 	CheckpointerRequest *request;
-	int			n;
+	int			n,
+				i;
+	bool		loop;
 
 	if (!AmCheckpointerProcess())
 		return;
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
-	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	do
 	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-	}
+		LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
 
-	START_CRIT_SECTION();
+		/*
+		 * We try to avoid holding the lock for a long time by copying the
+		 * request array, and processing the requests after releasing the
+		 * lock.
+		 *
+		 * Once we have cleared the requests from shared memory, we have to
+		 * PANIC if we then fail to absorb them (eg, because our hashtable
+		 * runs out of memory).  This is because the system cannot run safely
+		 * if we are unable to fsync what we have been told to fsync.
+		 * Fortunately, the hashtable is so small that the problem is quite
+		 * unlikely to arise in practice.
+		 *
+		 * Note: a single palloc() cannot allocate more than 1 GB, so we
+		 * cap the batch size to ensure that the copied requests always
+		 * fit in one allocation.
+		 */
+		n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+		if (n > 0)
+		{
+			if (!requests)
+				requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-	CheckpointerShmem->num_requests = 0;
+			for (i = 0; i < n; i++)
+			{
+				requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+				CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+			}
 
-	LWLockRelease(CheckpointerCommLock);
+			CheckpointerShmem->num_requests -= n;
+
+		}
+
+		START_CRIT_SECTION();
+
+		/* Are there any requests in the queue? If so, keep going. */
+		loop = CheckpointerShmem->num_requests != 0;
+
+		LWLockRelease(CheckpointerCommLock);
 
-	for (request = requests; n > 0; request++, n--)
-		RememberSyncRequest(&request->ftag, request->type);
+		for (request = requests; n > 0; request++, n--)
+			RememberSyncRequest(&request->ftag, request->type);
 
-	END_CRIT_SECTION();
+		END_CRIT_SECTION();
+	} while (loop);
 
 	if (requests)
 		pfree(requests);
-- 
2.49.0

