On Sat, Sep 02, 2023 at 11:55:21AM -0700, Nathan Bossart wrote:
> I ended up hacking together a (nowhere near committable) patch to see how
> hard it would be to allow using any type with binaryheap.  It doesn't seem
> too bad.

I spent some more time on this patch and made the relevant adjustments to
the rest of the set.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 3a51f25ae712b792bdde90b133c09b0b940f4198 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 4 Sep 2023 15:04:40 -0700
Subject: [PATCH v7 1/5] Allow binaryheap to use any type.

We would like to make binaryheap available to frontend code in a
future commit, but presently the implementation uses Datum for the
node type, which is inaccessible in the frontend.  This commit
modifies the implementation to allow using any type for the nodes.
binaryheap_allocate() now requires callers to specify the size of
the node, and several of the other functions now use void pointers.
---
 src/backend/executor/nodeGatherMerge.c        | 19 ++--
 src/backend/executor/nodeMergeAppend.c        | 22 ++---
 src/backend/lib/binaryheap.c                  | 99 +++++++++++--------
 src/backend/postmaster/pgarch.c               | 36 ++++---
 .../replication/logical/reorderbuffer.c       | 21 ++--
 src/backend/storage/buffer/bufmgr.c           | 20 ++--
 src/include/lib/binaryheap.h                  | 21 ++--
 7 files changed, 137 insertions(+), 101 deletions(-)

diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 9d5e1a46e9..f76406b575 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -52,7 +52,7 @@ typedef struct GMReaderTupleBuffer
 } GMReaderTupleBuffer;
 
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
-static int32 heap_compare_slots(Datum a, Datum b, void *arg);
+static int32 heap_compare_slots(const void *a, const void *b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
 									  bool nowait, bool *done);
@@ -429,6 +429,7 @@ gather_merge_setup(GatherMergeState *gm_state)
 
 	/* Allocate the resources for the merge */
 	gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
+											sizeof(int),
 											heap_compare_slots,
 											gm_state);
 }
@@ -489,7 +490,7 @@ reread:
 				/* Don't have a tuple yet, try to get one */
 				if (gather_merge_readnext(gm_state, i, nowait))
 					binaryheap_add_unordered(gm_state->gm_heap,
-											 Int32GetDatum(i));
+											 &i);
 			}
 			else
 			{
@@ -565,14 +566,14 @@ gather_merge_getnext(GatherMergeState *gm_state)
 		 * the heap, because it might now compare differently against the
 		 * other elements of the heap.
 		 */
-		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+		binaryheap_first(gm_state->gm_heap, &i);
 
 		if (gather_merge_readnext(gm_state, i, false))
-			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
+			binaryheap_replace_first(gm_state->gm_heap, &i);
 		else
 		{
 			/* reader exhausted, remove it from heap */
-			(void) binaryheap_remove_first(gm_state->gm_heap);
+			binaryheap_remove_first(gm_state->gm_heap, NULL);
 		}
 	}
 
@@ -585,7 +586,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
 	else
 	{
 		/* Return next tuple from whichever participant has the leading one */
-		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+		binaryheap_first(gm_state->gm_heap, &i);
 		return gm_state->gm_slots[i];
 	}
 }
@@ -750,11 +751,11 @@ typedef int32 SlotNumber;
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(Datum a, Datum b, void *arg)
+heap_compare_slots(const void *a, const void *b, void *arg)
 {
 	GatherMergeState *node = (GatherMergeState *) arg;
-	SlotNumber	slot1 = DatumGetInt32(a);
-	SlotNumber	slot2 = DatumGetInt32(b);
+	SlotNumber	slot1 = *((const int *) a);
+	SlotNumber	slot2 = *((const int *) b);
 
 	TupleTableSlot *s1 = node->gm_slots[slot1];
 	TupleTableSlot *s2 = node->gm_slots[slot2];
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 21b5726e6e..e0ace294a0 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -52,7 +52,7 @@
 typedef int32 SlotNumber;
 
 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
-static int	heap_compare_slots(Datum a, Datum b, void *arg);
+static int	heap_compare_slots(const void *a, const void *b, void *arg);
 
 
 /* ----------------------------------------------------------------
@@ -125,8 +125,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	mergestate->ms_nplans = nplans;
 
 	mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
-	mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
-											  mergestate);
+	mergestate->ms_heap = binaryheap_allocate(nplans, sizeof(int),
+											  heap_compare_slots, mergestate);
 
 	/*
 	 * Miscellaneous initialization
@@ -229,7 +229,7 @@ ExecMergeAppend(PlanState *pstate)
 		{
 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 			if (!TupIsNull(node->ms_slots[i]))
-				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+				binaryheap_add_unordered(node->ms_heap, &i);
 		}
 		binaryheap_build(node->ms_heap);
 		node->ms_initialized = true;
@@ -244,12 +244,12 @@ ExecMergeAppend(PlanState *pstate)
 		 * by doing this before returning from the prior call, but it's better
 		 * to not pull tuples until necessary.)
 		 */
-		i = DatumGetInt32(binaryheap_first(node->ms_heap));
+		binaryheap_first(node->ms_heap, &i);
 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 		if (!TupIsNull(node->ms_slots[i]))
-			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
+			binaryheap_replace_first(node->ms_heap, &i);
 		else
-			(void) binaryheap_remove_first(node->ms_heap);
+			binaryheap_remove_first(node->ms_heap, NULL);
 	}
 
 	if (binaryheap_empty(node->ms_heap))
@@ -259,7 +259,7 @@ ExecMergeAppend(PlanState *pstate)
 	}
 	else
 	{
-		i = DatumGetInt32(binaryheap_first(node->ms_heap));
+		binaryheap_first(node->ms_heap, &i);
 		result = node->ms_slots[i];
 	}
 
