On Sat, Jul 22, 2023 at 07:47:50PM -0400, Tom Lane wrote:
> Nathan Bossart <nathandboss...@gmail.com> writes:
>> I first tried modifying
>> binaryheap to use "int" or "void *" instead, but that ended up requiring
>> some rather invasive changes in backend code, not to mention any extensions
>> that happen to be using it.

I followed through with the "void *" approach (attached), and it wasn't as
bad as I expected.

> I wonder whether we can't provide some alternate definition or "skin"
> for binaryheap that preserves the Datum API for backend code that wants
> that, while providing a void *-based API for frontend code to use.

I can give this a try next, but it might be rather #ifdef-heavy.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From b570460b19c5955c91b1fc28e8fdc791f41998e2 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 22 Jul 2023 20:59:46 -0700
Subject: [PATCH v4 1/4] use "void *" instead of "Datum" in binaryheap

---
 src/backend/executor/nodeGatherMerge.c        | 16 +++++-----
 src/backend/executor/nodeMergeAppend.c        | 16 +++++-----
 src/backend/lib/binaryheap.c                  | 20 ++++++------
 src/backend/postmaster/pgarch.c               | 18 +++++------
 .../replication/logical/reorderbuffer.c       | 31 +++++++++----------
 src/backend/storage/buffer/bufmgr.c           | 11 +++----
 src/include/lib/binaryheap.h                  | 14 ++++-----
 7 files changed, 61 insertions(+), 65 deletions(-)

diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 9d5e1a46e9..021682d87c 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -52,7 +52,7 @@ typedef struct GMReaderTupleBuffer
 } GMReaderTupleBuffer;
 
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
-static int32 heap_compare_slots(Datum a, Datum b, void *arg);
+static int32 heap_compare_slots(const void *a, const void *b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
 static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
 									  bool nowait, bool *done);
@@ -489,7 +489,7 @@ reread:
 				/* Don't have a tuple yet, try to get one */
 				if (gather_merge_readnext(gm_state, i, nowait))
 					binaryheap_add_unordered(gm_state->gm_heap,
-											 Int32GetDatum(i));
+											 (void *) (intptr_t) i);
 			}
 			else
 			{
@@ -565,10 +565,10 @@ gather_merge_getnext(GatherMergeState *gm_state)
 		 * the heap, because it might now compare differently against the
 		 * other elements of the heap.
 		 */
-		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+		i = (intptr_t) binaryheap_first(gm_state->gm_heap);
 
 		if (gather_merge_readnext(gm_state, i, false))
-			binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
+			binaryheap_replace_first(gm_state->gm_heap, (void *) (intptr_t) i);
 		else
 		{
 			/* reader exhausted, remove it from heap */
@@ -585,7 +585,7 @@ gather_merge_getnext(GatherMergeState *gm_state)
 	else
 	{
 		/* Return next tuple from whichever participant has the leading one */
-		i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
+		i = (intptr_t) binaryheap_first(gm_state->gm_heap);
 		return gm_state->gm_slots[i];
 	}
 }
@@ -750,11 +750,11 @@ typedef int32 SlotNumber;
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(Datum a, Datum b, void *arg)
+heap_compare_slots(const void *a, const void *b, void *arg)
 {
 	GatherMergeState *node = (GatherMergeState *) arg;
-	SlotNumber	slot1 = DatumGetInt32(a);
-	SlotNumber	slot2 = DatumGetInt32(b);
+	SlotNumber	slot1 = (intptr_t) a;
+	SlotNumber	slot2 = (intptr_t) b;
 
 	TupleTableSlot *s1 = node->gm_slots[slot1];
 	TupleTableSlot *s2 = node->gm_slots[slot2];
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 21b5726e6e..b3e2c29401 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -52,7 +52,7 @@
 typedef int32 SlotNumber;
 
 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
-static int	heap_compare_slots(Datum a, Datum b, void *arg);
+static int	heap_compare_slots(const void *a, const void *b, void *arg);
 
 
 /* ----------------------------------------------------------------
@@ -229,7 +229,7 @@ ExecMergeAppend(PlanState *pstate)
 		{
 			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 			if (!TupIsNull(node->ms_slots[i]))
-				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+				binaryheap_add_unordered(node->ms_heap, (void *) (intptr_t) i);
 		}
 		binaryheap_build(node->ms_heap);
 		node->ms_initialized = true;
@@ -244,10 +244,10 @@ ExecMergeAppend(PlanState *pstate)
 		 * by doing this before returning from the prior call, but it's better
 		 * to not pull tuples until necessary.)
 		 */
-		i = DatumGetInt32(binaryheap_first(node->ms_heap));
+		i = (intptr_t) binaryheap_first(node->ms_heap);
 		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
 		if (!TupIsNull(node->ms_slots[i]))
-			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
+			binaryheap_replace_first(node->ms_heap, (void *) (intptr_t) i);
 		else
 			(void) binaryheap_remove_first(node->ms_heap);
 	}
@@ -259,7 +259,7 @@ ExecMergeAppend(PlanState *pstate)
 	}
 	else
 	{
-		i = DatumGetInt32(binaryheap_first(node->ms_heap));
+		i = (intptr_t) binaryheap_first(node->ms_heap);
 		result = node->ms_slots[i];
 	}
 
