From bec9ffcaf60593986f88c2a717f61fe6dd8cdc30 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 20 Aug 2024 04:57:41 -0700
Subject: [PATCH v3] Fix memory counter update in ReorderBuffer.

Commit 5bec1d6bc5e changed the memory usage updates of the
ReorderBuffer to zero all at once by subtracting txn->size, rather
than updating it for each change. However, if TOAST reconstruction
data remained in the transaction when freeing it, there were cases
where it further subtracted the memory counter from zero, resulting in
an assertion failure.

This change calculates the memory size for each change and updates the
memory usage to precisely the amount that has been freed.

Backpatch to v17, where this was introducd.

Reviewed-by: Amit Kapila, Shlok Kyal
Discussion: https://postgr.es/m/CAD21AoAqkNUvicgKPT_dXzNoOwpPkVTg0QPPxEcWmzT0moCJ1g%40mail.gmail.com
Backpatch-through: 17
---
 .../replication/logical/reorderbuffer.c       | 32 ++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 00a8327e77..bf5cf416bf 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -467,6 +467,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* Reset the toast hash */
 	ReorderBufferToastReset(rb, txn);
 
+	/* All changes must be returned */
+	Assert(txn->size == 0);
+
 	pfree(txn);
 }
 
@@ -1506,6 +1509,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	bool		found;
 	dlist_mutable_iter iter;
+	Size		mem_freed = 0;
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1535,9 +1539,20 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
+		/*
+		 * Instead of updating the memory counter for individual changes,
+		 * we sum up the size of memory to free so we can update the memory
+		 * counter all together below. This saves costs of maintaining
+		 * the max-heap.
+		 */
+		mem_freed += ReorderBufferChangeSize(change);
+
 		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
 	/*
 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
 	 * They are always stored in the toplevel transaction.
@@ -1594,9 +1609,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
-	/* Update the memory counter */
-	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
 }
@@ -1616,6 +1628,7 @@ static void
 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 {
 	dlist_mutable_iter iter;
+	Size	mem_freed = 0;
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1648,11 +1661,19 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		/* remove the change from it's containing list */
 		dlist_delete(&change->node);
 
+		/*
+		 * Instead of updating the memory counter for individual changes,
+		 * we sum up the size of memory to free so we can update the memory
+		 * counter all together below. This saves costs of maintaining
+		 * the max-heap.
+		 */
+		mem_freed += ReorderBufferChangeSize(change);
+
 		ReorderBufferReturnChange(rb, change, false);
 	}
 
 	/* Update the memory counter */
-	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
 
 	/*
 	 * Mark the transaction as streamed.
@@ -2062,6 +2083,9 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		rb->stream_stop(rb, txn, last_lsn);
 		ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
 	}
+
+	/* All changes must be returned */
+	Assert(txn->size == 0);
 }
 
 /*
-- 
2.39.3