@@ -270,11 +270,11 @@ ExecMergeAppend(PlanState *pstate)
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(Datum a, Datum b, void *arg)
+heap_compare_slots(const void *a, const void *b, void *arg)
 {
 	MergeAppendState *node = (MergeAppendState *) arg;
-	SlotNumber	slot1 = DatumGetInt32(a);
-	SlotNumber	slot2 = DatumGetInt32(b);
+	SlotNumber	slot1 = *((const int *) a);
+	SlotNumber	slot2 = *((const int *) b);
 
 	TupleTableSlot *s1 = node->ms_slots[slot1];
 	TupleTableSlot *s2 = node->ms_slots[slot2];
diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c
index 1737546757..6f63181c4c 100644
--- a/src/backend/lib/binaryheap.c
+++ b/src/backend/lib/binaryheap.c
@@ -29,16 +29,19 @@ static void sift_up(binaryheap *heap, int node_off);
  * argument specified by 'arg'.
  */
 binaryheap *
-binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
+binaryheap_allocate(int capacity, size_t elem_size,
+					binaryheap_comparator compare, void *arg)
 {
 	int			sz;
 	binaryheap *heap;
 
-	sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity;
+	sz = offsetof(binaryheap, bh_nodes) + elem_size * (capacity + 1);
 	heap = (binaryheap *) palloc(sz);
 	heap->bh_space = capacity;
+	heap->bh_elem_size = elem_size;
 	heap->bh_compare = compare;
 	heap->bh_arg = arg;
+	heap->bh_hole = &heap->bh_nodes[capacity * elem_size];
 
 	heap->bh_size = 0;
 	heap->bh_has_heap_property = true;
@@ -97,6 +100,27 @@ parent_offset(int i)
 	return (i - 1) / 2;
 }
 
+/*
+ * This utility function returns a pointer to the nth node of the binary heap.
+ */
+static inline void *
+bh_node(binaryheap *heap, int n)
+{
+	return &heap->bh_nodes[n * heap->bh_elem_size];
+}
+
+/*
+ * This utility function sets the nth node of the binary heap to the value that
+ * d points to.
+ */
+static inline void
+bh_set(binaryheap *heap, int n, const void *d)
+{
+	void	   *node = bh_node(heap, n);
+
+	memmove(node, d, heap->bh_elem_size);
+}
+
 /*
  * binaryheap_add_unordered
  *
@@ -106,12 +130,12 @@ parent_offset(int i)
  * afterwards.
  */
 void
-binaryheap_add_unordered(binaryheap *heap, Datum d)
+binaryheap_add_unordered(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
 		elog(ERROR, "out of binary heap slots");
 	heap->bh_has_heap_property = false;
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set(heap, heap->bh_size, d);
 	heap->bh_size++;
 }
 
@@ -138,11 +162,11 @@ binaryheap_build(binaryheap *heap)
  * the heap property.
  */
 void
-binaryheap_add(binaryheap *heap, Datum d)
+binaryheap_add(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
 		elog(ERROR, "out of binary heap slots");
-	heap->bh_nodes[heap->bh_size] = d;
+	bh_set(heap, heap->bh_size, d);
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
 }
@@ -154,11 +178,11 @@ binaryheap_add(binaryheap *heap, Datum d)
  * without modifying the heap. The caller must ensure that this
  * routine is not used on an empty heap. Always O(1).
  */
-Datum
-binaryheap_first(binaryheap *heap)
+void
+binaryheap_first(binaryheap *heap, void *result)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
-	return heap->bh_nodes[0];
+	memmove(result, heap->bh_nodes, heap->bh_elem_size);
 }
 
 /*
@@ -169,31 +193,28 @@ binaryheap_first(binaryheap *heap)
  * that this routine is not used on an empty heap. O(log n) worst
  * case.
  */
-Datum
-binaryheap_remove_first(binaryheap *heap)
+void
+binaryheap_remove_first(binaryheap *heap, void *result)
 {
-	Datum		result;
-
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
 	/* extract the root node, which will be the result */
-	result = heap->bh_nodes[0];
+	if (result)
+		memmove(result, heap->bh_nodes, heap->bh_elem_size);
 
 	/* easy if heap contains one element */
 	if (heap->bh_size == 1)
 	{
 		heap->bh_size--;
-		return result;
+		return;
 	}
 
 	/*
 	 * Remove the last node, placing it in the vacated root entry, and sift
 	 * the new root node down to its correct position.
 	 */
-	heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size];
+	bh_set(heap, 0, bh_node(heap, --heap->bh_size));
 	sift_down(heap, 0);
-
-	return result;
 }
 
 /*
@@ -204,11 +225,11 @@ binaryheap_remove_first(binaryheap *heap)
  * sifting the new node down.
  */
 void
-binaryheap_replace_first(binaryheap *heap, Datum d)
+binaryheap_replace_first(binaryheap *heap, void *d)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
-	heap->bh_nodes[0] = d;
+	bh_set(heap, 0, d);
 
 	if (heap->bh_size > 1)
 		sift_down(heap, 0);