@@ -270,11 +270,11 @@ ExecMergeAppend(PlanState *pstate)
  * Compare the tuples in the two given slots.
  */
 static int32
-heap_compare_slots(Datum a, Datum b, void *arg)
+heap_compare_slots(const void *a, const void *b, void *arg)
 {
 	MergeAppendState *node = (MergeAppendState *) arg;
-	SlotNumber	slot1 = DatumGetInt32(a);
-	SlotNumber	slot2 = DatumGetInt32(b);
+	SlotNumber	slot1 = (intptr_t) a;
+	SlotNumber	slot2 = (intptr_t) b;
 
 	TupleTableSlot *s1 = node->ms_slots[slot1];
 	TupleTableSlot *s2 = node->ms_slots[slot2];
diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c
index 1737546757..c7fcfc550b 100644
--- a/src/backend/lib/binaryheap.c
+++ b/src/backend/lib/binaryheap.c
@@ -34,7 +34,7 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
 	int			sz;
 	binaryheap *heap;
 
-	sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity;
+	sz = offsetof(binaryheap, bh_nodes) + sizeof(void *) * capacity;
 	heap = (binaryheap *) palloc(sz);
 	heap->bh_space = capacity;
 	heap->bh_compare = compare;
@@ -106,7 +106,7 @@ parent_offset(int i)
  * afterwards.
  */
 void
-binaryheap_add_unordered(binaryheap *heap, Datum d)
+binaryheap_add_unordered(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
 		elog(ERROR, "out of binary heap slots");
@@ -138,7 +138,7 @@ binaryheap_build(binaryheap *heap)
  * the heap property.
  */
 void
-binaryheap_add(binaryheap *heap, Datum d)
+binaryheap_add(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
 		elog(ERROR, "out of binary heap slots");
@@ -154,7 +154,7 @@ binaryheap_add(binaryheap *heap, Datum d)
  * without modifying the heap. The caller must ensure that this
  * routine is not used on an empty heap. Always O(1).
  */
-Datum
+void *
 binaryheap_first(binaryheap *heap)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
@@ -169,10 +169,10 @@ binaryheap_first(binaryheap *heap)
  * that this routine is not used on an empty heap. O(log n) worst
  * case.
  */
-Datum
+void *
 binaryheap_remove_first(binaryheap *heap)
 {
-	Datum		result;
+	void	   *result;
 
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
@@ -204,7 +204,7 @@ binaryheap_remove_first(binaryheap *heap)
  * sifting the new node down.
  */
 void
-binaryheap_replace_first(binaryheap *heap, Datum d)
+binaryheap_replace_first(binaryheap *heap, void *d)
 {
 	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
@@ -221,7 +221,7 @@ binaryheap_replace_first(binaryheap *heap, Datum d)
 static void
 sift_up(binaryheap *heap, int node_off)
 {
-	Datum		node_val = heap->bh_nodes[node_off];
+	void	   *node_val = heap->bh_nodes[node_off];
 
 	/*
 	 * Within the loop, the node_off'th array entry is a "hole" that
@@ -232,7 +232,7 @@ sift_up(binaryheap *heap, int node_off)
 	{
 		int			cmp;
 		int			parent_off;
-		Datum		parent_val;
+		void	   *parent_val;
 
 		/*
 		 * If this node is smaller than its parent, the heap condition is
@@ -264,7 +264,7 @@ sift_up(binaryheap *heap, int node_off)
 static void
 sift_down(binaryheap *heap, int node_off)
 {
-	Datum		node_val = heap->bh_nodes[node_off];
+	void	   *node_val = heap->bh_nodes[node_off];
 
 	/*
 	 * Within the loop, the node_off'th array entry is a "hole" that
diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c
index 46af349564..9491b822db 100644
--- a/src/backend/postmaster/pgarch.c
+++ b/src/backend/postmaster/pgarch.c
@@ -145,7 +145,7 @@ static bool pgarch_readyXlog(char *xlog);
 static void pgarch_archiveDone(char *xlog);
 static void pgarch_die(int code, Datum arg);
 static void HandlePgArchInterrupts(void);
-static int	ready_file_comparator(Datum a, Datum b, void *arg);
+static int	ready_file_comparator(const void *a, const void *b, void *arg);
 static void LoadArchiveLibrary(void);
 static void pgarch_call_module_shutdown_cb(int code, Datum arg);
 
@@ -631,22 +631,22 @@ pgarch_readyXlog(char *xlog)
 			/* If the heap isn't full yet, quickly add it. */
 			arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size];
 			strcpy(arch_file, basename);
-			binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
+			binaryheap_add_unordered(arch_files->arch_heap, arch_file);
 
 			/* If we just filled the heap, make it a valid one. */
 			if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN)
 				binaryheap_build(arch_files->arch_heap);
 		}
 		else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
-									   CStringGetDatum(basename), NULL) > 0)
+									   basename, NULL) > 0)
 		{
 			/*
 			 * Remove the lowest priority file and add the current one to the
 			 * heap.
 			 */
-			arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
+			arch_file = binaryheap_remove_first(arch_files->arch_heap);
 			strcpy(arch_file, basename);
-			binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
+			binaryheap_add(arch_files->arch_heap, arch_file);
 		}
 	}
 	FreeDir(rldir);
