On Sat, Jul 22, 2023 at 07:47:50PM -0400, Tom Lane wrote: > Nathan Bossart <nathandboss...@gmail.com> writes: >> I first tried modifying >> binaryheap to use "int" or "void *" instead, but that ended up requiring >> some rather invasive changes in backend code, not to mention any extensions >> that happen to be using it.
I followed through with the "void *" approach (attached), and it wasn't as bad as I expected. > I wonder whether we can't provide some alternate definition or "skin" > for binaryheap that preserves the Datum API for backend code that wants > that, while providing a void *-based API for frontend code to use. I can give this a try next, but it might be rather #ifdef-heavy. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From b570460b19c5955c91b1fc28e8fdc791f41998e2 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 22 Jul 2023 20:59:46 -0700 Subject: [PATCH v4 1/4] use "void *" instead of "Datum" in binaryheap --- src/backend/executor/nodeGatherMerge.c | 16 +++++----- src/backend/executor/nodeMergeAppend.c | 16 +++++----- src/backend/lib/binaryheap.c | 20 ++++++------ src/backend/postmaster/pgarch.c | 18 +++++------ .../replication/logical/reorderbuffer.c | 31 +++++++++---------- src/backend/storage/buffer/bufmgr.c | 11 +++---- src/include/lib/binaryheap.h | 14 ++++----- 7 files changed, 61 insertions(+), 65 deletions(-) diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 9d5e1a46e9..021682d87c 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -52,7 +52,7 @@ typedef struct GMReaderTupleBuffer } GMReaderTupleBuffer; static TupleTableSlot *ExecGatherMerge(PlanState *pstate); -static int32 heap_compare_slots(Datum a, Datum b, void *arg); +static int32 heap_compare_slots(const void *a, const void *b, void *arg); static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done); @@ -489,7 +489,7 @@ reread: /* Don't have a tuple yet, try to get one */ if (gather_merge_readnext(gm_state, i, nowait)) binaryheap_add_unordered(gm_state->gm_heap, - Int32GetDatum(i)); + (void *) (intptr_t) i); } else { @@ -565,10 +565,10 @@ gather_merge_getnext(GatherMergeState *gm_state) * the heap, because it might now compare differently against the * other elements of the heap. 
*/ - i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + i = (intptr_t) binaryheap_first(gm_state->gm_heap); if (gather_merge_readnext(gm_state, i, false)) - binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); + binaryheap_replace_first(gm_state->gm_heap, (void *) (intptr_t) i); else { /* reader exhausted, remove it from heap */ @@ -585,7 +585,7 @@ gather_merge_getnext(GatherMergeState *gm_state) else { /* Return next tuple from whichever participant has the leading one */ - i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + i = (intptr_t) binaryheap_first(gm_state->gm_heap); return gm_state->gm_slots[i]; } } @@ -750,11 +750,11 @@ typedef int32 SlotNumber; * Compare the tuples in the two given slots. */ static int32 -heap_compare_slots(Datum a, Datum b, void *arg) +heap_compare_slots(const void *a, const void *b, void *arg) { GatherMergeState *node = (GatherMergeState *) arg; - SlotNumber slot1 = DatumGetInt32(a); - SlotNumber slot2 = DatumGetInt32(b); + SlotNumber slot1 = (intptr_t) a; + SlotNumber slot2 = (intptr_t) b; TupleTableSlot *s1 = node->gm_slots[slot1]; TupleTableSlot *s2 = node->gm_slots[slot2]; diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 21b5726e6e..b3e2c29401 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -52,7 +52,7 @@ typedef int32 SlotNumber; static TupleTableSlot *ExecMergeAppend(PlanState *pstate); -static int heap_compare_slots(Datum a, Datum b, void *arg); +static int heap_compare_slots(const void *a, const void *b, void *arg); /* ---------------------------------------------------------------- @@ -229,7 +229,7 @@ ExecMergeAppend(PlanState *pstate) { node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); + binaryheap_add_unordered(node->ms_heap, (void *) (intptr_t) i); } binaryheap_build(node->ms_heap); 
node->ms_initialized = true; @@ -244,10 +244,10 @@ ExecMergeAppend(PlanState *pstate) * by doing this before returning from the prior call, but it's better * to not pull tuples until necessary.) */ - i = DatumGetInt32(binaryheap_first(node->ms_heap)); + i = (intptr_t) binaryheap_first(node->ms_heap); node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); if (!TupIsNull(node->ms_slots[i])) - binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); + binaryheap_replace_first(node->ms_heap, (void *) (intptr_t) i); else (void) binaryheap_remove_first(node->ms_heap); } @@ -259,7 +259,7 @@ ExecMergeAppend(PlanState *pstate) } else { - i = DatumGetInt32(binaryheap_first(node->ms_heap)); + i = (intptr_t) binaryheap_first(node->ms_heap); result = node->ms_slots[i]; } @@ -270,11 +270,11 @@ ExecMergeAppend(PlanState *pstate) * Compare the tuples in the two given slots. */ static int32 -heap_compare_slots(Datum a, Datum b, void *arg) +heap_compare_slots(const void *a, const void *b, void *arg) { MergeAppendState *node = (MergeAppendState *) arg; - SlotNumber slot1 = DatumGetInt32(a); - SlotNumber slot2 = DatumGetInt32(b); + SlotNumber slot1 = (intptr_t) a; + SlotNumber slot2 = (intptr_t) b; TupleTableSlot *s1 = node->ms_slots[slot1]; TupleTableSlot *s2 = node->ms_slots[slot2]; diff --git a/src/backend/lib/binaryheap.c b/src/backend/lib/binaryheap.c index 1737546757..c7fcfc550b 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/backend/lib/binaryheap.c @@ -34,7 +34,7 @@ binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg) int sz; binaryheap *heap; - sz = offsetof(binaryheap, bh_nodes) + sizeof(Datum) * capacity; + sz = offsetof(binaryheap, bh_nodes) + sizeof(void *) * capacity; heap = (binaryheap *) palloc(sz); heap->bh_space = capacity; heap->bh_compare = compare; @@ -106,7 +106,7 @@ parent_offset(int i) * afterwards. 
*/ void -binaryheap_add_unordered(binaryheap *heap, Datum d) +binaryheap_add_unordered(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) elog(ERROR, "out of binary heap slots"); @@ -138,7 +138,7 @@ binaryheap_build(binaryheap *heap) * the heap property. */ void -binaryheap_add(binaryheap *heap, Datum d) +binaryheap_add(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) elog(ERROR, "out of binary heap slots"); @@ -154,7 +154,7 @@ binaryheap_add(binaryheap *heap, Datum d) * without modifying the heap. The caller must ensure that this * routine is not used on an empty heap. Always O(1). */ -Datum +void * binaryheap_first(binaryheap *heap) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -169,10 +169,10 @@ binaryheap_first(binaryheap *heap) * that this routine is not used on an empty heap. O(log n) worst * case. */ -Datum +void * binaryheap_remove_first(binaryheap *heap) { - Datum result; + void *result; Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -204,7 +204,7 @@ binaryheap_remove_first(binaryheap *heap) * sifting the new node down. 
*/ void -binaryheap_replace_first(binaryheap *heap, Datum d) +binaryheap_replace_first(binaryheap *heap, void *d) { Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); @@ -221,7 +221,7 @@ binaryheap_replace_first(binaryheap *heap, Datum d) static void sift_up(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + void *node_val = heap->bh_nodes[node_off]; /* * Within the loop, the node_off'th array entry is a "hole" that @@ -232,7 +232,7 @@ sift_up(binaryheap *heap, int node_off) { int cmp; int parent_off; - Datum parent_val; + void *parent_val; /* * If this node is smaller than its parent, the heap condition is @@ -264,7 +264,7 @@ sift_up(binaryheap *heap, int node_off) static void sift_down(binaryheap *heap, int node_off) { - Datum node_val = heap->bh_nodes[node_off]; + void *node_val = heap->bh_nodes[node_off]; /* * Within the loop, the node_off'th array entry is a "hole" that diff --git a/src/backend/postmaster/pgarch.c b/src/backend/postmaster/pgarch.c index 46af349564..9491b822db 100644 --- a/src/backend/postmaster/pgarch.c +++ b/src/backend/postmaster/pgarch.c @@ -145,7 +145,7 @@ static bool pgarch_readyXlog(char *xlog); static void pgarch_archiveDone(char *xlog); static void pgarch_die(int code, Datum arg); static void HandlePgArchInterrupts(void); -static int ready_file_comparator(Datum a, Datum b, void *arg); +static int ready_file_comparator(const void *a, const void *b, void *arg); static void LoadArchiveLibrary(void); static void pgarch_call_module_shutdown_cb(int code, Datum arg); @@ -631,22 +631,22 @@ pgarch_readyXlog(char *xlog) /* If the heap isn't full yet, quickly add it. */ arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size]; strcpy(arch_file, basename); - binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file)); + binaryheap_add_unordered(arch_files->arch_heap, arch_file); /* If we just filled the heap, make it a valid one. 
*/ if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN) binaryheap_build(arch_files->arch_heap); } else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap), - CStringGetDatum(basename), NULL) > 0) + basename, NULL) > 0) { /* * Remove the lowest priority file and add the current one to the * heap. */ - arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap)); + arch_file = binaryheap_remove_first(arch_files->arch_heap); strcpy(arch_file, basename); - binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file)); + binaryheap_add(arch_files->arch_heap, arch_file); } } FreeDir(rldir); @@ -668,7 +668,7 @@ pgarch_readyXlog(char *xlog) */ arch_files->arch_files_size = arch_files->arch_heap->bh_size; for (int i = 0; i < arch_files->arch_files_size; i++) - arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap)); + arch_files->arch_files[i] = binaryheap_remove_first(arch_files->arch_heap); /* Return the highest priority file. */ arch_files->arch_files_size--; @@ -686,10 +686,10 @@ pgarch_readyXlog(char *xlog) * If "a" and "b" have equivalent values, 0 will be returned. */ static int -ready_file_comparator(Datum a, Datum b, void *arg) +ready_file_comparator(const void *a, const void *b, void *arg) { - char *a_str = DatumGetCString(a); - char *b_str = DatumGetCString(b); + const char *a_str = (const char *) a; + const char *b_str = (const char *) b; bool a_history = IsTLHistoryFileName(a_str); bool b_history = IsTLHistoryFileName(b_str); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 26d252bd87..191c0d1181 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1223,11 +1223,10 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, * Binary heap comparison function. 
*/ static int -ReorderBufferIterCompare(Datum a, Datum b, void *arg) +ReorderBufferIterCompare(const void *a, const void *b, void *arg) { - ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg; - XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; - XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; + XLogRecPtr pos_a = ((ReorderBufferIterTXNEntry *) a)->lsn; + XLogRecPtr pos_b = ((ReorderBufferIterTXNEntry *) b)->lsn; if (pos_a < pos_b) return 1; @@ -1298,7 +1297,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, /* allocate heap */ state->heap = binaryheap_allocate(state->nr_txns, ReorderBufferIterCompare, - state); + NULL); /* Now that the state fields are initialized, it is safe to return it. */ *iter_state = state; @@ -1330,7 +1329,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, &state->entries[off++]); } /* add subtransactions if they contain changes */ @@ -1359,7 +1358,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, state->entries[off].change = cur_change; state->entries[off].txn = cur_txn; - binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + binaryheap_add_unordered(state->heap, &state->entries[off++]); } } @@ -1378,14 +1377,12 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) { ReorderBufferChange *change; ReorderBufferIterTXNEntry *entry; - int32 off; /* nothing there anymore */ if (state->heap->bh_size == 0) return NULL; - off = DatumGetInt32(binaryheap_first(state->heap)); - entry = &state->entries[off]; + entry = binaryheap_first(state->heap); /* free memory we might have "leaked" in the previous *Next call */ if (!dlist_is_empty(&state->old_change)) @@ -1411,10 +1408,10 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState 
*state) dlist_container(ReorderBufferChange, node, next); /* txn stays the same */ - state->entries[off].lsn = next_change->lsn; - state->entries[off].change = next_change; + entry->lsn = next_change->lsn; + entry->change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + binaryheap_replace_first(state->heap, entry); return change; } @@ -1435,7 +1432,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) */ rb->totalBytes += entry->txn->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, - &state->entries[off].segno)) + &entry->segno)) { /* successfully restored changes from disk */ ReorderBufferChange *next_change = @@ -1448,9 +1445,9 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) Assert(entry->txn->nentries_mem); /* txn stays the same */ - state->entries[off].lsn = next_change->lsn; - state->entries[off].change = next_change; - binaryheap_replace_first(state->heap, Int32GetDatum(off)); + entry->lsn = next_change->lsn; + entry->change = next_change; + binaryheap_replace_first(state->heap, entry); return change; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a7e3b9bb1d..391d5f3dd8 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -501,7 +501,7 @@ static void CheckForBufferLeaks(void); static int rlocator_comparator(const void *p1, const void *p2); static inline int buffertag_comparator(const BufferTag *ba, const BufferTag *bb); static inline int ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b); -static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg); +static int ts_ckpt_progress_comparator(const void *a, const void *b, void *arg); /* @@ -2649,7 +2649,7 @@ BufferSync(int flags) ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan; - binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat)); + 
binaryheap_add_unordered(ts_heap, ts_stat); } binaryheap_build(ts_heap); @@ -2665,8 +2665,7 @@ BufferSync(int flags) while (!binaryheap_empty(ts_heap)) { BufferDesc *bufHdr = NULL; - CkptTsStatus *ts_stat = (CkptTsStatus *) - DatumGetPointer(binaryheap_first(ts_heap)); + CkptTsStatus *ts_stat = (CkptTsStatus *) binaryheap_first(ts_heap); buf_id = CkptBufferIds[ts_stat->index].buf_id; Assert(buf_id != -1); @@ -2713,7 +2712,7 @@ BufferSync(int flags) else { /* update heap with the new progress */ - binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat)); + binaryheap_replace_first(ts_heap, ts_stat); } /* @@ -5416,7 +5415,7 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b) * progress. */ static int -ts_ckpt_progress_comparator(Datum a, Datum b, void *arg) +ts_ckpt_progress_comparator(const void *a, const void *b, void *arg) { CkptTsStatus *sa = (CkptTsStatus *) a; CkptTsStatus *sb = (CkptTsStatus *) b; diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 52f7b06b25..71a25db0f4 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -15,7 +15,7 @@ * For a max-heap, the comparator must return <0 iff a < b, 0 iff a == b, * and >0 iff a > b. For a min-heap, the conditions are reversed. 
*/ -typedef int (*binaryheap_comparator) (Datum a, Datum b, void *arg); +typedef int (*binaryheap_comparator) (const void *a, const void *b, void *arg); /* * binaryheap @@ -34,7 +34,7 @@ typedef struct binaryheap bool bh_has_heap_property; /* debugging cross-check */ binaryheap_comparator bh_compare; void *bh_arg; - Datum bh_nodes[FLEXIBLE_ARRAY_MEMBER]; + void *bh_nodes[FLEXIBLE_ARRAY_MEMBER]; } binaryheap; extern binaryheap *binaryheap_allocate(int capacity, @@ -42,12 +42,12 @@ extern binaryheap *binaryheap_allocate(int capacity, void *arg); extern void binaryheap_reset(binaryheap *heap); extern void binaryheap_free(binaryheap *heap); -extern void binaryheap_add_unordered(binaryheap *heap, Datum d); +extern void binaryheap_add_unordered(binaryheap *heap, void *d); extern void binaryheap_build(binaryheap *heap); -extern void binaryheap_add(binaryheap *heap, Datum d); -extern Datum binaryheap_first(binaryheap *heap); -extern Datum binaryheap_remove_first(binaryheap *heap); -extern void binaryheap_replace_first(binaryheap *heap, Datum d); +extern void binaryheap_add(binaryheap *heap, void *d); +extern void *binaryheap_first(binaryheap *heap); +extern void *binaryheap_remove_first(binaryheap *heap); +extern void binaryheap_replace_first(binaryheap *heap, void *d); #define binaryheap_empty(h) ((h)->bh_size == 0) -- 2.25.1
>From 4fb71fc94af597b9ec9b5b233fedc7bb9ca5f05c Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Sat, 22 Jul 2023 15:04:45 -0700 Subject: [PATCH v4 2/4] make binaryheap available to frontend --- src/backend/lib/Makefile | 1 - src/backend/lib/meson.build | 1 - src/common/Makefile | 1 + src/{backend/lib => common}/binaryheap.c | 17 ++++++++++++++++- src/common/meson.build | 1 + 5 files changed, 18 insertions(+), 3 deletions(-) rename src/{backend/lib => common}/binaryheap.c (96%) diff --git a/src/backend/lib/Makefile b/src/backend/lib/Makefile index 9dad31398a..b6cefd9cca 100644 --- a/src/backend/lib/Makefile +++ b/src/backend/lib/Makefile @@ -13,7 +13,6 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global OBJS = \ - binaryheap.o \ bipartite_match.o \ bloomfilter.o \ dshash.o \ diff --git a/src/backend/lib/meson.build b/src/backend/lib/meson.build index 974cab8776..b4e88f54ae 100644 --- a/src/backend/lib/meson.build +++ b/src/backend/lib/meson.build @@ -1,7 +1,6 @@ # Copyright (c) 2022-2023, PostgreSQL Global Development Group backend_sources += files( - 'binaryheap.c', 'bipartite_match.c', 'bloomfilter.c', 'dshash.c', diff --git a/src/common/Makefile b/src/common/Makefile index 113029bf7b..cc5c54dcee 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -48,6 +48,7 @@ LIBS += $(PTHREAD_LIBS) OBJS_COMMON = \ archive.o \ base64.o \ + binaryheap.o \ checksum_helper.o \ compression.o \ config_info.o \ diff --git a/src/backend/lib/binaryheap.c b/src/common/binaryheap.c similarity index 96% rename from src/backend/lib/binaryheap.c rename to src/common/binaryheap.c index c7fcfc550b..400a730c85 100644 --- a/src/backend/lib/binaryheap.c +++ b/src/common/binaryheap.c @@ -6,15 +6,22 @@ * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group * * IDENTIFICATION - * src/backend/lib/binaryheap.c + * src/common/binaryheap.c * *------------------------------------------------------------------------- */ +#ifdef 
FRONTEND +#include "postgres_fe.h" +#else #include "postgres.h" +#endif #include <math.h> +#ifdef FRONTEND +#include "common/logging.h" +#endif #include "lib/binaryheap.h" static void sift_down(binaryheap *heap, int node_off); @@ -109,7 +116,11 @@ void binaryheap_add_unordered(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else elog(ERROR, "out of binary heap slots"); +#endif heap->bh_has_heap_property = false; heap->bh_nodes[heap->bh_size] = d; heap->bh_size++; @@ -141,7 +152,11 @@ void binaryheap_add(binaryheap *heap, void *d) { if (heap->bh_size >= heap->bh_space) +#ifdef FRONTEND + pg_fatal("out of binary heap slots"); +#else elog(ERROR, "out of binary heap slots"); +#endif heap->bh_nodes[heap->bh_size] = d; heap->bh_size++; sift_up(heap, heap->bh_size - 1); diff --git a/src/common/meson.build b/src/common/meson.build index 53942a9a61..3b97497d1a 100644 --- a/src/common/meson.build +++ b/src/common/meson.build @@ -3,6 +3,7 @@ common_sources = files( 'archive.c', 'base64.c', + 'binaryheap.c', 'checksum_helper.c', 'compression.c', 'controldata_utils.c', -- 2.25.1
>From b28739db1e5ef373110d3d76e250bf285ad856a4 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Thu, 20 Jul 2023 09:52:20 -0700 Subject: [PATCH v4 3/4] expand binaryheap api --- src/common/binaryheap.c | 29 +++++++++++++++++++++++++++++ src/include/lib/binaryheap.h | 3 +++ 2 files changed, 32 insertions(+) diff --git a/src/common/binaryheap.c b/src/common/binaryheap.c index 400a730c85..aaf529d6a9 100644 --- a/src/common/binaryheap.c +++ b/src/common/binaryheap.c @@ -211,6 +211,35 @@ binaryheap_remove_first(binaryheap *heap) return result; } +/* + * binaryheap_remove_node + * + * Removes the nth node (zero-based) from the heap. The caller must ensure + * that there are at least (n + 1) nodes in the heap. O(log n) worst case. + */ +void +binaryheap_remove_node(binaryheap *heap, int n) +{ + int cmp; + + Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property); + Assert(n >= 0 && n < heap->bh_size); + + /* compare last node to the one that is being removed */ + cmp = heap->bh_compare(heap->bh_nodes[--heap->bh_size], + heap->bh_nodes[n], + heap->bh_arg); + + /* remove the last node, placing it in the vacated entry */ + heap->bh_nodes[n] = heap->bh_nodes[heap->bh_size]; + + /* sift as needed to preserve the heap property */ + if (cmp > 0) + sift_up(heap, n); + else if (cmp < 0) + sift_down(heap, n); +} + /* * binaryheap_replace_first * diff --git a/src/include/lib/binaryheap.h b/src/include/lib/binaryheap.h index 71a25db0f4..06fe2ff213 100644 --- a/src/include/lib/binaryheap.h +++ b/src/include/lib/binaryheap.h @@ -47,8 +47,11 @@ extern void binaryheap_build(binaryheap *heap); extern void binaryheap_add(binaryheap *heap, void *d); extern void *binaryheap_first(binaryheap *heap); extern void *binaryheap_remove_first(binaryheap *heap); +extern void binaryheap_remove_node(binaryheap *heap, int n); extern void binaryheap_replace_first(binaryheap *heap, void *d); #define binaryheap_empty(h) ((h)->bh_size == 0) +#define binaryheap_size(h) ((h)->bh_size)
+#define binaryheap_get_node(h, n) ((h)->bh_nodes[n]) #endif /* BINARYHEAP_H */ -- 2.25.1
>From 08b6f280ad46e159ca330109c82ebb41c5508e4b Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nat...@postgresql.org> Date: Thu, 20 Jul 2023 10:19:08 -0700 Subject: [PATCH v4 4/4] use priority queue for pg_restore ready_list --- src/bin/pg_dump/pg_backup_archiver.c | 193 ++++++++------------------- 1 file changed, 53 insertions(+), 140 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 39ebcfec32..d05266b341 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -34,6 +34,7 @@ #include "compress_io.h" #include "dumputils.h" #include "fe_utils/string_utils.h" +#include "lib/binaryheap.h" #include "lib/stringinfo.h" #include "libpq/libpq-fs.h" #include "parallel.h" @@ -44,24 +45,6 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* - * State for tracking TocEntrys that are ready to process during a parallel - * restore. (This used to be a list, and we still call it that, though now - * it's really an array so that we can apply qsort to it.) - * - * tes[] is sized large enough that we can't overrun it. - * The valid entries are indexed first_te .. last_te inclusive. - * We periodically sort the array to bring larger-by-dataLength entries to - * the front; "sorted" is true if the valid entries are known sorted. - */ -typedef struct _parallelReadyList -{ - TocEntry **tes; /* Ready-to-dump TocEntrys */ - int first_te; /* index of first valid entry in tes[] */ - int last_te; /* index of last valid entry in tes[] */ - bool sorted; /* are valid entries currently sorted? 
*/ -} ParallelReadyList; - static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, @@ -110,16 +93,13 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH, static void pending_list_header_init(TocEntry *l); static void pending_list_append(TocEntry *l, TocEntry *te); static void pending_list_remove(TocEntry *te); -static void ready_list_init(ParallelReadyList *ready_list, int tocCount); -static void ready_list_free(ParallelReadyList *ready_list); -static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); -static void ready_list_remove(ParallelReadyList *ready_list, int i); -static void ready_list_sort(ParallelReadyList *ready_list); -static int TocEntrySizeCompare(const void *p1, const void *p2); -static void move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +static int TocEntrySizeCompareQsort(const void *p1, const void *p2); +static int TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, + void *arg); +static void move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass); -static TocEntry *pop_next_work_item(ParallelReadyList *ready_list, +static TocEntry *pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -134,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list); + binaryheap *ready_heap); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -2380,7 +2360,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) } if (ntes > 1) - qsort(tes, ntes, sizeof(TocEntry *), 
TocEntrySizeCompare); + qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort); for (int i = 0; i < ntes; i++) DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, @@ -3980,7 +3960,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) (void) restore_toc_entry(AH, next_work_item, false); - /* Reduce dependencies, but don't move anything to ready_list */ + /* Reduce dependencies, but don't move anything to ready_heap */ reduce_dependencies(AH, next_work_item, NULL); } else @@ -4023,24 +4003,26 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - ParallelReadyList ready_list; + binaryheap *ready_heap; TocEntry *next_work_item; pg_log_debug("entering restore_toc_entries_parallel"); - /* Set up ready_list with enough room for all known TocEntrys */ - ready_list_init(&ready_list, AH->tocCount); + /* Set up ready_heap with enough room for all known TocEntrys */ + ready_heap = binaryheap_allocate(AH->tocCount, + TocEntrySizeCompareBinaryheap, + NULL); /* * The pending_list contains all items that we need to restore. Move all - * items that are available to process immediately into the ready_list. + * items that are available to process immediately into the ready_heap. * After this setup, the pending list is everything that needs to be done - * but is blocked by one or more dependencies, while the ready list + * but is blocked by one or more dependencies, while the ready heap * contains items that have no remaining dependencies and are OK to * process in the current restore pass. 
*/ AH->restorePass = RESTORE_PASS_MAIN; - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* * main parent loop @@ -4054,7 +4036,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = pop_next_work_item(&ready_list, pstate); + next_work_item = pop_next_work_item(ready_heap, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4064,7 +4046,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); /* Update its dependencies as though we'd completed it */ - reduce_dependencies(AH, next_work_item, &ready_list); + reduce_dependencies(AH, next_work_item, ready_heap); /* Loop around to see if anything else can be dispatched */ continue; } @@ -4075,7 +4057,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, - mark_restore_job_done, &ready_list); + mark_restore_job_done, ready_heap); } else if (IsEveryWorkerIdle(pstate)) { @@ -4089,7 +4071,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Advance to next restore pass */ AH->restorePass++; /* That probably allows some stuff to be made ready */ - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* Loop around to see if anything's now ready */ continue; } @@ -4118,10 +4100,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS); } - /* There should now be nothing in ready_list. */ - Assert(ready_list.first_te > ready_list.last_te); + /* There should now be nothing in ready_heap. 
*/ + Assert(binaryheap_empty(ready_heap)); - ready_list_free(&ready_list); + binaryheap_free(ready_heap); pg_log_info("finished main parallel loop"); } @@ -4221,80 +4203,9 @@ pending_list_remove(TocEntry *te) } -/* - * Initialize the ready_list with enough room for up to tocCount entries. - */ -static void -ready_list_init(ParallelReadyList *ready_list, int tocCount) -{ - ready_list->tes = (TocEntry **) - pg_malloc(tocCount * sizeof(TocEntry *)); - ready_list->first_te = 0; - ready_list->last_te = -1; - ready_list->sorted = false; -} - -/* - * Free storage for a ready_list. - */ -static void -ready_list_free(ParallelReadyList *ready_list) -{ - pg_free(ready_list->tes); -} - -/* Add te to the ready_list */ -static void -ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) -{ - ready_list->tes[++ready_list->last_te] = te; - /* List is (probably) not sorted anymore. */ - ready_list->sorted = false; -} - -/* Remove the i'th entry in the ready_list */ -static void -ready_list_remove(ParallelReadyList *ready_list, int i) -{ - int f = ready_list->first_te; - - Assert(i >= f && i <= ready_list->last_te); - - /* - * In the typical case where the item to be removed is the first ready - * entry, we need only increment first_te to remove it. Otherwise, move - * the entries before it to compact the list. (This preserves sortedness, - * if any.) We could alternatively move the entries after i, but there - * are typically many more of those. 
- */ - if (i > f) - { - TocEntry **first_te_ptr = &ready_list->tes[f]; - - memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); - } - ready_list->first_te++; -} - -/* Sort the ready_list into the desired order */ -static void -ready_list_sort(ParallelReadyList *ready_list) -{ - if (!ready_list->sorted) - { - int n = ready_list->last_te - ready_list->first_te + 1; - - if (n > 1) - qsort(ready_list->tes + ready_list->first_te, n, - sizeof(TocEntry *), - TocEntrySizeCompare); - ready_list->sorted = true; - } -} - /* qsort comparator for sorting TocEntries by dataLength */ static int -TocEntrySizeCompare(const void *p1, const void *p2) +TocEntrySizeCompareQsort(const void *p1, const void *p2) { const TocEntry *te1 = *(const TocEntry *const *) p1; const TocEntry *te2 = *(const TocEntry *const *) p2; @@ -4314,17 +4225,24 @@ TocEntrySizeCompare(const void *p1, const void *p2) return 0; } +/* binaryheap (max-heap) comparator: biggest dataLength sorts to the root */ +static int +TocEntrySizeCompareBinaryheap(const void *p1, const void *p2, void *arg) +{ + return -TocEntrySizeCompareQsort(&p1, &p2); /* takes TocEntry ** */ +} + /* - * Move all immediately-ready items from pending_list to ready_list. + * Move all immediately-ready items from pending_list to ready_heap. * * Items are considered ready if they have no remaining dependencies and * they belong in the current restore pass. (See also reduce_dependencies, * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass) { TocEntry *te; @@ -4340,38 +4258,33 @@ move_to_ready_list(TocEntry *pending_list, { /* Remove it from pending_list ... */ pending_list_remove(te); - /* ... and add to ready_list */ - ready_list_insert(ready_list, te); + /* ...
and add to ready_heap */ + binaryheap_add(ready_heap, te); } } } /* * Find the next work item (if any) that is capable of being run now, - * and remove it from the ready_list. + * and remove it from the ready_heap. * * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with - * items currently running. Items in the ready_list are known to have + * items currently running. Items in the ready_heap are known to have * no remaining dependencies, but we have to check for lock conflicts. */ static TocEntry * -pop_next_work_item(ParallelReadyList *ready_list, +pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate) { /* - * Sort the ready_list so that we'll tackle larger jobs first. - */ - ready_list_sort(ready_list); - - /* - * Search the ready_list until we find a suitable item. + * Search the ready_heap until we find a suitable item. */ - for (int i = ready_list->first_te; i <= ready_list->last_te; i++) + for (int i = 0; i < binaryheap_size(ready_heap); i++) { - TocEntry *te = ready_list->tes[i]; + TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i); bool conflicts = false; /* @@ -4397,7 +4310,7 @@ pop_next_work_item(ParallelReadyList *ready_list, continue; /* passed all tests, so this item can run */ - ready_list_remove(ready_list, i); + binaryheap_remove_node(ready_heap, i); return te; } @@ -4443,7 +4356,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; + binaryheap *ready_heap = (binaryheap *) callback_data; pg_log_info("finished item %d %s %s", te->dumpId, te->desc, te->tag); @@ -4461,7 +4374,7 @@ mark_restore_job_done(ArchiveHandle *AH, pg_fatal("worker process failed: exit code %d", status); - reduce_dependencies(AH, te, ready_list); + reduce_dependencies(AH, te, ready_heap); } @@ -4704,11 +4617,11 @@ 
identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) /* * Remove the specified TOC entry from the depCounts of items that depend on * it, thereby possibly making them ready-to-run. Any pending item that - * becomes ready should be moved to the ready_list, if that's provided. + * becomes ready should be moved to the ready_heap, if that's provided. */ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list) + binaryheap *ready_heap) { int i; @@ -4726,18 +4639,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, * the current restore pass, and it is currently a member of the * pending list (that check is needed to prevent double restore in * some cases where a list-file forces out-of-order restoring). - * However, if ready_list == NULL then caller doesn't want any list + * However, if ready_heap == NULL then caller doesn't want any list * memberships changed. */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && otherte->pending_prev != NULL && - ready_list != NULL) + ready_heap != NULL) { /* Remove it from pending list ... */ pending_list_remove(otherte); - /* ... and add to ready_list */ - ready_list_insert(ready_list, otherte); + /* ... and add to ready_heap */ + binaryheap_add(ready_heap, otherte); } } } -- 2.25.1