@@ -221,26 +242,26 @@ binaryheap_replace_first(binaryheap *heap, Datum d)
 static void
 sift_up(binaryheap *heap, int node_off)
 {
-	Datum		node_val = heap->bh_nodes[node_off];
+	memmove(heap->bh_hole, bh_node(heap, node_off), heap->bh_elem_size);
 
 	/*
 	 * Within the loop, the node_off'th array entry is a "hole" that
-	 * notionally holds node_val, but we don't actually store node_val there
-	 * till the end, saving some unnecessary data copying steps.
+	 * notionally holds the swapped value, but we don't actually store the
+	 * value there till the end, saving some unnecessary data copying steps.
 	 */
 	while (node_off != 0)
 	{
 		int			cmp;
 		int			parent_off;
-		Datum		parent_val;
+		void	   *parent_val;
 
 		/*
 		 * If this node is smaller than its parent, the heap condition is
 		 * satisfied, and we're done.
 		 */
 		parent_off = parent_offset(node_off);
-		parent_val = heap->bh_nodes[parent_off];
-		cmp = heap->bh_compare(node_val,
+		parent_val = bh_node(heap, parent_off);
+		cmp = heap->bh_compare(heap->bh_hole,
 							   parent_val,
 							   heap->bh_arg);
 		if (cmp <= 0)
@@ -250,11 +271,11 @@ sift_up(binaryheap *heap, int node_off)
 		 * Otherwise, swap the parent value with the hole, and go on to check
 		 * the node's new parent.
 		 */
-		heap->bh_nodes[node_off] = parent_val;
+		bh_set(heap, node_off, parent_val);
 		node_off = parent_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set(heap, node_off, heap->bh_hole);
 }
 
 /*
@@ -264,12 +285,12 @@ sift_up(binaryheap *heap, int node_off)
 static void
 sift_down(binaryheap *heap, int node_off)
 {
-	Datum		node_val = heap->bh_nodes[node_off];
+	memmove(heap->bh_hole, bh_node(heap, node_off), heap->bh_elem_size);
 
 	/*
 	 * Within the loop, the node_off'th array entry is a "hole" that
-	 * notionally holds node_val, but we don't actually store node_val there
-	 * till the end, saving some unnecessary data copying steps.
+	 * notionally holds the swapped value, but we don't actually store the
+	 * value there till the end, saving some unnecessary data copying steps.
 	 */
 	while (true)
 	{
@@ -279,21 +300,21 @@ sift_down(binaryheap *heap, int node_off)
 
 		/* Is the left child larger than the parent? */
 		if (left_off < heap->bh_size &&
-			heap->bh_compare(node_val,
-							 heap->bh_nodes[left_off],
+			heap->bh_compare(heap->bh_hole,
+							 bh_node(heap, left_off),
 							 heap->bh_arg) < 0)
 			swap_off = left_off;
 
 		/* Is the right child larger than the parent? */
 		if (right_off < heap->bh_size &&
-			heap->bh_compare(node_val,
-							 heap->bh_nodes[right_off],
+			heap->bh_compare(heap->bh_hole,
+							 bh_node(heap, right_off),
 							 heap->bh_arg) < 0)
 		{
 			/* swap with the larger child */
 			if (!swap_off ||
-				heap->bh_compare(heap->bh_nodes[left_off],
-								 heap->bh_nodes[right_off],
+				heap->bh_compare(bh_node(heap, left_off),
+								 bh_node(heap, right_off),
 								 heap->bh_arg) < 0)
 				swap_off = right_off;
 		}
@@ -309,9 +330,9 @@ sift_down(binaryheap *heap, int node_off)
 		 * Otherwise, swap the hole with the child that violates the heap
 		 * property; then go on to check its children.
 		 */
-		heap->bh_nodes[node_off] = heap->bh_nodes[swap_off];
+		bh_set(heap, node_off, bh_node(heap, swap_off));
 		node_off = swap_off;
 	}
 	/* Re-fill the hole */
-	heap->bh_nodes[node_off] = node_val;
+	bh_set(heap, node_off, heap->bh_hole);
 }
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 46af349564..f7ee95d8f9 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -145,7 +145,7 @@ static bool pgarch_readyXlog(char *xlog);
 static void pgarch_archiveDone(char *xlog);
 static void pgarch_die(int code, Datum arg);
 static void HandlePgArchInterrupts(void);
-static int	ready_file_comparator(Datum a, Datum b, void *arg);
+static int	ready_file_comparator(const void *a, const void *b, void *arg);
 static void LoadArchiveLibrary(void);
 static void pgarch_call_module_shutdown_cb(int code, Datum arg);
 
@@ -250,6 +250,7 @@ PgArchiverMain(void)
 
 	/* Initialize our max-heap for prioritizing files to archive. */
 	arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
+												sizeof(char *),
 												ready_file_comparator, NULL);
 
 	/* Load the archive_library. */
@@ -631,22 +632,27 @@ pgarch_readyXlog(char *xlog)
 			/* If the heap isn't full yet, quickly add it. */
 			arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size];
 			strcpy(arch_file, basename);
-			binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
+			binaryheap_add_unordered(arch_files->arch_heap, &arch_file);
 
 			/* If we just filled the heap, make it a valid one. */
 			if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN)
 				binaryheap_build(arch_files->arch_heap);
 		}
-		else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
-									   CStringGetDatum(basename), NULL) > 0)
+		else
 		{
-			/*
-			 * Remove the lowest priority file and add the current one to the
-			 * heap.
-			 */
-			arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
-			strcpy(arch_file, basename);
-			binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
+			char	   *root;
+
+			binaryheap_first(arch_files->arch_heap, &root);
+			if (ready_file_comparator(&root, &basename, NULL) > 0)
+			{
+				/*
+				 * Remove the lowest priority file and add the current one to
+				 * the heap.
+				 */
+				binaryheap_remove_first(arch_files->arch_heap, &arch_file);
+				strcpy(arch_file, basename);
+				binaryheap_add(arch_files->arch_heap, &arch_file);
+			}
 		}
 	}
 	FreeDir(rldir);
@@ -668,7 +674,7 @@ pgarch_readyXlog(char *xlog)
 	 */
 	arch_files->arch_files_size = arch_files->arch_heap->bh_size;
 	for (int i = 0; i < arch_files->arch_files_size; i++)
-		arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
+		binaryheap_remove_first(arch_files->arch_heap, &arch_files->arch_files[i]);
 
 	/* Return the highest priority file. */
 	arch_files->arch_files_size--;
@@ -686,10 +692,10 @@ pgarch_readyXlog(char *xlog)
  * If "a" and "b" have equivalent values, 0 will be returned.
  */
 static int
-ready_file_comparator(Datum a, Datum b, void *arg)
+ready_file_comparator(const void *a, const void *b, void *arg)
 {
-	char	   *a_str = DatumGetCString(a);
-	char	   *b_str = DatumGetCString(b);
+	const char *a_str = *((const char **) a);
+	const char *b_str = *((const char **) b);
 	bool		a_history = IsTLHistoryFileName(a_str);
 	bool		b_history = IsTLHistoryFileName(b_str);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 12edc5772a..e22680da0b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1223,11 +1223,11 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
  * Binary heap comparison function.
  */
 static int
-ReorderBufferIterCompare(Datum a, Datum b, void *arg)
+ReorderBufferIterCompare(const void *a, const void *b, void *arg)
 {
 	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
-	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
-	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;
+	XLogRecPtr	pos_a = state->entries[*((const int *) a)].lsn;
+	XLogRecPtr	pos_b = state->entries[*((const int *) b)].lsn;
 
 	if (pos_a < pos_b)
 		return 1;
@@ -1297,6 +1297,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	/* allocate heap */
 	state->heap = binaryheap_allocate(state->nr_txns,
+									  sizeof(int),
 									  ReorderBufferIterCompare,
 									  state);
 
@@ -1330,7 +1331,8 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		state->entries[off].change = cur_change;
 		state->entries[off].txn = txn;
 
-		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+		binaryheap_add_unordered(state->heap, &off);
+		off++;
 	}
 
 	/* add subtransactions if they contain changes */
@@ -1359,7 +1361,8 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			state->entries[off].change = cur_change;
 			state->entries[off].txn = cur_txn;
 
-			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+			binaryheap_add_unordered(state->heap, &off);
+			off++;
 		}
 	}
 
@@ -1384,7 +1387,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	if (state->heap->bh_size == 0)
 		return NULL;
 
-	off = DatumGetInt32(binaryheap_first(state->heap));
+	binaryheap_first(state->heap, &off);
 	entry = &state->entries[off];
 
 	/* free memory we might have "leaked" in the previous *Next call */
@@ -1414,7 +1417,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		state->entries[off].lsn = next_change->lsn;
 		state->entries[off].change = next_change;
 
-		binaryheap_replace_first(state->heap, Int32GetDatum(off));
+		binaryheap_replace_first(state->heap, &off);
 		return change;
 	}
 