@@ -668,7 +668,7 @@ pgarch_readyXlog(char *xlog)
 	 */
 	arch_files->arch_files_size = arch_files->arch_heap->bh_size;
 	for (int i = 0; i < arch_files->arch_files_size; i++)
-		arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
+		arch_files->arch_files[i] = binaryheap_remove_first(arch_files->arch_heap);
 
 	/* Return the highest priority file. */
 	arch_files->arch_files_size--;
@@ -686,10 +686,10 @@ pgarch_readyXlog(char *xlog)
  * If "a" and "b" have equivalent values, 0 will be returned.
  */
 static int
-ready_file_comparator(Datum a, Datum b, void *arg)
+ready_file_comparator(const void *a, const void *b, void *arg)
 {
-	char	   *a_str = DatumGetCString(a);
-	char	   *b_str = DatumGetCString(b);
+	const char *a_str = (const char *) a;
+	const char *b_str = (const char *) b;
 	bool		a_history = IsTLHistoryFileName(a_str);
 	bool		b_history = IsTLHistoryFileName(b_str);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..191c0d1181 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1223,11 +1223,10 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
  * Binary heap comparison function.
  */
 static int
-ReorderBufferIterCompare(Datum a, Datum b, void *arg)
+ReorderBufferIterCompare(const void *a, const void *b, void *arg)
 {
-	ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
-	XLogRecPtr	pos_a = state->entries[DatumGetInt32(a)].lsn;
-	XLogRecPtr	pos_b = state->entries[DatumGetInt32(b)].lsn;
+	XLogRecPtr	pos_a = ((ReorderBufferIterTXNEntry *) a)->lsn;
+	XLogRecPtr	pos_b = ((ReorderBufferIterTXNEntry *) b)->lsn;
 
 	if (pos_a < pos_b)
 		return 1;
@@ -1298,7 +1297,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	/* allocate heap */
 	state->heap = binaryheap_allocate(state->nr_txns,
 									  ReorderBufferIterCompare,
-									  state);
+									  NULL);
 
 	/* Now that the state fields are initialized, it is safe to return it. */
 	*iter_state = state;
@@ -1330,7 +1329,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		state->entries[off].change = cur_change;
 		state->entries[off].txn = txn;
 
-		binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+		binaryheap_add_unordered(state->heap, &state->entries[off++]);
 	}
 
 	/* add subtransactions if they contain changes */
@@ -1359,7 +1358,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			state->entries[off].change = cur_change;
 			state->entries[off].txn = cur_txn;
 
-			binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
+			binaryheap_add_unordered(state->heap, &state->entries[off++]);
 		}
 	}
 
@@ -1378,14 +1377,12 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 {
 	ReorderBufferChange *change;
 	ReorderBufferIterTXNEntry *entry;
-	int32		off;
 
 	/* nothing there anymore */
 	if (state->heap->bh_size == 0)
 		return NULL;
 
-	off = DatumGetInt32(binaryheap_first(state->heap));
-	entry = &state->entries[off];
+	entry = binaryheap_first(state->heap);
 
 	/* free memory we might have "leaked" in the previous *Next call */
 	if (!dlist_is_empty(&state->old_change))
@@ -1411,10 +1408,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 			dlist_container(ReorderBufferChange, node, next);
 
 		/* txn stays the same */
-		state->entries[off].lsn = next_change->lsn;
-		state->entries[off].change = next_change;
+		entry->lsn = next_change->lsn;
+		entry->change = next_change;
 
-		binaryheap_replace_first(state->heap, Int32GetDatum(off));
+		binaryheap_replace_first(state->heap, entry);
 		return change;
 	}
 
