From 6d4e5a520888d6385703a4539b33d003793102af Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v9 3/3] Improve eviction algorithm in Reorderbuffer using
 max-heap for many subtransactions.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, when selecting the transaction to evict during logical
decoding, we check all transactions to find the largest
transaction. This could lead to a significant replication lag
especially in the case where there are many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer using the
max-heap with transaction size as the key to efficiently find the
largest transaction.

The max-heap starts with empty. While the max-heap is empty, we don't
do anything for the max-heap when updating the memory
counter. Therefore, we get the largest transaction in O(N) time, where
N is the number of transactions including top-level transactions and
subtransactions.

We build the max-heap just before selecting the largest transactions
if the number of transactions being decoded is higher than the
threshold, MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap,
we also update the max-heap when updating the memory counter. The
intention is to efficiently find the largest transaction in O(1) time
instead of incurring the cost of memory counter updates (O(log
N)). Once the number of transactions got lower than the threshold, we
reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.

Reviewed-by: Amit Kapila, Hayato Kuroda, Vignesh C, Ajin Cherian,
Tomas Vondra, Shubham Khanna, Álvaro Herrera, Euler Taveira, Peter
Smith
Discussion: https://postgr.es/m/CAD21AoAfKTgrBrLq96GcTv9d6k97zaQcDM-rxfKEt4GSe0qnaQ%40mail.gmail.com
---
 .../replication/logical/reorderbuffer.c       | 226 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |   4 +
 2 files changed, 202 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 393713af91..aa961a924e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,21 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use a max-heap with transaction size as the key to efficiently find
+ *	  the largest transaction. While the max-heap is empty, we don't update
+ *	  the max-heap when updating the memory counter. Therefore, we can get
+ *	  the largest transaction in O(N) time, where N is the number of
+ *	  transactions including top-level transactions and subtransactions.
+ *
+ *	  We build the max-heap just before selecting the largest transactions
+ *	  if the number of transactions being decoded is higher than the threshold,
+ *	  MAX_HEAP_TXN_COUNT_THRESHOLD. After building the max-heap, we also
+ *	  update the max-heap when updating the memory counter. The intention is
+ *	  to efficiently find the largest transaction in O(1) time instead of
+ *	  incurring the cost of memory counter updates (O(log N)). Once the number
+ *	  of transactions got lower than the threshold, we reset the max-heap
+ *	  (refer to ReorderBufferMaybeResetMaxHeap() for details).
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -107,6 +122,22 @@
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Threshold of the total number of top-level and sub transactions that controls
+ * whether we use the max-heap. Although using max-heap to select the largest
+ * transaction is effective when there are many transactions being decoded,
+ * there is generally no need to use it as long as all transactions being
+ * decoded are top-level transactions. Therefore, we use MaxConnections as the
+ * threshold so we can prevent switching to the state unless we use
+ * subtransactions.
+ */
+#define MAX_HEAP_TXN_COUNT_THRESHOLD	MaxConnections
+
+/*
+ * A macro to check if the max-heap is ready to use and needs to be updated
+ * accordingly.
+ */
+#define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
 
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
@@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
+static void ReorderBufferBuildMaxHeap(ReorderBuffer *rb);
+static void ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb);
+static int	ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
 
 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
 static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