@@ -1450,14 +1453,14 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 			/* txn stays the same */
 			state->entries[off].lsn = next_change->lsn;
 			state->entries[off].change = next_change;
-			binaryheap_replace_first(state->heap, Int32GetDatum(off));
+			binaryheap_replace_first(state->heap, &off);
 
 			return change;
 		}
 	}
 
 	/* ok, no changes there anymore, remove */
-	binaryheap_remove_first(state->heap);
+	binaryheap_remove_first(state->heap, NULL);
 
 	return change;
 }
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3bd82dbfca..7c78a0d625 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -501,7 +501,7 @@ static void CheckForBufferLeaks(void);
 static int	rlocator_comparator(const void *p1, const void *p2);
 static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb);
 static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b);
-static int	ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
+static int	ts_ckpt_progress_comparator(const void *a, const void *b, void *arg);
 
 
 /*
@@ -2640,6 +2640,7 @@ BufferSync(int flags)
 	 * processed buffer is.
 	 */
 	ts_heap = binaryheap_allocate(num_spaces,
+								  sizeof(CkptTsStatus *),
 								  ts_ckpt_progress_comparator,
 								  NULL);
 
@@ -2649,7 +2650,7 @@ BufferSync(int flags)
 
 		ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
 
-		binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+		binaryheap_add_unordered(ts_heap, &ts_stat);
 	}
 
 	binaryheap_build(ts_heap);