@@ -1435,7 +1432,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		 */
 		rb->totalBytes += entry->txn->size;
 		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
-										&state->entries[off].segno))
+										&entry->segno))
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
@@ -1448,9 +1445,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
 			Assert(entry->txn->nentries_mem);
 			/* txn stays the same */
-			state->entries[off].lsn = next_change->lsn;
-			state->entries[off].change = next_change;
-			binaryheap_replace_first(state->heap, Int32GetDatum(off));
+			entry->lsn = next_change->lsn;
+			entry->change = next_change;
+			binaryheap_replace_first(state->heap, entry);
 
 			return change;
 		}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a7e3b9bb1d..391d5f3dd8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -501,7 +501,7 @@ static void CheckForBufferLeaks(void);
 static int	rlocator_comparator(const void *p1, const void *p2);
 static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb);
 static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b);
-static int	ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
+static int	ts_ckpt_progress_comparator(const void *a, const void *b, void *arg);
 
 
 /*
@@ -2649,7 +2649,7 @@ BufferSync(int flags)
 
 		ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
 
-		binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+		binaryheap_add_unordered(ts_heap, ts_stat);
 	}
 
 	binaryheap_build(ts_heap);
@@ -2665,8 +2665,7 @@ BufferSync(int flags)
 	while (!binaryheap_empty(ts_heap))
 	{
 		BufferDesc *bufHdr = NULL;
-		CkptTsStatus *ts_stat = (CkptTsStatus *)
-			DatumGetPointer(binaryheap_first(ts_heap));
+		CkptTsStatus *ts_stat = (CkptTsStatus *) binaryheap_first(ts_heap);
 
 		buf_id = CkptBufferIds[ts_stat->index].buf_id;
 		Assert(buf_id != -1);
@@ -2713,7 +2712,7 @@ BufferSync(int flags)
 		else
 		{
 			/* update heap with the new progress */
-			binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
+			binaryheap_replace_first(ts_heap, ts_stat);
 		}
 
 		/*
@@ -5416,7 +5415,7 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b)
  * progress.
  */
 static int
-ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+ts_ckpt_progress_comparator(const void *a, const void *b, void *arg)
 {
 	CkptTsStatus *sa = (CkptTsStatus *) a;
 	CkptTsStatus *sb = (CkptTsStatus *) b;
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 52f7b06b25..71a25db0f4 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -15,7 +15,7 @@
  * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b,
  * and >0 iff a > b.  For a min-heap, the conditions are reversed.
  */
-typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg);
+typedef int (*binaryheap_comparator) (const void *a, const void *b, void *arg);
 
 /*
  * binaryheap
@@ -34,7 +34,7 @@ typedef struct binaryheap
 	bool		bh_has_heap_property;	/* debugging cross-check */
 	binaryheap_comparator bh_compare;
 	void	   *bh_arg;
