From 3cfce047ff0bbcdfddc7122a4b637f9d61f334a4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:40:03 +0900
Subject: [PATCH v1 4/4] Batch memory counter updates in ReorderBuffer.

Commit XXX improved the algorithm for selecting the largest transaction
among top-level transactions and subtransactions by using a max-heap.
In turn, it requires every memory counter update to also update the
max-heap, which is O(log n), where n is the number of transactions.

In order to reduce the number of binaryheap updates, this commit
batches memory counter updates where possible. For instance, when
cleaning up a transaction, we sum up the total size of the changes we
freed, and then update the memory counter once.

XXX: this patch is needed only if performance tests on workloads where
memory counter updates happen very often show regressions.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 .../replication/logical/reorderbuffer.c       | 116 ++++++++++++------
 1 file changed, 80 insertions(+), 36 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1228e3e0d0..ca60e7b984 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -252,7 +252,7 @@ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *tx
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										TXNEntryFile *file, XLogSegNo *segno);
-static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+static Size ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -295,6 +295,9 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
+static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb,
+										 ReorderBufferTXN *txn,
+										 bool addition, Size sz);
 static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
 
 /*
@@ -1509,6 +1512,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	bool		found;
 	dlist_mutable_iter iter;
+	Size		freed_bytes = 0;
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1538,9 +1542,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change, true);
+		freed_bytes += ReorderBufferChangeSize(change);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update memory statistics for this txn entry */
+	if (freed_bytes > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes);
+
+
 	/*
 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
 	 * They are always stored in the toplevel transaction.
@@ -1555,7 +1565,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		Assert(change->txn == txn);
 		Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
 
-		ReorderBufferReturnChange(rb, change, true);
+		/* Tuple CID changes are ignored for updating memory counter. */
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
 	/*
@@ -1616,6 +1627,7 @@ static void
 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 {
 	dlist_mutable_iter iter;
+	Size		freed_bytes = 0;
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,9 +1660,14 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		/* remove the change from it's containing list */
 		dlist_delete(&change->node);
 
-		ReorderBufferReturnChange(rb, change, true);
+		freed_bytes += ReorderBufferChangeSize(change);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update memory statistics for this txn entry */
+	if (freed_bytes > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes);
+
 	/*
 	 * Mark the transaction as streamed.
 	 *
@@ -1689,7 +1706,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 			/* Remove the change from its containing list. */
 			dlist_delete(&change->node);
 
-			ReorderBufferReturnChange(rb, change, true);
+			/* Tuple CID changes are ignored for updating memory counter. */
+			ReorderBufferReturnChange(rb, change, false);
 		}
 	}
 