@@ -2665,8 +2666,9 @@ BufferSync(int flags)
 	while (!binaryheap_empty(ts_heap))
 	{
 		BufferDesc *bufHdr = NULL;
-		CkptTsStatus *ts_stat = (CkptTsStatus *)
-			DatumGetPointer(binaryheap_first(ts_heap));
+		CkptTsStatus *ts_stat;
+
+		binaryheap_first(ts_heap, &ts_stat);
 
 		buf_id = CkptBufferIds[ts_stat->index].buf_id;
 		Assert(buf_id != -1);
@@ -2708,12 +2710,12 @@ BufferSync(int flags)
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
 		{
-			binaryheap_remove_first(ts_heap);
+			binaryheap_remove_first(ts_heap, NULL);
 		}
 		else
 		{
 			/* update heap with the new progress */
-			binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
+			binaryheap_replace_first(ts_heap, &ts_stat);
 		}
 
 		/*
@@ -5416,10 +5418,10 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b)
  * progress.
  */
 static int
-ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+ts_ckpt_progress_comparator(const void *a, const void *b, void *arg)
 {
-	CkptTsStatus *sa = (CkptTsStatus *) a;
-	CkptTsStatus *sb = (CkptTsStatus *) b;
+	const CkptTsStatus *sa = *((const CkptTsStatus **) a);
+	const CkptTsStatus *sb = *((const CkptTsStatus **) b);
 
 	/* we want a min-heap, so return 1 for the a < b */
 	if (sa->progress < sb->progress)
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 52f7b06b25..b8c7ef81ef 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -15,7 +15,7 @@
  * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b,
  * and >0 iff a > b.  For a min-heap, the conditions are reversed.
  */
-typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg);
+typedef int (*binaryheap_comparator) (const void *a, const void *b, void *arg);
 
 /*
  * binaryheap
@@ -25,29 +25,32 @@ typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg);
  *		bh_has_heap_property	no unordered operations since last heap build
  *		bh_compare		comparison function to define the heap property
  *		bh_arg			user data for comparison function
- *		bh_nodes		variable-length array of "space" nodes
+ *		bh_hole			extra node used during sifting operations
+ *		bh_nodes		variable-length array of "space + 1" nodes
  */
 typedef struct binaryheap
 {
 	int			bh_size;
 	int			bh_space;
+	size_t		bh_elem_size;
 	bool		bh_has_heap_property;	/* debugging cross-check */
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
-	Datum		bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+	void	   *bh_hole;
+	char		bh_nodes[FLEXIBLE_ARRAY_MEMBER];
 } binaryheap;
 
-extern binaryheap *binaryheap_allocate(int capacity,
+extern binaryheap *binaryheap_allocate(int capacity, size_t elem_size,
 									   binaryheap_comparator compare,
 									   void *arg);
 extern void binaryheap_reset(binaryheap *heap);
 extern void binaryheap_free(binaryheap *heap);
-extern void binaryheap_add_unordered(binaryheap *heap, Datum d);
+extern void binaryheap_add_unordered(binaryheap *heap, void *d);
 extern void binaryheap_build(binaryheap *heap);
-extern void binaryheap_add(binaryheap *heap, Datum d);
-extern Datum binaryheap_first(binaryheap *heap);
-extern Datum binaryheap_remove_first(binaryheap *heap);
-extern void binaryheap_replace_first(binaryheap *heap, Datum d);
+extern void binaryheap_add(binaryheap *heap, void *d);
+extern void binaryheap_first(binaryheap *heap, void *result);
+extern void binaryheap_remove_first(binaryheap *heap, void *result);
+extern void binaryheap_replace_first(binaryheap *heap, void *d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
 
-- 
2.25.1

>From a0dad80775e9e1947d49bb1483cbdc2f2b86415a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 4 Sep 2023 15:17:34 -0700
Subject: [PATCH v7 2/5] Make binaryheap available to frontend code.

There are a couple of places in frontend code that could make use
of this simple binary heap implementation.  This commit makes
binaryheap usable in frontend code, much like commit 26aaf97b68 did
for StringInfo.  Like StringInfo, the header file is left in lib/
to reduce the likelihood of unnecessary breakage.
---
 src/backend/lib/Makefile                 |  1 -
 src/backend/lib/meson.build              |  1 -
 src/common/Makefile                      |  1 +
 src/{backend/lib => common}/binaryheap.c | 17 ++++++++++++++++-
 src/common/meson.build                   |  1 +
 5 files changed, 18 insertions(+), 3 deletions(-)
 rename src/{backend/lib => common}/binaryheap.c (96%)

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 9dad31398a..b6cefd9cca 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,7 +13,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
-	binaryheap.o \
 	bipartite_match.o \
 	bloomfilter.o \
 	dshash.o \
diff --git a/src/backend/lib/meson.build b/src/backend/lib/meson.build
index 974cab8776..b4e88f54ae 100644
--- a/src/backend/lib/meson.build
+++ b/src/backend/lib/meson.build
@@ -1,7 +1,6 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
 backend_sources += files(
-  'binaryheap.c',
   'bipartite_match.c',
   'bloomfilter.c',
   'dshash.c',
diff --git a/src/common/Makefile b/src/common/Makefile
index 113029bf7b..cc5c54dcee 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -48,6 +48,7 @@ LIBS += $(PTHREAD_LIBS)
 OBJS_COMMON = \
 	archive.o \
 	base64.o \
+	binaryheap.o \
 	checksum_helper.o \
 	compression.o \
 	config_info.o \
diff --git a/src/backend/lib/binaryheap.c b/src/common/binaryheap.c
similarity index 96%
rename from src/backend/lib/binaryheap.c
rename to src/common/binaryheap.c
index 6f63181c4c..07021d10b8 100644
--- a/src/backend/lib/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -6,15 +6,22 @@
  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *	  src/backend/lib/binaryheap.c
+ *	  src/common/binaryheap.c
  *
  *-------------------------------------------------------------------------
  */
 
+#ifdef FRONTEND
+#include "postgres_fe.h"
+#else
 #include "postgres.h"
+#endif
 
 #include <math.h>
 
+#ifdef FRONTEND
+#include "common/logging.h"
+#endif
 #include "lib/binaryheap.h"
 
 static void sift_down(binaryheap *heap, int node_off);
@@ -133,7 +140,11 @@ void
 binaryheap_add_unordered(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
+#ifdef FRONTEND
+		pg_fatal("out of binary heap slots");
+#else
 		elog(ERROR, "out of binary heap slots");
+#endif
 	heap->bh_has_heap_property = false;
 	bh_set(heap, heap->bh_size, d);
 	heap->bh_size++;
@@ -165,7 +176,11 @@ void
 binaryheap_add(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
+#ifdef FRONTEND
+		pg_fatal("out of binary heap slots");
+#else
 		elog(ERROR, "out of binary heap slots");
+#endif
 	bh_set(heap, heap->bh_size, d);
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
diff --git a/src/common/meson.build b/src/common/meson.build
index 53942a9a61..3b97497d1a 100644
--- a/src/common/meson.build
+++ b/src/common/meson.build
@@ -3,6 +3,7 @@
 common_sources = files(
   'archive.c',
   'base64.c',
+  'binaryheap.c',
   'checksum_helper.c',
   'compression.c',
   'controldata_utils.c',
-- 
2.25.1

>From 658f4c1899b4721275986e50f388dd5be88e8c20 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 20 Jul 2023 09:52:20 -0700
Subject: [PATCH v7 3/5] Add function for removing arbitrary nodes in
 binaryheap.

This commit introduces binaryheap_remove_nth(), which can be used
to remove any node from a binary heap.  The implementation is
straightforward.  The target node is replaced with the last node in
the heap, and then we sift as needed to preserve the heap property.
This new function is intended for use in a follow-up commit that
will improve the performance of pg_restore.
---
 src/common/binaryheap.c      | 44 ++++++++++++++++++++++++++++++++++++
 src/include/lib/binaryheap.h |  3 +++
 2 files changed, 47 insertions(+)

diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 07021d10b8..f1ed0e157e 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -232,6 +232,50 @@ binaryheap_remove_first(binaryheap *heap, void *result)
 	sift_down(heap, 0);
 }
 
+/*
+ * binaryheap_nth
+ *
+ * Returns the nth node from the heap.  The caller must ensure that there are
+ * at least (n + 1) nodes in the heap.  Always O(1).
+ */
+void
+binaryheap_nth(binaryheap *heap, int n, void *result)
+{
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(n >= 0 && n < heap->bh_size);
+
+	memmove(result, bh_node(heap, n), heap->bh_elem_size);
+}
+
+/*
+ * binaryheap_remove_nth
+ *
+ * Removes the nth node from the heap.  The caller must ensure that there are
+ * at least (n + 1) nodes in the heap.  O(log n) worst case.
+ */
+void
+binaryheap_remove_nth(binaryheap *heap, int n)
+{
+	int			cmp;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(n >= 0 && n < heap->bh_size);
+
+	/* compare last node to the one that is being removed */
+	cmp = heap->bh_compare(bh_node(heap, --heap->bh_size),
+						   bh_node(heap, n),
+						   heap->bh_arg);
+
+	/* remove the last node, placing it in the vacated entry */
+	bh_set(heap, n, bh_node(heap, heap->bh_size));
+
+	/* sift as needed to preserve the heap property */
+	if (cmp > 0)
+		sift_up(heap, n);
+	else if (cmp < 0)
+		sift_down(heap, n);
+}
+
 /*
  * binaryheap_replace_first
  *
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index b8c7ef81ef..083821ddbd 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -50,8 +50,11 @@ extern void binaryheap_build(binaryheap *heap);
 extern void binaryheap_add(binaryheap *heap, void *d);
 extern void binaryheap_first(binaryheap *heap, void *result);
 extern void binaryheap_remove_first(binaryheap *heap, void *result);
+extern void binaryheap_nth(binaryheap *heap, int n, void *result);
+extern void binaryheap_remove_nth(binaryheap *heap, int n);
 extern void binaryheap_replace_first(binaryheap *heap, void *d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
+#define binaryheap_size(h)			((h)->bh_size)
 
 #endif							/* BINARYHEAP_H */
-- 
2.25.1

>From 65de4421ba130a5716fc226f6de05b97b30c2711 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Mon, 4 Sep 2023 15:35:08 -0700
Subject: [PATCH v7 4/5] Convert pg_restore's ready_list to a priority queue.

Presently, we spend a lot of time sorting this list so that we pick
the largest items first.  With many tables, this sorting can become
a significant bottleneck.  There are a couple of reports from the
field about this, and it is easily reproducible, so this is not a
hypothetical issue.

This commit improves the performance of pg_restore with many tables
by converting its ready_list to a priority queue, i.e., a binary
heap.  We will first try to run the highest priority item, but if
it cannot be chosen due to the lock heuristic, we'll do a
sequential scan through the heap nodes until we find one that is
runnable.  This means that we might end up picking an item with
much lower priority, but since we expect that we'll typically be
able to choose one of the first few nodes, we should usually pick
an item with a relatively high priority.

On my machine, a basic test with 100,000 tables takes 11.5 minutes
without this patch and 1.5 minutes with it.  Pierre Ducroquet
claims to see a speedup from 30 minutes to 23 minutes for a
real-world dump of over 50,000 tables.
---
 src/bin/pg_dump/pg_backup_archiver.c | 201 ++++++++-------------------
 1 file changed, 61 insertions(+), 140 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 39ebcfec32..ff7349537e 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -34,6 +34,7 @@
 #include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
+#include "lib/binaryheap.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
@@ -44,24 +45,6 @@
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/*
- * State for tracking TocEntrys that are ready to process during a parallel
- * restore.  (This used to be a list, and we still call it that, though now
- * it's really an array so that we can apply qsort to it.)
- *
- * tes[] is sized large enough that we can't overrun it.
- * The valid entries are indexed first_te .. last_te inclusive.
- * We periodically sort the array to bring larger-by-dataLength entries to
- * the front; "sorted" is true if the valid entries are known sorted.
- */
-typedef struct _parallelReadyList
-{
-	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
-	int			first_te;		/* index of first valid entry in tes[] */
-	int			last_te;		/* index of last valid entry in tes[] */
-	bool		sorted;			/* are valid entries currently sorted? */
-} ParallelReadyList;
-
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
@@ -110,16 +93,13 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
 static void pending_list_header_init(TocEntry *l);
 static void pending_list_append(TocEntry *l, TocEntry *te);
 static void pending_list_remove(TocEntry *te);
-static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
-static void ready_list_free(ParallelReadyList *ready_list);
-static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
-static void ready_list_remove(ParallelReadyList *ready_list, int i);
-static void ready_list_sort(ParallelReadyList *ready_list);
-static int	TocEntrySizeCompare(const void *p1, const void *p2);
-static void move_to_ready_list(TocEntry *pending_list,
-							   ParallelReadyList *ready_list,
+static int	TocEntrySizeCompareQsort(const void *p1, const void *p2);
+static int	TocEntrySizeCompareBinaryheap(const void *p1, const void *p2,
+										  void *arg);
+static void move_to_ready_heap(TocEntry *pending_list,
+							   binaryheap *ready_heap,
 							   RestorePass pass);
-static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+static TocEntry *pop_next_work_item(binaryheap *ready_heap,
 									ParallelState *pstate);
 static void mark_dump_job_done(ArchiveHandle *AH,
 							   TocEntry *te,
@@ -134,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-								ParallelReadyList *ready_list);
+								binaryheap *ready_heap);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 
@@ -2380,7 +2360,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
 		}
 
 		if (ntes > 1)
-			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
+			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
 
 		for (int i = 0; i < ntes; i++)
 			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
@@ -3980,7 +3960,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 
 			(void) restore_toc_entry(AH, next_work_item, false);
 
-			/* Reduce dependencies, but don't move anything to ready_list */
+			/* Reduce dependencies, but don't move anything to ready_heap */
 			reduce_dependencies(AH, next_work_item, NULL);
 		}
 		else
@@ -4023,24 +4003,27 @@ static void
 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							 TocEntry *pending_list)
 {
-	ParallelReadyList ready_list;
+	binaryheap *ready_heap;
 	TocEntry   *next_work_item;
 
 	pg_log_debug("entering restore_toc_entries_parallel");
 
-	/* Set up ready_list with enough room for all known TocEntrys */
-	ready_list_init(&ready_list, AH->tocCount);
+	/* Set up ready_heap with enough room for all known TocEntrys */
+	ready_heap = binaryheap_allocate(AH->tocCount,
+									 sizeof(TocEntry *),
+									 TocEntrySizeCompareBinaryheap,
+									 NULL);
 
 	/*
 	 * The pending_list contains all items that we need to restore.  Move all
-	 * items that are available to process immediately into the ready_list.
+	 * items that are available to process immediately into the ready_heap.
 	 * After this setup, the pending list is everything that needs to be done
-	 * but is blocked by one or more dependencies, while the ready list
+	 * but is blocked by one or more dependencies, while the ready heap
 	 * contains items that have no remaining dependencies and are OK to
 	 * process in the current restore pass.
 	 */
 	AH->restorePass = RESTORE_PASS_MAIN;
-	move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+	move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 
 	/*
 	 * main parent loop
@@ -4054,7 +4037,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	for (;;)
 	{
 		/* Look for an item ready to be dispatched to a worker */
-		next_work_item = pop_next_work_item(&ready_list, pstate);
+		next_work_item = pop_next_work_item(ready_heap, pstate);
 		if (next_work_item != NULL)
 		{
 			/* If not to be restored, don't waste time launching a worker */
@@ -4064,7 +4047,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							next_work_item->dumpId,
 							next_work_item->desc, next_work_item->tag);
 				/* Update its dependencies as though we'd completed it */
-				reduce_dependencies(AH, next_work_item, &ready_list);
+				reduce_dependencies(AH, next_work_item, ready_heap);
 				/* Loop around to see if anything else can be dispatched */
 				continue;
 			}
@@ -4075,7 +4058,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 
 			/* Dispatch to some worker */
 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
-								   mark_restore_job_done, &ready_list);
+								   mark_restore_job_done, ready_heap);
 		}
 		else if (IsEveryWorkerIdle(pstate))
 		{
@@ -4089,7 +4072,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 			/* Advance to next restore pass */
 			AH->restorePass++;
 			/* That probably allows some stuff to be made ready */
-			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+			move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 			/* Loop around to see if anything's now ready */
 			continue;
 		}
