From 3d89473738752d991810b86701b2790a27e82734 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v3 3/3] Improve transaction eviction algorithm in
 ReorderBuffer.

Previously, when selecting the largest transaction to evict, we scanned
all transactions, which could be quite slow as it was O(n), where n is
the total number of (top-level and sub) transactions, especially in
cases where there are many subtransactions. It could lead to a huge
replication lag.

This commit changes the eviction algorithm in ReorderBuffer to use
max-heap with transaction size, and use two strategies depending on
the number of transactions being decoded.

It could be too expensive to update max-heap while preserving the heap
property each time the transaction's memory counter is updated, as it
could happen very frequently. So when the number of transactions being
decoded is small, we add the transactions to max-heap but don't
preserve the heap property, which is O(1). We heapify the max-heap
just before picking the largest transaction, which is O(n). This
strategy minimizes the overheads of updating the transaction's memory
counter.

On the other hand, when the number of transactions being decoded is
fairly large, such as when a transaction has many subtransactions,
selecting the largest transaction in O(n) is too expensive. Therefore,
once the number of transactions being decoded exceeds the
threshold (1024), each time updating the transaction's memory counter
we update max-heap while preserving the heap property, which is O(log
n). Picking the largest transaction can be done in O(1). This strategy
minimizes the cost of picking the largest transaction.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 .../replication/logical/reorderbuffer.c       | 136 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  12 ++
 2 files changed, 132 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c390d96ac3..bc6a8c0810 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,29 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use max-heap with transaction size as the key to find the largest
+ *	  transaction, and use two strategies depending on the number of transactions
+ *	  being decoded:
+ *
+ *	  Since the transaction memory counter is updated frequently, it's expensive
+ *	  to update max-heap while preserving the heap property each time the memory
+ *	  counter is updated. So when the number of transactions is small (i.e.
+ *	  in REORDER_BUFFER_MEM_TRACK_NORMAL state), transactions are added to the
+ *	  max-heap while not preserving the heap property. We heapify it just before
+ *	  picking the largest transaction. In this case, updating the memory counter
+ *	  is done in O(1) whereas picking the largest transaction is done in O(n),
+ *	  where n is the total number of transactions being decoded.
+ *
+ *	  On the other hand, when the number of transactions being decoded is large
+ *	  (i.e. in REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP), such as when a
+ *	  transaction has many subtransactions, selecting the largest transaction in
+ *	  O(n) is too costly. Therefore, each time the memory counter of a transaction
+ *	  is updated, the max-heap is updated while preserving the heap property,
+ *	  and the largest transaction is picked at a low cost. In this case,
+ *	  updating the memory counter is done in O(log n) whereas picking the
+ *	  largest transaction is done in O(1). This minimizes the cost of choosing
+ *	  the largest transaction.
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -108,6 +131,11 @@
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Number of transactions in the max-heap (rb->txn_heap) at or above which
+ * we switch to the REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state.
+ */
+#define REORDE_BUFFER_MEM_TRACK_THRESHOLD 1024
 
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
@@ -295,6 +323,7 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -356,6 +385,14 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * We start with an arbitrary initial size for the max-heap, which should
+	 * be enough for most cases.
+	 */
+	buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NORMAL;
+	buffer->txn_heap = binaryheap_allocate(1024, ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -3205,6 +3242,32 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		if ((txn->size - sz) == 0)
+		{
+			/* Add the transaction to the max-heap */
+			if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
+				binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+
+			/*
+			 * Even if the number of transactions reached
+			 * REORDE_BUFFER_MEM_TRACK_THRESHOLD, we don't switch the state
+			 * immediately, since switching requires heapifying the max-heap
+			 * and some transactions could finish before the memory limit is
+			 * reached. Instead, the state is switched in
+			 * ReorderBufferLargestTXN(), which is where the max-heap is built.
+			 */
+		}
+		else if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			/*
+			 * If we're maintaining max-heap even while updating the memory counter,
+			 * we reflect the updates to the max-heap.
+			 */
+			binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3214,6 +3277,35 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		if (txn->size == 0)
+		{
+			/* Remove the transaction from the max-heap */
+			if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
+				binaryheap_remove_node_ptr_unordered(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+
+			/*
+			 * Even if the number of transactions falls below
+			 * REORDE_BUFFER_MEM_TRACK_THRESHOLD, it may soon exceed it again,
+			 * which would require heapifying the max-heap once more. In that
+			 * case, continuing to maintain the max-heap is cheaper overall.
+			 * Therefore, we keep a small margin: only when the number of
+			 * transactions falls below 95% of the threshold do we switch back
+			 * to the normal state.
+			 */
+			if (binaryheap_size(rb->txn_heap) < (REORDE_BUFFER_MEM_TRACK_THRESHOLD * 0.95))
+				rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NORMAL;
+		}
+		else if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			/*
+			 * If we're maintaining max-heap even while updating the memory counter,
+			 * we reflect the updates to the max-heap.
+			 */
+			binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
@@ -3471,32 +3563,27 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NORMAL)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		binaryheap_build(rb->txn_heap);
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		/*
+		 * If the number of transactions exceeds the threshold, switch to the
+		 * state where we maintain the max-heap even while updating the memory
+		 * counter.
+		 */
+		if (binaryheap_size(rb->txn_heap) >= REORDE_BUFFER_MEM_TRACK_THRESHOLD)
+			rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP;
 	}
 
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
+
 	Assert(largest);
 	Assert(largest->size > 0);
 	Assert(largest->size <= rb->size);
@@ -5276,3 +5363,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * binaryheap comparator for rb->txn_heap: orders two ReorderBufferTXNs by
+ * their size. Returns -1 if a is smaller than b, 1 if larger, 0 if equal.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN	*ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN	*tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..967eb65cb3 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -531,6 +532,13 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) (
 												  ReorderBufferTXN *txn,
 												  XLogRecPtr lsn);
 
+/* How rb->txn_heap is kept in sync with per-transaction memory usage */
+typedef enum ReorderBufferMemTrackState
+{
+	REORDER_BUFFER_MEM_TRACK_NORMAL, /* heapified only when picking largest */
+	REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP, /* heap order kept on update */
+} ReorderBufferMemTrackState;
+
 struct ReorderBuffer
 {
 	/*
@@ -631,6 +639,10 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap of transactions with non-zero size, and how it is maintained */
+	ReorderBufferMemTrackState memtrack_state;
+	binaryheap	*txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.39.3

