Hi,

I did a basic review and testing of this patch today. Overall I think
the patch is in very good shape - I agree with the tradeoffs it makes,
and I like the approach in general. I do have a couple minor comments
about the code, and then maybe a couple thoughts about the approach.


First, some comments - I'll put them here, but I also kept them in
"review" commits, because that makes it easier to show the exact place
in the code the comment is about.

1) binaryheap_allocate got a new "indexed" argument, but the comment is
not updated to document it

2) I think it's preferable to use descriptive argument names for
bh_set_node. I don't think there's a good reason to keep it short.

3) In a couple places we have code like this:

    if (heap->bh_indexed)
        bh_nodeidx_delete(heap->bh_nodeidx, result);

Maybe it'd be better to move the if condition into bh_nodeidx_delete, so
that callers don't need to repeat it.
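
A minimal self-contained sketch of that shape (toy types, not the actual
binaryheap structs - the bh_nodeidx hash is reduced to a counter here, so
this only illustrates the calling convention):

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-in for the patch's binaryheap; only the fields needed here. */
typedef struct toy_binaryheap
{
	bool		bh_indexed;			/* is the node index maintained? */
	int			nodeidx_entries;	/* stands in for the bh_nodeidx hash */
} toy_binaryheap;

/*
 * With the bh_indexed check moved inside, callers can invoke this
 * unconditionally - it simply becomes a no-op for non-indexed heaps.
 */
static void
toy_nodeidx_delete(toy_binaryheap *heap, int node)
{
	(void) node;				/* the real code hashes the node here */

	if (!heap->bh_indexed)
		return;

	heap->nodeidx_entries--;	/* bh_nodeidx_delete() in the real code */
}
```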

4) Could we check the "found" flag in bh_set_node, somehow? I mean, we
either expect to find the node (update of an already tracked transaction)
or not (when inserting it). The life cycle may be non-trivial (node
added, updated and removed, ...), so that would be a useful assert, I think.

5) Do we actually need the various mem_freed local variables in a couple
places, when we expect the value to be equal to txn->size (there's even
an assert enforcing that)?

6) ReorderBufferCleanupTXN has a comment about maybe not using the same
threshold both to enable & disable usage of the binaryheap. I agree with
that, otherwise we could easily end up "thrashing" if we add/remove
transactions right around the threshold. I think 90-95% for disabling
the heap would work fine.

7) The code disabling binaryheap (based on the threshold) is copied in a
couple places, perhaps it should be a separate function called from
those places.
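
For illustration, such a helper could look roughly like this (toy types
again - in the actual patch it would take the ReorderBuffer and call
binaryheap_reset on rb->txn_heap):

```c
#include <assert.h>

#define MEM_TRACK_THRESHOLD 1024	/* REORDER_BUFFER_MEM_TRACK_THRESHOLD */

typedef enum
{
	TRACK_NO_MAXHEAP,
	TRACK_MAINTAIN_MAXHEAP
} toy_memtrack_state;

typedef struct toy_reorderbuffer
{
	toy_memtrack_state state;
	int			heap_size;		/* binaryheap_size(rb->txn_heap) */
} toy_reorderbuffer;

/*
 * Single helper both call sites could use: leave MAINTAIN_MAXHEAP and
 * reset the heap once the transaction count drops below the threshold.
 */
static void
toy_maybe_reset_maxheap(toy_reorderbuffer *rb)
{
	if (rb->state == TRACK_MAINTAIN_MAXHEAP &&
		rb->heap_size < MEM_TRACK_THRESHOLD)
	{
		rb->state = TRACK_NO_MAXHEAP;
		rb->heap_size = 0;		/* binaryheap_reset() in the real code */
	}
}
```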

8) Similarly to (3), maybe ReorderBufferTXNMemoryUpdate should do the
memory size check internally, to make the calls simpler.

9) The ReorderBufferChangeMemoryUpdate / ReorderBufferTXNMemoryUpdate
split is maybe not very clear. It's not obvious to me why it's divided
like this, or why we can't simply call ReorderBufferTXNMemoryUpdate
directly.


performance
-----------

I did some benchmarks to see the behavior in simple good/bad cases (see
the attached scripts.tgz). "large" is one large transaction inserting 1M
rows, "small" is 64k single-row inserts, and "subxacts" is the original
case with ~100k subxacts. Finally, "subxacts-small" is many transactions
with 128 subxacts each (the main transactions are concurrent).

The results are pretty good, I think:

             test        master     patched      ratio
  -----------------------------------------------------
            large          2587        2459       95%
            small           956         856       89%
         subxacts        138915        2911        2%
   subxacts-small         13632       13187       97%

This is timing (ms) with logical_work_mem=4MB. I also tried with 64MB,
where the subxact timing goes way down, but the overall conclusions do
not change.

I was a bit surprised that I didn't see any clear regression, but in the
end that's a good thing, right? There are a couple of results in this
thread showing a ~10% regression, but I've been unable to reproduce
those. Perhaps the newer patch versions fix that.

Anyway, I think that at some point we'd have to accept that some cases
may see a slight regression. I think that's inherent for almost any
heuristic - there's always going to be some rare case that defeats it.
What's important is that such cases are rare and/or the impact very
limited. And I think that's true here.


overall design
--------------

As for the design, I agree with the approach of using a binaryheap to
track transactions by size. When going over the thread history and the
initial approach of only keeping "large" transactions above some
threshold (e.g. 10%), I was really concerned that it would either lead
to abrupt changes in behavior (when transactions move just around the
10%), or wouldn't help with many common cases (with most transactions
being below the limit).

I was going to suggest some sort of "binning" - keeping lists of
transactions of similar size (e.g. <1kB, 1-2kB, 2-4kB, 4-8kB, ...) and
evicting transactions from a list, i.e. based on approximate size. But
if the indexed binary heap is cheap enough, I think it's a better
solution.
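
To make the binning idea concrete, the bucket for a transaction could be
computed roughly like this (just a sketch, with the sub-1kB cutoff chosen
arbitrarily to match the list above):

```c
#include <assert.h>
#include <stddef.h>

/*
 * Map a transaction size to a power-of-two bucket: 0 for <1kB, 1 for
 * 1-2kB, 2 for 2-4kB, 3 for 4-8kB, and so on. Eviction would then scan
 * buckets from the largest down and pick any transaction it finds.
 */
static int
size_to_bin(size_t sz)
{
	int			bin = 0;

	sz /= 1024;					/* everything below 1kB lands in bin 0 */
	while (sz > 0)
	{
		bin++;
		sz /= 2;
	}
	return bin;
}
```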

The one thing I'm a bit concerned about is the threshold used to start
using binary heap - these thresholds with binary decisions may easily
lead to a "cliff" and robustness issues, i.e. abrupt change in behavior
with significant runtime change (e.g. you add/remove one transaction and
the code takes a much more expensive path). The value (1024) seems
rather arbitrary; I wonder if there's something to justify that choice.

In any case, I agree it'd be good to have some dampening factor, to
reduce the risk of thrashing because a single transaction gets added to
or removed from the decoding.
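
As a sketch of such a dampening factor (the 90% disable threshold here is
just the value floated above, not anything from the patch):

```c
#include <assert.h>
#include <stdbool.h>

#define ENABLE_THRESHOLD	1024
#define DISABLE_THRESHOLD	((ENABLE_THRESHOLD * 90) / 100)

/*
 * Hysteresis: the max-heap is enabled at ENABLE_THRESHOLD transactions,
 * but only disabled again after dropping below DISABLE_THRESHOLD, so
 * adding/removing one transaction near the boundary can't flip the
 * state back and forth.
 */
static bool
heap_should_be_enabled(bool currently_enabled, int ntransactions)
{
	if (currently_enabled)
		return ntransactions >= DISABLE_THRESHOLD;

	return ntransactions >= ENABLE_THRESHOLD;
}
```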


related stuff / GenerationContext
---------------------------------

It's not the fault of this patch, but this reminds me I have some doubts
about how the eviction interferes with using the GenerationContext for
some of the data. I suspect we can easily get into a situation where we
evict the largest transaction, but that doesn't actually reduce the
memory usage at all, because the memory context blocks are shared with
some other transactions and don't get 100% empty (so we can't release
them). But it's actually worse, because GenerationContext does not even
reuse this memory. So do we even gain anything by the eviction?

The earlier patch versions also considered the age of the transaction,
to try evicting the older ones first, and I think that was interesting.
We may want to do something like this even with the binary heap.


related stuff / increase of logical_decoding_work_mem
-----------------------------------------------------

One of the "alternatives to spilling" suggested in the thread was to
enable streaming, but I think there's often a much more efficient
alternative - increase the amount of memory, so that we don't actually
need to spill.

For example, a system may be doing a lot of eviction / spilling with
logical_decoding_work_mem=64MB, but setting 128MB may completely
eliminate that. Of course, if there are large transactions, this may not
be possible (the GUC would have to exceed RAM). But I don't think that's
very common, the incidents that I've observed were often resolved by
bumping the logical_decoding_work_mem by a little bit.
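
For instance, the kind of change I mean is just this (hypothetical
value - the right number depends on the spill statistics the system
actually shows):

```
# postgresql.conf - if the slot shows frequent spilling (spill_txns /
# spill_bytes in pg_stat_replication_slots), try doubling the limit
# before reaching for streaming:
logical_decoding_work_mem = '128MB'
```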

I wonder if there's something we might do to help users tune this. We
should be able to measure the "peak" memory usage (how much memory we'd
need to not spill), so maybe we could log that as a WARNING, similarly
to checkpoints - there we only log "checkpoints too frequent, tune WAL
limits", but perhaps we could do more here? Or maybe we could add the
watermark to the system catalog?


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 6dfeb61ffddeedc8e00f8de5eb6b644b28ae1f62 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 23 Feb 2024 13:15:44 +0100
Subject: [PATCH v5 5/5] review

---
 .../replication/logical/reorderbuffer.c       | 32 ++++++++++++-------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f22cf2fb9b8..40fa2ba9843 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1537,7 +1537,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	bool		found;
 	dlist_mutable_iter iter;
-	Size		mem_freed = 0;
+	Size		mem_freed = 0;	/* XXX why don't we use txn->size directly? */
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1571,11 +1571,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		ReorderBufferReturnChange(rb, change, false);
 	}
 
-	/* Update the memory counter */
-	Assert(mem_freed == txn->size);
-	if (mem_freed > 0)
-		ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
-
 	/*
 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
 	 * They are always stored in the toplevel transaction.
@@ -1635,14 +1630,21 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
 
+	/* Update the memory counter */
+	Assert(mem_freed == txn->size);
+	ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
+
 	/*
-	 * Check if the number of transactions get lower than the threshold. If
+	 * Check if the number of transactions got lower than the threshold. If
 	 * so, switch to NO_MAXHEAP state and reset the max-heap.
 	 *
-	 * XXX: If a new transaction is added and the memory usage reached the
+	 * XXX: If a new transaction is added and the memory usage reaches the
 	 * limit soon, we will end up building the max-heap again. It might be
 	 * more efficient if we accept a certain amount of transactions to switch
 	 * back to the NO_MAXHEAP state, say 95% of the threshold.
+	 *
+	 * XXX Yes, having the enable/disable threshold exactly the same can lead
+	 * to trashing. Something like 90% would work, I think.
 	 */
 	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP &&
 		(binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD))
@@ -3257,6 +3259,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
  * counters instead - we don't really care about subtransactions as we
  * can't stream them individually anyway, and we only pick toplevel
  * transactions for eviction. So only toplevel transactions matter.
+ *
+ * XXX Not sure the naming is great, it seems pretty similar to the earlier
+ * function, can be quite confusing. Why do we even need the separate function
+ * and can't simply call ReorderBufferChangeMemoryUpdate from everywhere?
  */
 static void
 ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -3264,6 +3270,9 @@ ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn,
 {
 	ReorderBufferTXN *toptxn;
 
+	if (sz == 0)
+		return;
+
 	/*
 	 * Update the total size in top level as well. This is later used to
 	 * compute the decoding stats.
@@ -3745,6 +3754,8 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 	 * Check the number of transactions in max-heap after evicting large
 	 * transactions. If the number of transactions is small, we switch back
 	 * to the NO_MAXHEAP state, and reset the current the max-heap.
+	 *
+	 * XXX We already have this block elsewhere, maybe have a function?
 	 */
 	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP &&
 		(binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD))
@@ -3769,7 +3780,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
-	Size		mem_freed = 0;
+	Size		mem_freed = 0;	/* XXX why needed? can't we just use txn->size? */
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3831,8 +3842,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	/* Update the memory counter */
 	Assert(mem_freed == txn->size);
-	if (mem_freed > 0)
-		ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
+	ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
 
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
-- 
2.43.0

From 889d0dc3a3ff203fd382e5020029a78b9334c586 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v5 4/5] Use max-heap to evict largest transactions in
 ReorderBuffer.

Previously, when selecting the transaction to evict, we checked all
transactions to find the largest one, which could lead to significant
replication lag, especially when there are many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer by using a
max-heap with transaction size as the key to find the largest
transaction. The max-heap is managed in two states.

Overall algorithm:

REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
not update the max-heap when updating the memory counter. We build the
max-heap just before selecting large transactions. Therefore, in this
state, we can update the memory counter with no additional costs but
need O(n) time to get the largest transaction, where n is the number of
transactions including top-level transactions and subtransactions.

Once we build the max-heap, we switch to
REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
the max-heap when updating the memory counter. The intention is to
efficiently retrieve the largest transaction in O(1) time instead of
incurring the cost of memory counter updates (O(log n)). We remain in
this state as long as the number of transactions is larger than the
threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back
to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.

XXX: update typedef.list

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
---
 .../replication/logical/reorderbuffer.c       | 197 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  21 ++
 2 files changed, 189 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 91b9618d7ec..f22cf2fb9b8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,26 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use a max-heap with transaction size as the key to efficiently find
+ *	  the largest transaction. The max-heap state is managed in two states:
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP.
+ *
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
+ *	  not update the max-heap when updating the memory counter. We build the
+ *	  max-heap just before selecting large transactions. Therefore, in this
+ *	  state, we can update the memory counter with no additional costs but
+ *	  need O(n) time to get the largest transaction, where n is the number of
+ *	  transactions including top-level transactions and subtransactions.
+ *
+ *	  Once we build the max-heap, we switch to
+ *	  REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
+ *	  the max-heap when updating the memory counter. The intention is to
+ *	  efficiently retrieve the largest transaction in O(1) time instead of
+ *	  incurring the cost of memory counter updates (O(log n)). We remain in
+ *	  this state as long as the number of transactions is larger than the
+ *	  threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back
+ *	  to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -109,6 +129,11 @@
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * The threshold of the number of transactions in the max-heap (rb->txn_heap)
+ * to switch the state.
+ */
+#define REORDER_BUFFER_MEM_TRACK_THRESHOLD 1024
 
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
@@ -296,6 +321,9 @@ static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
+static void ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+										 bool addition, Size sz);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -357,6 +385,15 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * Don't start with a lower number than REORDER_BUFFER_MEM_TRACK_THRESHOLD, since
+	 * we add at least REORDER_BUFFER_MEM_TRACK_THRESHOLD entries at once.
+	 */
+	buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+	buffer->txn_heap = binaryheap_allocate(REORDER_BUFFER_MEM_TRACK_THRESHOLD * 2,
+										   ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -1500,6 +1537,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	bool		found;
 	dlist_mutable_iter iter;
+	Size		mem_freed = 0;
 
 	/* cleanup subtransactions & their changes */
 	dlist_foreach_modify(iter, &txn->subtxns)
@@ -1529,9 +1567,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change, true);
+		mem_freed += ReorderBufferChangeSize(change);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update the memory counter */
+	Assert(mem_freed == txn->size);
+	if (mem_freed > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
+
 	/*
 	 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
 	 * They are always stored in the toplevel transaction.
@@ -1590,6 +1634,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
+
+	/*
+	 * Check if the number of transactions get lower than the threshold. If
+	 * so, switch to NO_MAXHEAP state and reset the max-heap.
+	 *
+	 * XXX: If a new transaction is added and the memory usage reached the
+	 * limit soon, we will end up building the max-heap again. It might be
+	 * more efficient if we accept a certain amount of transactions to switch
+	 * back to the NO_MAXHEAP state, say 95% of the threshold.
+	 */
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP &&
+		(binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD))
+	{
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+		binaryheap_reset(rb->txn_heap);
+	}
 }
 
 /*
@@ -3162,16 +3222,6 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 
 /*
  * Update memory counters to account for the new or removed change.
- *
- * We update two counters - in the reorder buffer, and in the transaction
- * containing the change. The reorder buffer counter allows us to quickly
- * decide if we reached the memory limit, the transaction counter allows
- * us to quickly pick the largest transaction for eviction.
- *
- * When streaming is enabled, we need to update the toplevel transaction
- * counters instead - we don't really care about subtransactions as we
- * can't stream them individually anyway, and we only pick toplevel
- * transactions for eviction. So only toplevel transactions matter.
  */
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
@@ -3179,7 +3229,6 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								bool addition, Size sz)
 {
 	ReorderBufferTXN *txn;
-	ReorderBufferTXN *toptxn;
 
 	Assert(change->txn);
 
@@ -3193,6 +3242,28 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 	txn = change->txn;
 
+	ReorderBufferTXNMemoryUpdate(rb, txn, addition, sz);
+}
+
+/*
+ * Update memory counter of the given transaction.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
+ */
+static void
+ReorderBufferTXNMemoryUpdate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							 bool addition, Size sz)
+{
+	ReorderBufferTXN *toptxn;
+
 	/*
 	 * Update the total size in top level as well. This is later used to
 	 * compute the decoding stats.
@@ -3206,6 +3277,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if ((txn->size - sz) == 0)
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3215,6 +3295,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if (txn->size == 0)
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
@@ -3472,31 +3561,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	/*
+	 * Build the max-heap to pick the largest transaction if not yet. We will
+	 * run a heap assembly step at the end, which is more efficient.
+	 */
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		HASH_SEQ_STATUS hash_seq;
+		ReorderBufferTXNByIdEnt *ent;
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		hash_seq_init(&hash_seq, rb->by_txn);
+		while ((ent = hash_seq_search(&hash_seq)) != NULL)
+		{
+			ReorderBufferTXN *txn = ent->txn;
+
+			if (txn->size == 0)
+				continue;
+
+			binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+		}
+
+		binaryheap_build(rb->txn_heap);
+
+		/*
+		 * The max-heap is ready now. We remain in this state at least until
+		 * we free up enough transactions to bring the total memory usage
+		 * below the limit.
+		 */
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP;
 	}
+	else
+		Assert(binaryheap_size(rb->txn_heap) > 0);
+
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
 
 	Assert(largest);
 	Assert(largest->size > 0);
@@ -3638,6 +3741,18 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		Assert(txn->nentries_mem == 0);
 	}
 
+	/*
+	 * Check the number of transactions in max-heap after evicting large
+	 * transactions. If the number of transactions is small, we switch back
+	 * to the NO_MAXHEAP state, and reset the current the max-heap.
+	 */
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP &&
+		(binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD))
+	{
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+		binaryheap_reset(rb->txn_heap);
+	}
+
 	/* We must be under the memory limit now. */
 	Assert(rb->size < logical_decoding_work_mem * 1024L);
 }
@@ -3654,6 +3769,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 	Size		size = txn->size;
+	Size		mem_freed = 0;
 
 	elog(DEBUG2, "spill %u changes in XID %u to disk",
 		 (uint32) txn->nentries_mem, txn->xid);
@@ -3707,11 +3823,17 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		ReorderBufferSerializeChange(rb, txn, fd, change);
 		dlist_delete(&change->node);
-		ReorderBufferReturnChange(rb, change, true);
+		mem_freed += ReorderBufferChangeSize(change);
+		ReorderBufferReturnChange(rb, change, false);
 
 		spilled++;
 	}
 
+	/* Update the memory counter */
+	Assert(mem_freed == txn->size);
+	if (mem_freed > 0)
+		ReorderBufferTXNMemoryUpdate(rb, txn, false, mem_freed);
+
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
@@ -5273,3 +5395,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Compare between sizes of two transactions. This is for a binary heap
+ * comparison function.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN	*ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN	*tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa0..f0d352cfcc6 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -531,6 +532,22 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) (
 												  ReorderBufferTXN *txn,
 												  XLogRecPtr lsn);
 
+/* State of how to track the memory usage of each transaction being decoded */
+typedef enum ReorderBufferMemTrackState
+{
+	/*
+	 * We don't update max-heap while updating the memory counter. The
+	 * max-heap is built before use.
+	 */
+	REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP,
+
+	/*
+	 * We also update the max-heap when updating the memory counter so
+	 * the heap property is always preserved.
+	 */
+	REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP,
+} ReorderBufferMemTrackState;
+
 struct ReorderBuffer
 {
 	/*
@@ -631,6 +648,10 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	ReorderBufferMemTrackState memtrack_state;
+	binaryheap	*txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.43.0

From f2b54fbb2bc0b6a74d10f46b086e238d76fe822f Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Fri, 23 Feb 2024 13:32:04 +0100
Subject: [PATCH v5 3/5] review

---
 src/common/binaryheap.c | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index ff03c477dc9..f656c47524e 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -54,6 +54,8 @@ static void sift_up(binaryheap *heap, int node_off);
  * store the given number of nodes, with the heap property defined by
  * the given comparator function, which will be invoked with the additional
  * argument specified by 'arg'.
+ *
+ * XXX Should document the new "indexed" argument.
  */
 binaryheap *
 binaryheap_allocate(int capacity, binaryheap_comparator compare,
@@ -110,6 +112,7 @@ binaryheap_free(binaryheap *heap)
 {
 	if (heap->bh_indexed)
 		bh_nodeidx_destroy(heap->bh_nodeidx);
+
 	pfree(heap);
 }
 
@@ -152,28 +155,34 @@ bh_enlarge_node_array(binaryheap *heap)
 }
 
 /*
- * Set the given node at the 'idx' and updates its position accordingly.
+ * Set the given node at the 'index' and updates its position accordingly.
+ *
+ * XXX No need to shorten the argument names, I think.
+ *
+ * XXX Should this return "found" maybe?
  */
 static void
-bh_set_node(binaryheap *heap, bh_node_type d, int idx)
+bh_set_node(binaryheap *heap, bh_node_type node, int index)
 {
 	bh_nodeidx_entry *ent;
 	bool	found;
 
 	/* Set the node to the nodes array */
-	heap->bh_nodes[idx] = d;
+	heap->bh_nodes[index] = node;
 
 	if (heap->bh_indexed)
 	{
 		/* Remember its index in the nodes array */
-		ent = bh_nodeidx_insert(heap->bh_nodeidx, d, &found);
-		ent->idx = idx;
+		ent = bh_nodeidx_insert(heap->bh_nodeidx, node, &found);
+		ent->idx = index;
 	}
 }
 
 /*
  * Replace the node at 'idx' with the given node 'replaced_by'. Also
  * update their positions accordingly.
+ *
+ * XXX can we do Assert(found) here? if bh_set_node returns it, ofc
  */
 static void
 bh_replace_node(binaryheap *heap, int idx, bh_node_type replaced_by)
@@ -280,6 +289,8 @@ binaryheap_remove_first(binaryheap *heap)
 	{
 		heap->bh_size--;
 
+		/* XXX maybe it'd be good to make the check in bh_nodeidx_delete, so that
+		 * we don't need to do it everywhere. */
 		if (heap->bh_indexed)
 			bh_nodeidx_delete(heap->bh_nodeidx, result);
 
-- 
2.43.0

From a2a7db6e02344982764b07ec4bf4d509d1dd7ae4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 11:20:23 +0900
Subject: [PATCH v5 2/5] Add functions to binaryheap to efficiently
 remove/update keys.

Previously, binaryheap didn't support efficient key updates or node
removal. For example, in order to remove a node from the binaryheap,
the caller had to pass the node's position within the binaryheap's
internal array. Removing a node is done in O(log n), but searching for
the key's position takes O(n).

This commit adds a hash table to binaryheap to track the position of
each node in the binaryheap. That way, by using newly added functions
such as binaryheap_update_up() etc., both updating a key and removing a
node can be done in O(1) on average and O(log n) in the worst case.
This is known as an indexed priority queue. The caller can specify to
use the indexed binaryheap by passing indexed = true. There is no user
of it yet, but it will be used by an upcoming patch.

XXX: update typedef.list

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 src/backend/executor/nodeGatherMerge.c        |   1 +
 src/backend/executor/nodeMergeAppend.c        |   2 +-
 src/backend/postmaster/pgarch.c               |   3 +-
 .../replication/logical/reorderbuffer.c       |   1 +
 src/backend/storage/buffer/bufmgr.c           |   1 +
 src/bin/pg_dump/pg_backup_archiver.c          |   1 +
 src/bin/pg_dump/pg_dump_sort.c                |   2 +-
 src/common/binaryheap.c                       | 167 ++++++++++++++++--
 src/include/lib/binaryheap.h                  |  35 +++-
 9 files changed, 199 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 2d552f42240..250f226d5f8 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -427,6 +427,7 @@ gather_merge_setup(GatherMergeState *gm_state)
 	/* Allocate the resources for the merge */
 	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
 											heap_compare_slots,
+											false,
 											gm_state);
 }
 
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 08178684528..1980794cb7a 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -125,7 +125,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	mergestate->ms_nplans = nplans;
 
 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
-	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, false,
 											  mergestate);
 
 	/*
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 9c18e4b3efb..36522940dd4 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -250,7 +250,8 @@ PgArchiverMain(void)
 
 	/* Initialize our max-heap for prioritizing files to archive. */
 	arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
-												ready_file_comparator, NULL);
+												ready_file_comparator, false,
+												NULL);
 
 	/* Load the archive_library. */
 	LoadArchiveLibrary();
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5446df3c647..91b9618d7ec 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1296,6 +1296,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	/* allocate heap */
 	state->heap = binaryheap_allocate(state->nr_txns,
 									  ReorderBufferIterCompare,
+									  false,
 									  state);
 
 	/* Now that the state fields are initialized, it is safe to return it. */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index bdf89bbc4dc..69f071321dd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2725,6 +2725,7 @@ BufferSync(int flags)
 	 */
 	ts_heap = binaryheap_allocate(num_spaces,
 								  ts_ckpt_progress_comparator,
+								  false,
 								  NULL);
 
 	for (i = 0; i < num_spaces; i++)
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index d97ebaff5b8..6587a7b0814 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -4033,6 +4033,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	/* Set up ready_heap with enough room for all known TocEntrys */
 	ready_heap = binaryheap_allocate(AH->tocCount,
 									 TocEntrySizeCompareBinaryheap,
+									 false,
 									 NULL);
 
 	/*
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 8ee8a42781a..4d10af3a344 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -405,7 +405,7 @@ TopoSort(DumpableObject **objs,
 		return true;
 
 	/* Create workspace for the above-described heap */
-	pendingHeap = binaryheap_allocate(numObjs, int_cmp, NULL);
+	pendingHeap = binaryheap_allocate(numObjs, int_cmp, false, NULL);
 
 	/*
 	 * Scan the constraints, and for each item in the input, generate a count
diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 6f16c83295d..ff03c477dc9 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -22,8 +22,28 @@
 #ifdef FRONTEND
 #include "common/logging.h"
 #endif
+#include "common/hashfn.h"
 #include "lib/binaryheap.h"
 
+/*
+ * Define parameters for hash table code generation. The interface is *also*
+ * declared in binaryheap.h (to generate the types, which are externally
+ * visible).
+ */
+#define SH_PREFIX bh_nodeidx
+#define SH_ELEMENT_TYPE bh_nodeidx_entry
+#define SH_KEY_TYPE bh_node_type
+#define SH_KEY key
+#define SH_HASH_KEY(tb, key) \
+	hash_bytes((const unsigned char *) &key, sizeof(bh_node_type))
+#define SH_EQUAL(tb, a, b) (memcmp(&a, &b, sizeof(bh_node_type)) == 0)
+#define SH_SCOPE extern
+#ifdef FRONTEND
+#define SH_RAW_ALLOCATOR pg_malloc0
+#endif
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
 static void sift_down(binaryheap *heap, int node_off);
 static void sift_up(binaryheap *heap, int node_off);
 
@@ -36,7 +56,8 @@ static void sift_up(binaryheap *heap, int node_off);
  * argument specified by 'arg'.
  */
 binaryheap *
-binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
+binaryheap_allocate(int capacity, binaryheap_comparator compare,
+					bool indexed, void *arg)
 {
 	binaryheap *heap;
 
@@ -49,6 +70,17 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
 	heap->bh_has_heap_property = true;
 	heap->bh_nodes = (bh_node_type *) palloc(sizeof(bh_node_type) * capacity);
 
+	heap->bh_indexed = indexed;
+	if (heap->bh_indexed)
+	{
+#ifdef FRONTEND
+		heap->bh_nodeidx = bh_nodeidx_create(capacity, NULL);
+#else
+		heap->bh_nodeidx = bh_nodeidx_create(CurrentMemoryContext, capacity,
+											 NULL);
+#endif
+	}
+
 	return heap;
 }
 
@@ -63,6 +95,9 @@ binaryheap_reset(binaryheap *heap)
 {
 	heap->bh_size = 0;
 	heap->bh_has_heap_property = true;
+
+	if (heap->bh_indexed)
+		bh_nodeidx_reset(heap->bh_nodeidx);
 }
 
 /*
@@ -73,6 +108,8 @@ binaryheap_reset(binaryheap *heap)
 void
 binaryheap_free(binaryheap *heap)
 {
+	if (heap->bh_indexed)
+		bh_nodeidx_destroy(heap->bh_nodeidx);
 	pfree(heap);
 }
 
@@ -114,6 +151,44 @@ bh_enlarge_node_array(binaryheap *heap)
 							  sizeof(bh_node_type) * heap->bh_space);
 }
 
+/*
+ * Set the given node at 'idx' and update its position index accordingly.
+ */
+static void
+bh_set_node(binaryheap *heap, bh_node_type d, int idx)
+{
+	bh_nodeidx_entry *ent;
+	bool	found;
+
+	/* Set the node to the nodes array */
+	heap->bh_nodes[idx] = d;
+
+	if (heap->bh_indexed)
+	{
+		/* Remember its index in the nodes array */
+		ent = bh_nodeidx_insert(heap->bh_nodeidx, d, &found);
+		ent->idx = idx;
+	}
+}
+
+/*
+ * Replace the node at 'idx' with the given node 'replaced_by'. Also
+ * update their positions accordingly.
+ */
+static void
+bh_replace_node(binaryheap *heap, int idx, bh_node_type replaced_by)
+{
+	bh_node_type	node = heap->bh_nodes[idx];
+
+	/* Remove overwritten node's index */
+	if (heap->bh_indexed)
+		(void) bh_nodeidx_delete(heap->bh_nodeidx, node);
+
+	/* Replace it with the given new node */
+	if (idx < heap->bh_size)
+		bh_set_node(heap, replaced_by, idx);
+}
+
 /*
  * binaryheap_add_unordered
  *
@@ -130,7 +205,7 @@ binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
 		bh_enlarge_node_array(heap);
 
 	heap->bh_has_heap_property = false;
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set_node(heap, d, heap->bh_size);
 	heap->bh_size++;
 }
 
@@ -163,7 +238,7 @@ binaryheap_add(binaryheap *heap, bh_node_type d)
 	if (heap->bh_size >= heap->bh_space)
 		bh_enlarge_node_array(heap);
 
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set_node(heap, d, heap->bh_size);
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
 }
@@ -204,6 +279,10 @@ binaryheap_remove_first(binaryheap *heap)
 	if (heap->bh_size == 1)
 	{
 		heap->bh_size--;
+
+		if (heap->bh_indexed)
+			bh_nodeidx_delete(heap->bh_nodeidx, result);
+
 		return result;
 	}
 
@@ -211,7 +290,7 @@ binaryheap_remove_first(binaryheap *heap)
 	 * Remove the last node, placing it in the vacated root entry, and sift
 	 * the new root node down to its correct position.
 	 */
-	heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size];
+	bh_replace_node(heap, 0, heap->bh_nodes[--heap->bh_size]);
 	sift_down(heap, 0);
 
 	return result;
@@ -237,7 +316,7 @@ binaryheap_remove_node(binaryheap *heap, int n)
 						   heap->bh_arg);
 
 	/* remove the last node, placing it in the vacated entry */
-	heap->bh_nodes[n] = heap->bh_nodes[heap->bh_size];
+	bh_replace_node(heap, n, heap->bh_nodes[heap->bh_size]);
 
 	/* sift as needed to preserve the heap property */
 	if (cmp > 0)
@@ -246,6 +325,74 @@ binaryheap_remove_node(binaryheap *heap, int n)
 		sift_down(heap, n);
 }
 
+/*
+ * binaryheap_remove_node_ptr
+ *
+ * Similar to binaryheap_remove_node() but removes the given node. The caller
+ * must ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_remove_node_ptr(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+
+	binaryheap_remove_node(heap, ent->idx);
+}
+
+/*
+ * binaryheap_update_up
+ *
+ * Sift the given node up after the node's key is updated. The caller must
+ * ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_update_up(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+	Assert(ent->idx >= 0 && ent->idx < heap->bh_size);
+
+	sift_up(heap, ent->idx);
+}
+
+/*
+ * binaryheap_update_down
+ *
+ * Sift the given node down after the node's key is updated. The caller must
+ * ensure that the given node is in the heap. O(log n) worst case.
+ *
+ * This function can be used only if bh_indexed is true.
+ */
+void
+binaryheap_update_down(binaryheap *heap, bh_node_type d)
+{
+	bh_nodeidx_entry *ent;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(heap->bh_indexed);
+
+	ent = bh_nodeidx_lookup(heap->bh_nodeidx, d);
+	Assert(ent);
+	Assert(ent->idx >= 0 && ent->idx < heap->bh_size);
+
+	sift_down(heap, ent->idx);
+}
+
 /*
  * binaryheap_replace_first
  *
@@ -258,7 +405,7 @@ binaryheap_replace_first(binaryheap *heap, bh_node_type d)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
-	heap->bh_nodes[0] = d;
+	bh_replace_node(heap, 0, d);
 
 	if (heap->bh_size > 1)
 		sift_down(heap, 0);
@@ -300,11 +447,11 @@ sift_up(binaryheap *heap, int node_off)
 		 * Otherwise, swap the parent value with the hole, and go on to check
 		 * the node's new parent.
 		 */
-		heap->bh_nodes[node_off] = parent_val;
+		bh_set_node(heap, parent_val, node_off);
 		node_off = parent_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set_node(heap, node_val, node_off);
 }
 
 /*
@@ -359,9 +506,9 @@ sift_down(binaryheap *heap, int node_off)
 		 * Otherwise, swap the hole with the child that violates the heap
 		 * property; then go on to check its children.
 		 */
-		heap->bh_nodes[node_off] = heap->bh_nodes[swap_off];
+		bh_set_node(heap, heap->bh_nodes[swap_off], node_off);
 		node_off = swap_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set_node(heap, node_val, node_off);
 }
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 1439f208033..48c2de33b48 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -29,6 +29,28 @@ typedef Datum bh_node_type;
  */
 typedef int (*binaryheap_comparator) (bh_node_type a, bh_node_type b, void *arg);
 
+/*
+ * Struct for a hash table element to store a node's index in the bh_nodes
+ * array.
+ */
+typedef struct bh_nodeidx_entry
+{
+	bh_node_type	key;
+	char			status;
+	int				idx;
+} bh_nodeidx_entry;
+
+/* define parameters necessary to generate the hash table interface */
+#define SH_PREFIX bh_nodeidx
+#define SH_ELEMENT_TYPE bh_nodeidx_entry
+#define SH_KEY_TYPE bh_node_type
+#define SH_SCOPE extern
+#ifdef FRONTEND
+#define SH_RAW_ALLOCATOR pg_malloc0
+#endif
+#define SH_DECLARE
+#include "lib/simplehash.h"
+
 /*
  * binaryheap
  *
@@ -47,11 +69,19 @@ typedef struct binaryheap
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
 	bh_node_type *bh_nodes;
+
+	/*
+	 * If bh_indexed is true, bh_nodeidx is used to track each node's
+	 * index in bh_nodes. This enables the caller to perform
+	 * binaryheap_remove_node_ptr(), binaryheap_update_up/down in O(log n).
+	 */
+	bool		bh_indexed;
+	bh_nodeidx_hash	*bh_nodeidx;
 } binaryheap;
 
 extern binaryheap *binaryheap_allocate(int capacity,
 									   binaryheap_comparator compare,
-									   void *arg);
+									   bool indexed, void *arg);
 extern void binaryheap_reset(binaryheap *heap);
 extern void binaryheap_free(binaryheap *heap);
 extern void binaryheap_add_unordered(binaryheap *heap, bh_node_type d);
@@ -60,7 +90,10 @@ extern void binaryheap_add(binaryheap *heap, bh_node_type d);
 extern bh_node_type binaryheap_first(binaryheap *heap);
 extern bh_node_type binaryheap_remove_first(binaryheap *heap);
 extern void binaryheap_remove_node(binaryheap *heap, int n);
+extern void binaryheap_remove_node_ptr(binaryheap *heap, bh_node_type d);
 extern void binaryheap_replace_first(binaryheap *heap, bh_node_type d);
+extern void binaryheap_update_up(binaryheap *heap, bh_node_type d);
+extern void binaryheap_update_down(binaryheap *heap, bh_node_type d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
 #define binaryheap_size(h)			((h)->bh_size)
-- 
2.43.0

From 540bfa5568ee07205bc3e18aaec78e02ef2051c0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.m...@gmail.com>
Date: Fri, 26 Jan 2024 17:12:20 +0900
Subject: [PATCH v5 1/5] Make binaryheap enlargeable.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch-through:
---
 src/common/binaryheap.c      | 36 +++++++++++++++++++-----------------
 src/include/lib/binaryheap.h |  2 +-
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 7377ebdf156..6f16c83295d 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -38,17 +38,16 @@ static void sift_up(binaryheap *heap, int node_off);
 binaryheap *
 binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
 {
-	int			sz;
 	binaryheap *heap;
 
-	sz = offsetof(binaryheap, bh_nodes) + sizeof(bh_node_type) * capacity;
-	heap = (binaryheap *) palloc(sz);
+	heap = (binaryheap *) palloc(sizeof(binaryheap));
 	heap->bh_space = capacity;
 	heap->bh_compare = compare;
 	heap->bh_arg = arg;
 
 	heap->bh_size = 0;
 	heap->bh_has_heap_property = true;
+	heap->bh_nodes = (bh_node_type *) palloc(sizeof(bh_node_type) * capacity);
 
 	return heap;
 }
@@ -104,6 +103,17 @@ parent_offset(int i)
 	return (i - 1) / 2;
 }
 
+/*
+ * Enlarge the node array to make room for more nodes.
+ */
+static void
+bh_enlarge_node_array(binaryheap *heap)
+{
+	heap->bh_space *= 2;
+	heap->bh_nodes = repalloc(heap->bh_nodes,
+							  sizeof(bh_node_type) * heap->bh_space);
+}
+
 /*
  * binaryheap_add_unordered
  *
@@ -115,14 +125,10 @@ parent_offset(int i)
 void
 binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
 {
+	/* make sure there is enough space for a new node */
 	if (heap->bh_size >= heap->bh_space)
-	{
-#ifdef FRONTEND
-		pg_fatal("out of binary heap slots");
-#else
-		elog(ERROR, "out of binary heap slots");
-#endif
-	}
+		bh_enlarge_node_array(heap);
+
 	heap->bh_has_heap_property = false;
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
@@ -153,14 +159,10 @@ binaryheap_build(binaryheap *heap)
 void
 binaryheap_add(binaryheap *heap, bh_node_type d)
 {
+	/* make sure there is enough space for a new node */
 	if (heap->bh_size >= heap->bh_space)
-	{
-#ifdef FRONTEND
-		pg_fatal("out of binary heap slots");
-#else
-		elog(ERROR, "out of binary heap slots");
-#endif
-	}
+		bh_enlarge_node_array(heap);
+
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 19025c08ef1..1439f208033 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -46,7 +46,7 @@ typedef struct binaryheap
 	bool		bh_has_heap_property;	/* debugging cross-check */
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
-	bh_node_type bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+	bh_node_type *bh_nodes;
 } binaryheap;
 
 extern binaryheap *binaryheap_allocate(int capacity,
-- 
2.43.0

Attachment: test-scripts.tgz