@@ -4118,10 +4101,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
 	}
 
-	/* There should now be nothing in ready_list. */
-	Assert(ready_list.first_te > ready_list.last_te);
+	/* There should now be nothing in ready_heap. */
+	Assert(binaryheap_empty(ready_heap));
 
-	ready_list_free(&ready_list);
+	binaryheap_free(ready_heap);
 
 	pg_log_info("finished main parallel loop");
 }
@@ -4221,80 +4204,9 @@ pending_list_remove(TocEntry *te)
 }
 
 
-/*
- * Initialize the ready_list with enough room for up to tocCount entries.
- */
-static void
-ready_list_init(ParallelReadyList *ready_list, int tocCount)
-{
-	ready_list->tes = (TocEntry **)
-		pg_malloc(tocCount * sizeof(TocEntry *));
-	ready_list->first_te = 0;
-	ready_list->last_te = -1;
-	ready_list->sorted = false;
-}
-
-/*
- * Free storage for a ready_list.
- */
-static void
-ready_list_free(ParallelReadyList *ready_list)
-{
-	pg_free(ready_list->tes);
-}
-
-/* Add te to the ready_list */
-static void
-ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
-{
-	ready_list->tes[++ready_list->last_te] = te;
-	/* List is (probably) not sorted anymore. */
-	ready_list->sorted = false;
-}
-
-/* Remove the i'th entry in the ready_list */
-static void
-ready_list_remove(ParallelReadyList *ready_list, int i)
-{
-	int			f = ready_list->first_te;
-
-	Assert(i >= f && i <= ready_list->last_te);
-
-	/*
-	 * In the typical case where the item to be removed is the first ready
-	 * entry, we need only increment first_te to remove it.  Otherwise, move
-	 * the entries before it to compact the list.  (This preserves sortedness,
-	 * if any.)  We could alternatively move the entries after i, but there
-	 * are typically many more of those.
-	 */
-	if (i > f)
-	{
-		TocEntry  **first_te_ptr = &ready_list->tes[f];
-
-		memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
-	}
-	ready_list->first_te++;
-}
-
-/* Sort the ready_list into the desired order */
-static void
-ready_list_sort(ParallelReadyList *ready_list)
-{
-	if (!ready_list->sorted)
-	{
-		int			n = ready_list->last_te - ready_list->first_te + 1;
-
-		if (n > 1)
-			qsort(ready_list->tes + ready_list->first_te, n,
-				  sizeof(TocEntry *),
-				  TocEntrySizeCompare);
-		ready_list->sorted = true;
-	}
-}
-
 /* qsort comparator for sorting TocEntries by dataLength */
 static int