-	Datum		bh_nodes[FLEXIBLE_ARRAY_MEMBER];
+	void	   *bh_nodes[FLEXIBLE_ARRAY_MEMBER];
 } binaryheap;
 
 extern binaryheap *binaryheap_allocate(int capacity,
@@ -42,12 +42,12 @@ extern binaryheap *binaryheap_allocate(int capacity,
 									   void *arg);
 extern void binaryheap_reset(binaryheap *heap);
 extern void binaryheap_free(binaryheap *heap);
-extern void binaryheap_add_unordered(binaryheap *heap, Datum d);
+extern void binaryheap_add_unordered(binaryheap *heap, void *d);
 extern void binaryheap_build(binaryheap *heap);
-extern void binaryheap_add(binaryheap *heap, Datum d);
-extern Datum binaryheap_first(binaryheap *heap);
-extern Datum binaryheap_remove_first(binaryheap *heap);
-extern void binaryheap_replace_first(binaryheap *heap, Datum d);
+extern void binaryheap_add(binaryheap *heap, void *d);
+extern void *binaryheap_first(binaryheap *heap);
+extern void *binaryheap_remove_first(binaryheap *heap);
+extern void binaryheap_replace_first(binaryheap *heap, void *d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
 
-- 
2.25.1

>From 4fb71fc94af597b9ec9b5b233fedc7bb9ca5f05c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Sat, 22 Jul 2023 15:04:45 -0700
Subject: [PATCH v4 2/4] make binaryheap available to frontend

---
 src/backend/lib/Makefile                 |  1 -
 src/backend/lib/meson.build              |  1 -
 src/common/Makefile                      |  1 +
 src/{backend/lib => common}/binaryheap.c | 17 ++++++++++++++++-
 src/common/meson.build                   |  1 +
 5 files changed, 18 insertions(+), 3 deletions(-)
 rename src/{backend/lib => common}/binaryheap.c (96%)

diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile
index 9dad31398a..b6cefd9cca 100644
--- a/src/backend/lib/Makefile
+++ b/src/backend/lib/Makefile
@@ -13,7 +13,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
-	binaryheap.o \
 	bipartite_match.o \
 	bloomfilter.o \
 	dshash.o \
diff --git a/src/backend/lib/meson.build b/src/backend/lib/meson.build
index 974cab8776..b4e88f54ae 100644
--- a/src/backend/lib/meson.build
+++ b/src/backend/lib/meson.build
@@ -1,7 +1,6 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
 backend_sources += files(
-  'binaryheap.c',
   'bipartite_match.c',
   'bloomfilter.c',
   'dshash.c',
diff --git a/src/common/Makefile b/src/common/Makefile
index 113029bf7b..cc5c54dcee 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -48,6 +48,7 @@ LIBS += $(PTHREAD_LIBS)
 OBJS_COMMON = \
 	archive.o \
 	base64.o \
+	binaryheap.o \
 	checksum_helper.o \
 	compression.o \
 	config_info.o \
diff --git a/src/backend/lib/binaryheap.c b/src/common/binaryheap.c
similarity index 96%
rename from src/backend/lib/binaryheap.c
rename to src/common/binaryheap.c
index c7fcfc550b..400a730c85 100644
--- a/src/backend/lib/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -6,15 +6,22 @@
  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *	  src/backend/lib/binaryheap.c
+ *	  src/common/binaryheap.c
  *
  *-------------------------------------------------------------------------
  */
 
+#ifdef FRONTEND
+#include "postgres_fe.h"
+#else
 #include "postgres.h"
+#endif
 
 #include <math.h>
 
+#ifdef FRONTEND
+#include "common/logging.h"
+#endif
 #include "lib/binaryheap.h"
 
 static void sift_down(binaryheap *heap, int node_off);
@@ -109,7 +116,11 @@ void
 binaryheap_add_unordered(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
+#ifdef FRONTEND
+		pg_fatal("out of binary heap slots");
+#else
 		elog(ERROR, "out of binary heap slots");
+#endif
 	heap->bh_has_heap_property = false;
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
@@ -141,7 +152,11 @@ void
 binaryheap_add(binaryheap *heap, void *d)
 {
 	if (heap->bh_size >= heap->bh_space)
+#ifdef FRONTEND
+		pg_fatal("out of binary heap slots");
+#else
 		elog(ERROR, "out of binary heap slots");
+#endif
 	heap->bh_nodes[heap->bh_size] = d;
 	heap->bh_size++;
 	sift_up(heap, heap->bh_size - 1);
diff --git a/src/common/meson.build b/src/common/meson.build
index 53942a9a61..3b97497d1a 100644
--- a/src/common/meson.build
+++ b/src/common/meson.build
@@ -3,6 +3,7 @@
 common_sources = files(
   'archive.c',
   'base64.c',
+  'binaryheap.c',
   'checksum_helper.c',
   'compression.c',
   'controldata_utils.c',
-- 
2.25.1

>From b28739db1e5ef373110d3d76e250bf285ad856a4 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 20 Jul 2023 09:52:20 -0700
Subject: [PATCH v4 3/4] expand binaryheap api

---
 src/common/binaryheap.c      | 29 +++++++++++++++++++++++++++++
 src/include/lib/binaryheap.h |  3 +++
 2 files changed, 32 insertions(+)

diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c
index 400a730c85..aaf529d6a9 100644
--- a/src/common/binaryheap.c
+++ b/src/common/binaryheap.c
@@ -211,6 +211,35 @@ binaryheap_remove_first(binaryheap *heap)
 	return result;
 }
 
+/*
+ * binaryheap_remove_node
+ *
+ * Removes the nth (zero based) node from the heap.  The caller must ensure
+ * that there are at least (n + 1) nodes in the heap.  O(log n) worst case.
+ */
+void
+binaryheap_remove_node(binaryheap *heap, int n)
+{
+	int			cmp;
+
+	Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
+	Assert(n >= 0 && n < heap->bh_size);
+
+	/* compare last node to the one that is being removed */
+	cmp = heap->bh_compare(heap->bh_nodes[--heap->bh_size],
+						   heap->bh_nodes[n],
+						   heap->bh_arg);
+
+	/* remove the last node, placing it in the vacated entry */
+	heap->bh_nodes[n] = heap->bh_nodes[heap->bh_size];
+
+	/* sift as needed to preserve the heap property */
+	if (cmp > 0)
+		sift_up(heap, n);
+	else if (cmp < 0)
+		sift_down(heap, n);
+}
+
 /*
  * binaryheap_replace_first
  *
diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h
index 71a25db0f4..06fe2ff213 100644
--- a/src/include/lib/binaryheap.h
+++ b/src/include/lib/binaryheap.h
@@ -47,8 +47,11 @@ extern void binaryheap_build(binaryheap *heap);
 extern void binaryheap_add(binaryheap *heap, void *d);
 extern void *binaryheap_first(binaryheap *heap);
 extern void *binaryheap_remove_first(binaryheap *heap);
+extern void binaryheap_remove_node(binaryheap *heap, int n);
 extern void binaryheap_replace_first(binaryheap *heap, void *d);
 
 #define binaryheap_empty(h)			((h)->bh_size == 0)
+#define binaryheap_size(h)			((h)->bh_size)
+#define binaryheap_get_node(h, n)	((h)->bh_nodes[n])
 
 #endif							/* BINARYHEAP_H */
-- 
2.25.1

>From 08b6f280ad46e159ca330109c82ebb41c5508e4b Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Thu, 20 Jul 2023 10:19:08 -0700
Subject: [PATCH v4 4/4] use priority queue for pg_restore ready_list

---
 src/bin/pg_dump/pg_backup_archiver.c | 193 ++++++++-------------------
 1 file changed, 53 insertions(+), 140 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 39ebcfec32..d05266b341 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -34,6 +34,7 @@
 #include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
+#include "lib/binaryheap.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
@@ -44,24 +45,6 @@
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/*
- * State for tracking TocEntrys that are ready to process during a parallel
- * restore.  (This used to be a list, and we still call it that, though now
- * it's really an array so that we can apply qsort to it.)
- *
- * tes[] is sized large enough that we can't overrun it.
- * The valid entries are indexed first_te .. last_te inclusive.
- * We periodically sort the array to bring larger-by-dataLength entries to
- * the front; "sorted" is true if the valid entries are known sorted.
- */
-typedef struct _parallelReadyList
-{
-	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
-	int			first_te;		/* index of first valid entry in tes[] */
-	int			last_te;		/* index of last valid entry in tes[] */
-	bool		sorted;			/* are valid entries currently sorted? */
-} ParallelReadyList;
-
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
@@ -110,16 +93,13 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
 static void pending_list_header_init(TocEntry *l);
 static void pending_list_append(TocEntry *l, TocEntry *te);
 static void pending_list_remove(TocEntry *te);
-static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
-static void ready_list_free(ParallelReadyList *ready_list);
-static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
-static void ready_list_remove(ParallelReadyList *ready_list, int i);
-static void ready_list_sort(ParallelReadyList *ready_list);
-static int	TocEntrySizeCompare(const void *p1, const void *p2);
-static void move_to_ready_list(TocEntry *pending_list,
-							   ParallelReadyList *ready_list,
+static int	TocEntrySizeCompareQsort(const void *p1, const void *p2);
+static int	TocEntrySizeCompareBinaryheap(const void *p1, const void *p2,
+										  void *arg);
+static void move_to_ready_heap(TocEntry *pending_list,
+							   binaryheap *ready_heap,
 							   RestorePass pass);
-static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+static TocEntry *pop_next_work_item(binaryheap *ready_heap,
 									ParallelState *pstate);
 static void mark_dump_job_done(ArchiveHandle *AH,
 							   TocEntry *te,
@@ -134,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-								ParallelReadyList *ready_list);
+								binaryheap *ready_heap);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 
@@ -2380,7 +2360,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
 		}
 
 		if (ntes > 1)
-			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
+			qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort);
 
 		for (int i = 0; i < ntes; i++)
 			DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
@@ -3980,7 +3960,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 
 			(void) restore_toc_entry(AH, next_work_item, false);
 
-			/* Reduce dependencies, but don't move anything to ready_list */
+			/* Reduce dependencies, but don't move anything to ready_heap */
 			reduce_dependencies(AH, next_work_item, NULL);
 		}
 		else
@@ -4023,24 +4003,26 @@ static void
 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							 TocEntry *pending_list)
 {
-	ParallelReadyList ready_list;
+	binaryheap *ready_heap;
 	TocEntry   *next_work_item;
 
 	pg_log_debug("entering restore_toc_entries_parallel");
 
-	/* Set up ready_list with enough room for all known TocEntrys */
-	ready_list_init(&ready_list, AH->tocCount);
+	/* Set up ready_heap with enough room for all known TocEntrys */
+	ready_heap = binaryheap_allocate(AH->tocCount,
+									 TocEntrySizeCompareBinaryheap,
+									 NULL);
 
 	/*
 	 * The pending_list contains all items that we need to restore.  Move all
-	 * items that are available to process immediately into the ready_list.
+	 * items that are available to process immediately into the ready_heap.
 	 * After this setup, the pending list is everything that needs to be done
-	 * but is blocked by one or more dependencies, while the ready list
+	 * but is blocked by one or more dependencies, while the ready heap
 	 * contains items that have no remaining dependencies and are OK to
 	 * process in the current restore pass.
 	 */
 	AH->restorePass = RESTORE_PASS_MAIN;
-	move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+	move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 
 	/*
 	 * main parent loop
@@ -4054,7 +4036,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	for (;;)
 	{
 		/* Look for an item ready to be dispatched to a worker */
-		next_work_item = pop_next_work_item(&ready_list, pstate);
+		next_work_item = pop_next_work_item(ready_heap, pstate);
 		if (next_work_item != NULL)
 		{
 			/* If not to be restored, don't waste time launching a worker */
@@ -4064,7 +4046,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							next_work_item->dumpId,
 							next_work_item->desc, next_work_item->tag);
 				/* Update its dependencies as though we'd completed it */
-				reduce_dependencies(AH, next_work_item, &ready_list);
+				reduce_dependencies(AH, next_work_item, ready_heap);
 				/* Loop around to see if anything else can be dispatched */
 				continue;
 			}
@@ -4075,7 +4057,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 
 			/* Dispatch to some worker */
 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
-								   mark_restore_job_done, &ready_list);
+								   mark_restore_job_done, ready_heap);
 		}
 		else if (IsEveryWorkerIdle(pstate))
 		{
@@ -4089,7 +4071,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 			/* Advance to next restore pass */
 			AH->restorePass++;
 			/* That probably allows some stuff to be made ready */
-			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+			move_to_ready_heap(pending_list, ready_heap, AH->restorePass);
 			/* Loop around to see if anything's now ready */
 			continue;
 		}
