From d43d33f40b0820f22bd5355edbd3f95338a6be0f Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v1 3/4] Improve transaction eviction algorithm in
 ReorderBuffer.

Previously, when selecting the largest transaction to evict, we scan
all transactions. Which could be quite slow as it was O(n), where n is
the total number of (top-level and sub) transactions, especially in
cases where there are many subtransactions. It could lead to a huge
replication lag.

This commit changes the eviction algorithm in ReorderBuffer to use
max-heap for transaction sizes. The selecting the largest transaction
is now O(1). The performance test showed a significant improvement.

XXX: updating the transaction's memory counter and the max-heap is now
O(log n), so we need to evaludate it. If there are some regression, we
would need a follow-up patch that batches multiple memory counter
updates where possible..

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 .../replication/logical/reorderbuffer.c       | 57 ++++++++++++-------
 src/include/replication/reorderbuffer.h       |  4 ++
 2 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index d1334ffb55..1228e3e0d0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -295,6 +295,7 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -356,6 +357,13 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * We start with an arbitrary number. Which should be enough for most of
+	 * cases.
+	 */
+	buffer->txn_heap = binaryheap_allocate(1024, ReorderBufferTXNSizeCompare,
+										   NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -3202,11 +3210,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	if (addition)
 	{
+		bool init = (txn->size == 0);
+
 		txn->size += sz;
 		rb->size += sz;
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		if (init)
+			binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+		else
+			binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
 	}
 	else
 	{
@@ -3216,6 +3231,11 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		if (txn->size == 0)
+			binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+		else
+			binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
 	}
 
 	Assert(txn->size <= rb->size);
@@ -3473,31 +3493,13 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
-	{
-		ReorderBufferTXN *txn = ent->txn;
-
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
-	}
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
 
 	Assert(largest);
 	Assert(largest->size > 0);
@@ -5278,3 +5280,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Compare between sizes of two transactions. This is for a binary heap
+ * comparison function.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN	*ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN	*tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3e232c6c27..e9eba186e9 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -650,6 +651,9 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	binaryheap	*txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.39.3