+											ReorderBufferTXN *txn,
 											bool addition, Size sz);
 
 /*
@@ -355,6 +390,17 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * The binaryheap is indexed for faster manipulations.
+	 *
+	 * We allocate the initial heap size greater than
+	 * MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
+	 * until the threshold is exceeded.
+	 */
+	buffer->txn_heap = binaryheap_allocate(MAX_HEAP_TXN_COUNT_THRESHOLD * 2,
+										   ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 {
 	/* update memory accounting info */
 	if (upd_mem)
-		ReorderBufferChangeMemoryUpdate(rb, change, false,
+		ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
 										ReorderBufferChangeSize(change));
 
 	/* free contained data */
@@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn->nentries_mem++;
 
 	/* update memory accounting information */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 
 	/* process partial change */
@@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
 	/*
@@ -1586,8 +1632,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
+
+	ReorderBufferMaybeResetMaxHeap(rb);
 }
 
 /*
@@ -1637,9 +1688,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		/* remove the change from it's containing list */
 		dlist_delete(&change->node);
 
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/*
 	 * Mark the transaction as streamed.
 	 *
@@ -3166,6 +3220,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
  * decide if we reached the memory limit, the transaction counter allows
  * us to quickly pick the largest transaction for eviction.
  *
+ * Either txn or change must be non-NULL at least. We update the memory
+ * counter of txn if it's non-NULL, otherwise change->txn.
+ *
  * When streaming is enabled, we need to update the toplevel transaction
  * counters instead - we don't really care about subtransactions as we
  * can't stream them individually anyway, and we only pick toplevel
@@ -3174,22 +3231,25 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								ReorderBufferChange *change,
+								ReorderBufferTXN *txn,
 								bool addition, Size sz)
 {
-	ReorderBufferTXN *txn;
 	ReorderBufferTXN *toptxn;
 
-	Assert(change->txn);
-
 	/*
 	 * Ignore tuple CID changes, because those are not evicted when reaching
 	 * memory limit. So we just don't count them, because it might easily
 	 * trigger a pointless attempt to spill.
 	 */
-	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+	if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+		return;
+
+	if (sz == 0)
 		return;
 
-	txn = change->txn;
+	if (txn == NULL)
+		txn = change->txn;
+	Assert(txn != NULL);
 
 	/*
 	 * Update the total size in top level as well. This is later used to
@@ -3204,6 +3264,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/* Update the max-heap as well if necessary */
+		if (ReorderBufferMaxHeapIsReady(rb))
+		{
+			if ((txn->size - sz) == 0)
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3213,6 +3282,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		/* Update the max-heap as well if necessary */
+		if (ReorderBufferMaxHeapIsReady(rb))
+		{
+			if (txn->size == 0)
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
@@ -3468,34 +3546,121 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 	}
 }
 
+
+/* Compare two transactions by size */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
+
 /*
- * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
+ * Build the max-heap. The heap assembly step is deferred  until the end, for
+ * efficiency.
  */
-static ReorderBufferTXN *
-ReorderBufferLargestTXN(ReorderBuffer *rb)
+static void
+ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
 {
 	HASH_SEQ_STATUS hash_seq;
 	ReorderBufferTXNByIdEnt *ent;
-	ReorderBufferTXN *largest = NULL;
+
+	Assert(binaryheap_empty(rb->txn_heap));
 
 	hash_seq_init(&hash_seq, rb->by_txn);
 	while ((ent = hash_seq_search(&hash_seq)) != NULL)
 	{
 		ReorderBufferTXN *txn = ent->txn;
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		if (txn->size == 0)
+			continue;
+
+		binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+	}
+
+	binaryheap_build(rb->txn_heap);
+}
+
+/*
+ * Reset the max-heap if the number of transactions got lower than the
+ * threshold.
+ */
+static void
+ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
+{
+	/*
+	 * If we add and remove transactions right around the threshold, we could
+	 * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
+	 * reset the max-heap.
+	 */
+	if (ReorderBufferMaxHeapIsReady(rb) &&
+		binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
+		binaryheap_reset(rb->txn_heap);
+}
+
+/*
+ * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
+ * We use a different way to find the largest transaction depending on the
+ * memory tracking state and the number of transactions being decoded. Refer
+ * to the comments atop this file for the algorithm details.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTXN(ReorderBuffer *rb)
+{
+	ReorderBufferTXN *largest = NULL;
+
+	if (!ReorderBufferMaxHeapIsReady(rb))
+	{
+		/*
+		 * If the number of transactions are small, we scan all transactions
+		 * being decoded to get the largest transaction. This saves the cost
+		 * of building a max-heap with a small number of transactions.
+		 */
+		if (hash_get_num_entries(rb->by_txn) < MAX_HEAP_TXN_COUNT_THRESHOLD)
+		{
+			HASH_SEQ_STATUS hash_seq;
+			ReorderBufferTXNByIdEnt *ent;
+
+			hash_seq_init(&hash_seq, rb->by_txn);
+			while ((ent = hash_seq_search(&hash_seq)) != NULL)
+			{
+				ReorderBufferTXN *txn = ent->txn;
+
+				/* if the current transaction is larger, remember it */
+				if ((!largest) || (txn->size > largest->size))
+					largest = txn;
+			}
+		}
+		else
+		{
+			/*
+			 * There are a large number of transactions in ReorderBuffer. We
+			 * build the max-heap for efficiently selecting the largest
+			 * transactions.
+			 */
+			ReorderBufferBuildMaxHeap(rb);
+
+			/*
+			 * The max-heap is ready now. We remain in this state at least
+			 * until we free up enough transactions to bring the total memory
+			 * usage below the limit. The largest transaction is selected
+			 * below.
+			 */
+			Assert(ReorderBufferMaxHeapIsReady(rb));
+		}
 	}
 
+	/* Get the largest transaction from the max-heap */
+	if (ReorderBufferMaxHeapIsReady(rb))
+		largest = (ReorderBufferTXN *)
+			DatumGetPointer(binaryheap_first(rb->txn_heap));
+
 	Assert(largest);
 	Assert(largest->size > 0);
 	Assert(largest->size <= rb->size);
@@ -3636,6 +3801,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		Assert(txn->nentries_mem == 0);
 	}
 
+	ReorderBufferMaybeResetMaxHeap(rb);
+
 	/* We must be under the memory limit now. */
 	Assert(rb->size < logical_decoding_work_mem * 1024L);
 }
@@ -3705,11 +3872,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		ReorderBufferSerializeChange(rb, txn, fd, change);
 		dlist_delete(&change->node);
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 
 		spilled++;
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
+
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
@@ -4491,7 +4661,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 * update the accounting too (subtracting the size from the counters). And
 	 * we don't want to underflow there.
 	 */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
@@ -4903,9 +5073,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	MemoryContextSwitchTo(oldcontext);
 
 	/* subtract the old change size */
-	ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
 	/* now add the change back, with the correct size */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..a5aec01c2f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -631,6 +632,9 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	binaryheap *txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.39.3