@@ -4118,10 +4100,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
 	}
 
-	/* There should now be nothing in ready_list. */
-	Assert(ready_list.first_te > ready_list.last_te);
+	/* There should now be nothing in ready_heap. */
+	Assert(binaryheap_empty(ready_heap));
 
-	ready_list_free(&ready_list);
+	binaryheap_free(ready_heap);
 
 	pg_log_info("finished main parallel loop");
 }
@@ -4221,80 +4203,9 @@ pending_list_remove(TocEntry *te)
 }
 
 
-/*
- * Initialize the ready_list with enough room for up to tocCount entries.
- */
-static void
-ready_list_init(ParallelReadyList *ready_list, int tocCount)
-{
-	ready_list->tes = (TocEntry **)
-		pg_malloc(tocCount * sizeof(TocEntry *));
-	ready_list->first_te = 0;
-	ready_list->last_te = -1;
-	ready_list->sorted = false;
-}
-
-/*
- * Free storage for a ready_list.
- */
-static void
-ready_list_free(ParallelReadyList *ready_list)
-{
-	pg_free(ready_list->tes);
-}
-
-/* Add te to the ready_list */
-static void
-ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
-{
-	ready_list->tes[++ready_list->last_te] = te;
-	/* List is (probably) not sorted anymore. */
-	ready_list->sorted = false;
-}
-
-/* Remove the i'th entry in the ready_list */
-static void
-ready_list_remove(ParallelReadyList *ready_list, int i)
-{
-	int			f = ready_list->first_te;
-
-	Assert(i >= f && i <= ready_list->last_te);
-
-	/*
-	 * In the typical case where the item to be removed is the first ready
-	 * entry, we need only increment first_te to remove it.  Otherwise, move
-	 * the entries before it to compact the list.  (This preserves sortedness,
-	 * if any.)  We could alternatively move the entries after i, but there
-	 * are typically many more of those.
-	 */
-	if (i > f)
-	{
-		TocEntry  **first_te_ptr = &ready_list->tes[f];
-
-		memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
-	}
-	ready_list->first_te++;
-}
-
-/* Sort the ready_list into the desired order */
-static void
-ready_list_sort(ParallelReadyList *ready_list)
-{
-	if (!ready_list->sorted)
-	{
-		int			n = ready_list->last_te - ready_list->first_te + 1;
-
-		if (n > 1)
-			qsort(ready_list->tes + ready_list->first_te, n,
-				  sizeof(TocEntry *),
-				  TocEntrySizeCompare);
-		ready_list->sorted = true;
-	}
-}
-
 /* qsort comparator for sorting TocEntries by dataLength */
 static int