@@ -3170,26 +3188,14 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
- * Update memory counters to account for the new or removed change.
- *
- * We update two counters - in the reorder buffer, and in the transaction
- * containing the change. The reorder buffer counter allows us to quickly
- * decide if we reached the memory limit, the transaction counter allows
- * us to quickly pick the largest transaction for eviction.
- *
- * When streaming is enabled, we need to update the toplevel transaction
- * counters instead - we don't really care about subtransactions as we
- * can't stream them individually anyway, and we only pick toplevel
- * transactions for eviction. So only toplevel transactions matter.
+ * A wrapper function for ReorderBufferTXNMemoryUpdate() to update memory
+ * counters to account for the new or removed change.
  */
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								ReorderBufferChange *change,
 								bool addition, Size sz)
 {
-	ReorderBufferTXN *txn;
-	ReorderBufferTXN *toptxn;
-
 	Assert(change->txn);
 
 	/*
@@ -3200,7 +3206,27 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
 		return;
 
-	txn = change->txn;
+	ReorderBufferTXNMemoryUpdate(rb, change->txn, addition, sz);
+}
+
+/*
+ * Update memory counters of the given transaction.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
+ */
+static void
+ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 bool addition, Size sz)
+{
+	ReorderBufferTXN *toptxn;
 
 	/*
 	 * Update the total size in top level as well. This is later used to
@@ -3656,6 +3682,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	int			fd = -1;
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
+	Size		freed_bytes = 0;
 	Size		size = txn->size;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
@@ -3710,11 +3737,15 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		ReorderBufferSerializeChange(rb, txn, fd, change);
 		dlist_delete(&change->node);
-		ReorderBufferReturnChange(rb, change, true);
+		freed_bytes += ReorderBufferChangeSize(change);
+		ReorderBufferReturnChange(rb, change, false);
 
 		spilled++;
 	}
 
+	if (freed_bytes > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes);
+
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
@@ -4197,6 +4228,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 							TXNEntryFile *file, XLogSegNo *segno)
 {
 	Size		restored = 0;
+	Size		restored_bytes = 0;
+	Size		freed_bytes = 0;
 	XLogSegNo	last_segno;
 	dlist_mutable_iter cleanup_iter;
 	File	   *fd = &file->vfd;
@@ -4210,12 +4243,16 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		ReorderBufferChange *cleanup =
 			dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
 
+		freed_bytes += ReorderBufferChangeSize(cleanup);
 		dlist_delete(&cleanup->node);
-		ReorderBufferReturnChange(rb, cleanup, true);
+		ReorderBufferReturnChange(rb, cleanup, false);
 	}
 	txn->nentries_mem = 0;
 	Assert(dlist_is_empty(&txn->changes));
 
+	if (freed_bytes > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes);
+
 	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
 
 	while (restored < max_changes_in_memory && *segno <= last_segno)
@@ -4320,22 +4357,32 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * ok, read a full change from disk, now restore it into proper
 		 * in-memory format
 		 */
-		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
+		restored_bytes += ReorderBufferRestoreChange(rb, txn, rb->outbuf);
 		restored++;
 	}
 
+	/*
+	 * Update memory accounting for the restored changes.  We need to do this
+	 * although we don't check the memory limit when restoring the changes in
+	 * this branch (we only do that when initially queueing the changes after
+	 * decoding), because we will release the changes later, and that will
+	 * update the accounting too (subtracting the size from the counters). And
+	 * we don't want to underflow there.
+	 */
+	ReorderBufferTXNMemoryUpdate(rb, txn, true, restored_bytes);
+
 	return restored;
 }
 
 /*
  * Convert change from its on-disk format to in-memory format and queue it onto
- * the TXN's ->changes list.
+ * the TXN's ->changes list. Return the size of the restored change.
  *
  * Note: although "data" is declared char*, at entry it points to a
  * maxalign'd buffer, making it safe in most of this function to assume
  * that the pointed-to data is suitably aligned for direct access.
  */
-static void
+static Size
 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   char *data)
 {
@@ -4488,16 +4535,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	dlist_push_tail(&txn->changes, &change->node);
 	txn->nentries_mem++;
 
-	/*
-	 * Update memory accounting for the restored change.  We need to do this
-	 * although we don't check the memory limit when restoring the changes in
-	 * this branch (we only do that when initially queueing the changes after
-	 * decoding), because we will release the changes later, and that will
-	 * update the accounting too (subtracting the size from the counters). And
-	 * we don't want to underflow there.
-	 */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
-									ReorderBufferChangeSize(change));
+	return ReorderBufferChangeSize(change);
 }
 
 /*
@@ -4931,6 +4969,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
 	{
 		dlist_mutable_iter it;
+		Size	freed_bytes = 0;
 
 		if (ent->reconstructed != NULL)
 			pfree(ent->reconstructed);
@@ -4940,9 +4979,14 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			ReorderBufferChange *change =
 				dlist_container(ReorderBufferChange, node, it.cur);
 
+			freed_bytes += ReorderBufferChangeSize(change);
 			dlist_delete(&change->node);
-			ReorderBufferReturnChange(rb, change, true);
+			ReorderBufferReturnChange(rb, change, false);
 		}
+
+		/* Update memory statistics for this txn entry */
+		if (freed_bytes > 0)
+			ReorderBufferTXNMemoryUpdate(rb, txn, false, freed_bytes);
 	}
 
 	hash_destroy(txn->toast_hash);
-- 
2.39.3