-TocEntrySizeCompare(const void *p1, const void *p2)
+TocEntrySizeCompareQsort(const void *p1, const void *p2)
 {
 	const TocEntry *te1 = *(const TocEntry *const *) p1;
 	const TocEntry *te2 = *(const TocEntry *const *) p2;
@@ -4314,17 +4226,24 @@ TocEntrySizeCompare(const void *p1, const void *p2)
 	return 0;
 }
 
+/* binaryheap comparator: defers to the qsort comparator; arg is unused */
+static int
+TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, void *arg)
+{
+	return TocEntrySizeCompareQsort(p1, p2);
+}
+
 
 /*
- * Move all immediately-ready items from pending_list to ready_list.
+ * Move all immediately-ready items from pending_list to ready_heap.
  *
  * Items are considered ready if they have no remaining dependencies and
  * they belong in the current restore pass.  (See also reduce_dependencies,
  * which applies the same logic one-at-a-time.)
  */
 static void
-move_to_ready_list(TocEntry *pending_list,
-				   ParallelReadyList *ready_list,
+move_to_ready_heap(TocEntry *pending_list,
+				   binaryheap *ready_heap,
 				   RestorePass pass)
 {
 	TocEntry   *te;
@@ -4340,40 +4259,42 @@ move_to_ready_list(TocEntry *pending_list,
 		{
 			/* Remove it from pending_list ... */
 			pending_list_remove(te);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, te);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, &te);
 		}
 	}
 }
 
 /*
  * Find the next work item (if any) that is capable of being run now,
- * and remove it from the ready_list.
+ * and remove it from the ready_heap.
  *
  * Returns the item, or NULL if nothing is runnable.
  *
  * To qualify, the item must have no remaining dependencies
  * and no requirements for locks that are incompatible with
- * items currently running.  Items in the ready_list are known to have
+ * items currently running.  Items in the ready_heap are known to have
  * no remaining dependencies, but we have to check for lock conflicts.
  */
 static TocEntry *
-pop_next_work_item(ParallelReadyList *ready_list,
+pop_next_work_item(binaryheap *ready_heap,
 				   ParallelState *pstate)
 {
 	/*
-	 * Sort the ready_list so that we'll tackle larger jobs first.
-	 */
-	ready_list_sort(ready_list);
-
-	/*
-	 * Search the ready_list until we find a suitable item.
+	 * Search the ready_heap until we find a suitable item.  Note that we do a
+	 * sequential scan through the heap nodes, so even though we will first
+	 * try to choose the highest-priority item, we might end up picking
+	 * something with a much lower priority.  However, it is expected that we
+	 * will typically be able to pick one of the first few items, which should
+	 * usually have a relatively high priority.
 	 */
-	for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+	for (int i = 0; i < binaryheap_size(ready_heap); i++)
 	{
-		TocEntry   *te = ready_list->tes[i];
+		TocEntry   *te;
 		bool		conflicts = false;
 
+		binaryheap_nth(ready_heap, i, &te);
+
 		/*
 		 * Check to see if the item would need exclusive lock on something
 		 * that a currently running item also needs lock on, or vice versa. If
@@ -4397,7 +4318,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
 			continue;
 
 		/* passed all tests, so this item can run */
-		ready_list_remove(ready_list, i);
+		binaryheap_remove_nth(ready_heap, i);
 		return te;
 	}
 
@@ -4443,7 +4364,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 					  int status,
 					  void *callback_data)
 {
-	ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+	binaryheap *ready_heap = (binaryheap *) callback_data;
 
 	pg_log_info("finished item %d %s %s",
 				te->dumpId, te->desc, te->tag);
@@ -4461,7 +4382,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 		pg_fatal("worker process failed: exit code %d",
 				 status);
 
-	reduce_dependencies(AH, te, ready_list);
+	reduce_dependencies(AH, te, ready_heap);
 }
 
 
@@ -4704,11 +4625,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
  * it, thereby possibly making them ready-to-run.  Any pending item that
- * becomes ready should be moved to the ready_list, if that's provided.
+ * becomes ready should be moved to the ready_heap, if that's provided.
  */
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-					ParallelReadyList *ready_list)
+					binaryheap *ready_heap)
 {
 	int			i;
 
@@ -4726,18 +4647,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
 		 * the current restore pass, and it is currently a member of the
 		 * pending list (that check is needed to prevent double restore in
 		 * some cases where a list-file forces out-of-order restoring).
-		 * However, if ready_list == NULL then caller doesn't want any list
+		 * However, if ready_heap == NULL then caller doesn't want any list
 		 * memberships changed.
 		 */
 		if (otherte->depCount == 0 &&
 			_tocEntryRestorePass(otherte) == AH->restorePass &&
 			otherte->pending_prev != NULL &&
-			ready_list != NULL)
+			ready_heap != NULL)
 		{
 			/* Remove it from pending list ... */
 			pending_list_remove(otherte);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, otherte);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, &otherte);
 		}
 	}
 }
-- 
2.25.1

>From 51ebcf055faadf2c174bbe533479843837c3945a Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Tue, 25 Jul 2023 11:18:52 -0700
Subject: [PATCH v7 5/5] Remove open-coded binary heap in pg_dump_sort.c.

Thanks to commit XXXXXXXXXX, binaryheap is available to frontend
code.  This commit replaces the open-coded heap implementation in
pg_dump_sort.c with a binaryheap, saving a few lines of code.
---
 src/bin/pg_dump/pg_dump_sort.c | 103 +++++++--------------------------
 1 file changed, 20 insertions(+), 83 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 523a19c155..c9b161ed30 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -16,6 +16,7 @@
 #include "postgres_fe.h"
 
 #include "catalog/pg_class_d.h"