-TocEntrySizeCompare(const void *p1, const void *p2)
+TocEntrySizeCompareQsort(const void *p1, const void *p2)
 {
 	const TocEntry *te1 = *(const TocEntry *const *) p1;
 	const TocEntry *te2 = *(const TocEntry *const *) p2;
@@ -4314,17 +4225,24 @@ TocEntrySizeCompare(const void *p1, const void *p2)
 	return 0;
 }
 
+/* binaryheap comparator for TocEntries: keeps the largest dataLength at the heap root */
+static int
+TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, void *arg)
+{
+	return -TocEntrySizeCompareQsort(&p1, &p2);	/* qsort form takes TocEntry ** and sorts descending; negate for max-heap */
+}
+
 
 /*
- * Move all immediately-ready items from pending_list to ready_list.
+ * Move all immediately-ready items from pending_list to ready_heap.
  *
  * Items are considered ready if they have no remaining dependencies and
  * they belong in the current restore pass.  (See also reduce_dependencies,
  * which applies the same logic one-at-a-time.)
  */
 static void
-move_to_ready_list(TocEntry *pending_list,
-				   ParallelReadyList *ready_list,
+move_to_ready_heap(TocEntry *pending_list,
+				   binaryheap *ready_heap,
 				   RestorePass pass)
 {
 	TocEntry   *te;
@@ -4340,38 +4258,33 @@ move_to_ready_list(TocEntry *pending_list,
 		{
 			/* Remove it from pending_list ... */
 			pending_list_remove(te);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, te);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, te);
 		}
 	}
 }
 
 /*
  * Find the next work item (if any) that is capable of being run now,
- * and remove it from the ready_list.
+ * and remove it from the ready_heap.
  *
  * Returns the item, or NULL if nothing is runnable.
  *
  * To qualify, the item must have no remaining dependencies
  * and no requirements for locks that are incompatible with
- * items currently running.  Items in the ready_list are known to have
+ * items currently running.  Items in the ready_heap are known to have
  * no remaining dependencies, but we have to check for lock conflicts.
  */
 static TocEntry *
