From 158a0037b897cdb0a6b267a3a393bb5b7a72bef0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v2 3/4] Improve transaction eviction algorithm in
 ReorderBuffer.

Previously, when selecting the largest transaction to evict, we scan
all transactions. Which could be quite slow as it was O(n), where n is
the total number of (top-level and sub) transactions, especially in
cases where there are many subtransactions. It could lead to a huge
replication lag.

This commit changes the eviction algorithm in ReorderBuffer to use
max-heap with transaction size,a nd use two strategies depending on
the number of transactions being decoded.

It could be too expensive to pudate max-heap while preserving the heap
property each time the transaction's memory counter is updated, as it
could happen very frquently. So when the number of transactions being
decoded is small, we add the transactions to max-heap but don't
preserve the heap property, which is O(1). We heapify the max-heap
just before picking the largest transaction, which is O(n). This
strategy minimizes the overheads of updating the transaction's memory
counter.

On the other hand, when the number of transactions being decoded is
fairly large, such as when a transaction has many subtranasctions,
selecting the largest transaction is O(n) is too expensive. Therefore,
once the number of transactions being decoded exceeds the
threshold (1024), each time updating the transaction's memory counter
we update max-heap while preserving the heap property, which is O(log
n). Picking the largest transaction can be done in O(1). This strategy
minimizes the cost of picking the largest transaction.

XXX: updating the transaction's memory counter and the max-heap is now
O(log n), so we need to evaludate it. If there are some regression, we
would need a follow-up patch that batches multiple memory counter
updates where possible..

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 .../replication/logical/reorderbuffer.c       | 130 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  11 ++
 2 files changed, 125 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c390d96ac3..a114f57d3b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,27 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use max-heap with transaction size as the key to find the largest
+ *	  transaction, and use two strategies depending on the number of transactions
+ *	  being decoded:
+ *
+ *	  Since the transaction memory counter is updated frequently, it's expensive
+ *	  to update max-heap while preserving the heap property each time the memory
+ *	  counter is updated. So when the number of transactions is small, transactions
+ *	  are added to the max-heap while not preserving the heap property. We heapify
+ *	  it just before picking the largest transaction. In this case, updating the
+ *	  memory counter is done in O(1) whereas picking the largest transaction is
+ *	  done in O(n), where n is the total number of transactions being decoded.
+ *
+ *	  On the other hand, when the number of transactions being decoded is large,
+ *	  such as when a transaction has many subtransactions, selecting the largest
+ *	  transaction in O(1) is too costly. Therefore, each time the memory counter
+ *	  of a transaction is updated, the max-heap is updated while preserving the
+ *	  heap property, and the largest transaction is picked at a low cost. In
+ *	  this case, updating the memory counter is done in O(log n) whereas picking
+ *	  the largest transaction is done in O(1). This minimizes the cost of choosing
+ *	  the largest transaction.
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -295,6 +316,7 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -356,6 +378,14 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * We start with an arbitrary number. Which should be enough for most of
+	 * cases.
+	 */
+	buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NORMAL;
+	buffer->txn_heap = binaryheap_allocate(1024, ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -3200,11 +3230,31 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	if (addition)
 	{
+		bool init = (txn->size == 0);
+
 		txn->size += sz;
 		rb->size += sz;
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/* Update the transaction in the max-heap */
+		if (init)
+		{
+			/* Add the transaction to the max-heap */
+			if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
+				binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+		}
+		else if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			/*
+			 * If we're maintaining max-heap even while updating the memory counter,
+			 * we reflect the updates to the max-heap.
+			 */
+			binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3214,6 +3264,24 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		/* Remove the transaction from the max-heap */
+		if (txn->size == 0)
+		{
+			/* Remove the transaction */
+			if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
+				binaryheap_remove_node_ptr_unordered(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+		}
+		else if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			/*
+			 * If we're maintaining max-heap even while updating the memory counter,
+			 * we reflect the updates to the max-heap.
+			 */
+			binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
@@ -3471,32 +3539,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
+	/*
+	 * The threshold of the number of transactions in the max-heap (rb->txn_heap)
+	 * to switch the state.
+	 */
+#define REORDE_BUFFER_MEM_TRACK_THRESHOLD 1024
+
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		binaryheap_build(rb->txn_heap);
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		/*
+		 * If the number of transactions exceeds the threshold, switch to the
+		 * state where we maintain the max-heap even while updating the memory
+		 * counter.
+		 */
+		if (binaryheap_size(rb->txn_heap) >= REORDE_BUFFER_MEM_TRACK_THRESHOLD)
+			rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP;
+	}
+	else
+	{
+		Assert(rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP);
+
+		/*
+		 * If the number of transactions gets lowered than the threshold, switch
+		 * to the state where we heapify the max-heap right before picking the
+		 * largest transaction while doing nothing for memory counter update.
+		 */
+		if (binaryheap_size(rb->txn_heap) < REORDE_BUFFER_MEM_TRACK_THRESHOLD)
+			rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NORMAL;
 	}
 
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
+
 	Assert(largest);
 	Assert(largest->size > 0);
 	Assert(largest->size <= rb->size);
@@ -5276,3 +5357,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Compare between sizes of two transactions. This is for a binary heap
+ * comparison function.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN	*ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN	*tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..c9815d03f7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -531,6 +532,12 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) (
 												  ReorderBufferTXN *txn,
 												  XLogRecPtr lsn);
 
+typedef enum ReorderBufferMemTrackState
+{
+	REORDER_BUFFER_MEM_TRACK_NORMAL,
+	REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP,
+} ReorderBufferMemTrackState;
+
 struct ReorderBuffer
 {
 	/*
@@ -631,6 +638,10 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	ReorderBufferMemTrackState memtrack_state;
+	binaryheap	*txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.39.3