+#include "lib/binaryheap.h"
 #include "pg_backup_archiver.h"
 #include "pg_backup_utils.h"
 #include "pg_dump.h"
@@ -161,8 +162,6 @@ static bool TopoSort(DumpableObject **objs,
 					 int numObjs,
 					 DumpableObject **ordering,
 					 int *nOrdering);
-static void addHeapElement(int val, int *heap, int heapLength);
-static int	removeHeapElement(int *heap, int heapLength);
 static void findDependencyLoops(DumpableObject **objs, int nObjs, int totObjs);
 static int	findLoop(DumpableObject *obj,
 					 DumpId startPoint,
@@ -174,6 +173,7 @@ static void repairDependencyLoop(DumpableObject **loop,
 								 int nLoop);
 static void describeDumpableObject(DumpableObject *obj,
 								   char *buf, int bufsize);
+static int	int_cmp(const void *a, const void *b, void *arg);
 
 
 /*
@@ -374,11 +374,10 @@ TopoSort(DumpableObject **objs,
 		 int *nOrdering)		/* output argument */
 {
 	DumpId		maxDumpId = getMaxDumpId();
-	int		   *pendingHeap;
+	binaryheap *pendingHeap;
 	int		   *beforeConstraints;
 	int		   *idMap;
 	DumpableObject *obj;
-	int			heapLength;
 	int			i,
 				j,
 				k;
@@ -403,7 +402,7 @@ TopoSort(DumpableObject **objs,
 		return true;
 
 	/* Create workspace for the above-described heap */
-	pendingHeap = (int *) pg_malloc(numObjs * sizeof(int));
+	pendingHeap = binaryheap_allocate(numObjs, sizeof(int), int_cmp, NULL);
 
 	/*
 	 * Scan the constraints, and for each item in the input, generate a count
@@ -434,19 +433,16 @@ TopoSort(DumpableObject **objs,
 	 * Now initialize the heap of items-ready-to-output by filling it with the
 	 * indexes of items that already have beforeConstraints[id] == 0.
 	 *
-	 * The essential property of a heap is heap[(j-1)/2] >= heap[j] for each j
-	 * in the range 1..heapLength-1 (note we are using 0-based subscripts
-	 * here, while the discussion in Knuth assumes 1-based subscripts). So, if
-	 * we simply enter the indexes into pendingHeap[] in decreasing order, we
-	 * a-fortiori have the heap invariant satisfied at completion of this
-	 * loop, and don't need to do any sift-up comparisons.
+	 * We enter the indexes into pendingHeap in decreasing order so that the
+	 * heap invariant is satisfied at the completion of this loop.  This
+	 * reduces the amount of work that binaryheap_build() must do.
 	 */
-	heapLength = 0;
 	for (i = numObjs; --i >= 0;)
 	{
 		if (beforeConstraints[objs[i]->dumpId] == 0)
-			pendingHeap[heapLength++] = i;
+			binaryheap_add_unordered(pendingHeap, &i);
 	}
+	binaryheap_build(pendingHeap);
 
 	/*--------------------
 	 * Now emit objects, working backwards in the output list.  At each step,
@@ -464,10 +460,10 @@ TopoSort(DumpableObject **objs,
 	 *--------------------
 	 */
 	i = numObjs;
-	while (heapLength > 0)
+	while (!binaryheap_empty(pendingHeap))
 	{
 		/* Select object to output by removing largest heap member */
-		j = removeHeapElement(pendingHeap, heapLength--);
+		binaryheap_remove_first(pendingHeap, &j);
 		obj = objs[j];
 		/* Output candidate to ordering[] */
 		ordering[--i] = obj;
@@ -477,7 +473,7 @@ TopoSort(DumpableObject **objs,
 			int			id = obj->dependencies[k];
 
 			if ((--beforeConstraints[id]) == 0)
-				addHeapElement(idMap[id], pendingHeap, heapLength++);
+				binaryheap_add(pendingHeap, &idMap[id]);
 		}
 	}
 
@@ -497,79 +493,13 @@ TopoSort(DumpableObject **objs,
 	}
 
 	/* Done */
-	free(pendingHeap);
+	binaryheap_free(pendingHeap);
 	free(beforeConstraints);
 	free(idMap);
 
 	return (i == 0);
 }
 
-/*
- * Add an item to a heap (priority queue)
- *
- * heapLength is the current heap size; caller is responsible for increasing
- * its value after the call.  There must be sufficient storage at *heap.
- */
-static void
-addHeapElement(int val, int *heap, int heapLength)
-{
-	int			j;
-
-	/*
-	 * Sift-up the new entry, per Knuth 5.2.3 exercise 16. Note that Knuth is
-	 * using 1-based array indexes, not 0-based.
-	 */
-	j = heapLength;
-	while (j > 0)
-	{
-		int			i = (j - 1) >> 1;
-
-		if (val <= heap[i])
-			break;
-		heap[j] = heap[i];
-		j = i;
-	}
-	heap[j] = val;
-}
-
-/*
- * Remove the largest item present in a heap (priority queue)
- *
- * heapLength is the current heap size; caller is responsible for decreasing
- * its value after the call.
- *
- * We remove and return heap[0], which is always the largest element of
- * the heap, and then "sift up" to maintain the heap invariant.
- */
-static int
-removeHeapElement(int *heap, int heapLength)
-{
-	int			result = heap[0];
-	int			val;
-	int			i;
-
-	if (--heapLength <= 0)
-		return result;
-	val = heap[heapLength];		/* value that must be reinserted */
-	i = 0;						/* i is where the "hole" is */
-	for (;;)
-	{
-		int			j = 2 * i + 1;
-
-		if (j >= heapLength)
-			break;
-		if (j + 1 < heapLength &&
-			heap[j] < heap[j + 1])
-			j++;
-		if (val >= heap[j])
-			break;
-		heap[i] = heap[j];
-		i = j;
-	}
-	heap[i] = val;
-	return result;
-}
-
 /*
  * findDependencyLoops - identify loops in TopoSort's failure output,
  *		and pass each such loop to repairDependencyLoop() for action
@@ -1559,3 +1489,10 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 			 (int) obj->objType,
 			 obj->dumpId, obj->catId.oid);
 }
+
+/* binaryheap comparator for ints; avoids subtraction, which can overflow */
+static int
+int_cmp(const void *a, const void *b, void *arg)
+{
+	return (*(const int *) a > *(const int *) b) - (*(const int *) a < *(const int *) b);
+}
-- 
2.25.1

Reply via email to