-pop_next_work_item(ParallelReadyList *ready_list,
+pop_next_work_item(binaryheap *ready_heap,
 				   ParallelState *pstate)
 {
 	/*
-	 * Sort the ready_list so that we'll tackle larger jobs first.
-	 */
-	ready_list_sort(ready_list);
-
-	/*
-	 * Search the ready_list until we find a suitable item.
+	 * Scan the heap's nodes in array order (not strict priority order) for a suitable item.
 	 */
-	for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+	for (int i = 0; i < binaryheap_size(ready_heap); i++)
 	{
-		TocEntry   *te = ready_list->tes[i];
+		TocEntry   *te = (TocEntry *) binaryheap_get_node(ready_heap, i);
 		bool		conflicts = false;
 
 		/*
@@ -4397,7 +4310,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
 			continue;
 
 		/* passed all tests, so this item can run */
-		ready_list_remove(ready_list, i);
+		binaryheap_remove_node(ready_heap, i);
 		return te;
 	}
 
@@ -4443,7 +4356,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 					  int status,
 					  void *callback_data)
 {
-	ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+	binaryheap *ready_heap = (binaryheap *) callback_data;
 
 	pg_log_info("finished item %d %s %s",
 				te->dumpId, te->desc, te->tag);
@@ -4461,7 +4374,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 		pg_fatal("worker process failed: exit code %d",
 				 status);
 
-	reduce_dependencies(AH, te, ready_list);
+	reduce_dependencies(AH, te, ready_heap);
 }
 
 
@@ -4704,11 +4617,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
  * it, thereby possibly making them ready-to-run.  Any pending item that
- * becomes ready should be moved to the ready_list, if that's provided.
+ * becomes ready should be moved to the ready_heap, if that's provided.
  */
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-					ParallelReadyList *ready_list)
+					binaryheap *ready_heap)
 {
 	int			i;
 
@@ -4726,18 +4639,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
 		 * the current restore pass, and it is currently a member of the
 		 * pending list (that check is needed to prevent double restore in
 		 * some cases where a list-file forces out-of-order restoring).
-		 * However, if ready_list == NULL then caller doesn't want any list
+		 * However, if ready_heap == NULL then caller doesn't want any list
 		 * memberships changed.
 		 */
 		if (otherte->depCount == 0 &&
 			_tocEntryRestorePass(otherte) == AH->restorePass &&
 			otherte->pending_prev != NULL &&
-			ready_list != NULL)
+			ready_heap != NULL)
 		{
 			/* Remove it from pending list ... */
 			pending_list_remove(otherte);
-			/* ... and add to ready_list */
-			ready_list_insert(ready_list, otherte);
+			/* ... and add to ready_heap */
+			binaryheap_add(ready_heap, otherte);
 		}
 	}
 }
-- 
2.25.1

Reply via email to