
Here's a WIP patch allowing parallel CREATE INDEX for BRIN indexes. The
infrastructure (starting workers etc.) is "inspired" by the BTREE code
(i.e. copied from that and massaged a bit to call brin stuff).

 _bt_begin_parallel -> _brin_begin_parallel
 _bt_end_parallel -> _brin_end_parallel
 _bt_parallel_estimate_shared -> _brin_parallel_estimate_shared
 _bt_leader_participate_as_worker -> _brin_leader_participate_as_worker
 _bt_parallel_scan_and_sort -> _brin_parallel_scan_and_build

This is mostly mechanical stuff - setting up the parallel workers,
starting the scan etc.

The tricky part is how to divide the work between workers and how to
combine the partial results. For BTREE we simply let each worker read
a subset of the table (using a parallel scan) and sort it, and then do
a merge sort on the partial results.

For BRIN it's a bit different, because the index essentially splits
the table into smaller ranges and treats them independently. So the
easiest way is to organize the table scan so that each range gets
processed by exactly one worker. Each worker writes the index tuples
into a temporary file, and then when all workers are done the leader
reads them back and inserts them into the index.

The problem is that a parallel scan assigns a mostly random subset of
the table to each worker - there's no guarantee that a BRIN page range
will be processed by a single worker.

0001 does that in a bit silly way - instead of doing a single large
scan, each worker does a sequence of TID range scans (see
_brin_parallel_scan_and_build), and BrinShared has fields used to track
which ranges were already assigned to workers. A bit cumbersome, but it
works pretty well.

0002 replaces the TID range scan sequence with a single parallel scan,
modified to assign "chunks" in multiples of pagesPerRange.

In both cases _brin_end_parallel then reads the summaries from worker
files, and adds them into the index. In 0001 this is fairly simple,
although we could do one more improvement and sort the ranges by range
start to make the index nicer (and possibly a bit more efficient). This
should be simple, because the per-worker results are already sorted like
that (so a merge sort in _brin_end_parallel would be enough).

For 0002 it's a bit more complicated, because with a single parallel
scan brinbuildCallbackParallel can't tell whether a range is assigned
to a different worker or is simply empty. And we want to generate
summaries for empty ranges in the index. We could either skip such
ranges during the index build and then add empty summaries in
_brin_end_parallel (if needed), or add them and then merge them using
"union".

I just realized there's a third option to do this - we could just do a
regular parallel scan (with no particular regard to pagesPerRange), and
then do a "union" when merging results from workers. It doesn't require
the sequence of TID scans, and the union would also handle the empty
ranges. The per-worker results might be much larger, though, because
each worker might produce up to the "full" BRIN index.


Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 0d37a829e768772ef3e9c080f96333e24cdd43b7 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Sun, 26 Mar 2023 00:44:01 +0100
Subject: [PATCH 1/2] parallel CREATE INDEX for BRIN

 src/backend/access/brin/brin.c        | 714 +++++++++++++++++++++++++-
 src/backend/access/transam/parallel.c |   4 +
 src/backend/catalog/index.c           |   3 +-
 src/include/access/brin.h             |   3 +
 4 files changed, 719 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 3c6a956eaa3..13d94931efc 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -31,8 +31,10 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "storage/buffile.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -41,6 +43,98 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000004)
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+	/*
+	 * These fields are not modified during the build.  They primarily exist for
+	 * the benefit of worker processes that need to create state corresponding
+	 * to that used by the leader.
+	 */
+	Oid			heaprelid;
+	Oid			indexrelid;
+	bool		isconcurrent;
+	BlockNumber	pagesPerRange;
+	/*
+	 * workersdonecv is used to monitor the progress of workers.  All parallel
+	 * participants must indicate that they are done before leader can use
+	 * results built by the workers (and before leader can write the data into
+	 * the index).
+	 */
+	ConditionVariable workersdonecv;
+	/* Used to pass built BRIN tuples from workers to leader (for insert). */
+	SharedFileSet	fileset;
+	/*
+	 * mutex protects all fields before heapdesc.
+	 *
+	 * These fields contain status information of interest to BRIN index
+	 * builds that must work just the same when an index is built in parallel.
+	 */
+	slock_t		mutex;
+	/*
+	 * 
+	 */
+	int			last_worker_id;
+	BlockNumber	next_range;
+	BlockNumber	last_range;
+	/*
+	 * Mutable state that is maintained by workers, and reported back to
+	 * leader at end of the scans.
+	 *
+	 * nparticipantsdone is number of worker processes finished.
+	 *
+	 * reltuples is the total number of input heap tuples.
+	 *
+	 * indtuples is the total number of tuples that made it into the index.
+	 */
+	int			nparticipantsdone;
+	double		reltuples;
+	double		indtuples;
+} BrinShared;
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+	/* parallel context itself */
+	ParallelContext *pcxt;
+	/*
+	 * nparticipantworkers is the exact number of worker processes successfully
+	 * launched, plus one leader process if it participates as a worker (only
+	 * DISABLE_LEADER_PARTICIPATION builds avoid leader participating as a
+	 * worker).
+	 */
+	int			nparticipantworkers;
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for entire build.  snapshot is the snapshot
+	 * used by the scan iff an MVCC snapshot is required.
+	 */
+	BrinShared	   *brinshared;
+	Snapshot	snapshot;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+} BrinLeader;
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +144,22 @@ typedef struct BrinBuildState
 	Relation	bs_irel;
 	int			bs_numtuples;
+	int			bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+	/*
+	 * bs_leader is only present when a parallel index build is performed, and
+	 * only in the leader process. (Actually, only the leader has a
+	 * BrinBuildState.)
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BufFile	   *bs_file;
 } BrinBuildState;
@@ -76,6 +180,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
 						  bool include_partial, double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
 						 BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +188,18 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
 								BrinMemTuple *dtup, Datum *values, bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *btleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinShared *brinshared,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -822,6 +939,54 @@ brinbuildCallback(Relation index,
 							   values, isnull);
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a temporary file, and leave the insertion up to the leader (which may reorder
+ * them a bit etc.).
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+	thisblock = ItemPointerGetBlockNumber(tid);
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, if they were devoid of live tuples; make sure to insert index
+	 * tuples for those too.
+	 */
+	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallback: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+		/* create the index tuple and write it into the temporary file */
+		form_and_spill_tuple(state);
+		/* set state to correspond to the next range */
+		state->bs_currRangeStart += state->bs_pagesPerRange;
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
  * brinbuild() -- build a new BRIN index.
@@ -883,18 +1048,46 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
+	/*
+	 * Attempt to launch parallel worker scan when required
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+	 * for btree, but not for BRIN, which can do away with much less memory.
+	 * So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/* no parallel index build, just do the usual thing */
+	if (state->bs_leader == NULL)
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+		/* process the final batch */
+		form_and_insert_tuple(state);
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}
+	/*
+	 * In parallel mode, wait for workers to complete, and then read
+	 * the tuples from the temporary files and insert them into the index.
+	 */
+	if (state->bs_leader)
+		_brin_end_parallel(state->bs_leader, state);
 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
@@ -1299,12 +1492,15 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
 	return state;
@@ -1597,6 +1793,31 @@ form_and_insert_tuple(BrinBuildState *state)
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a temporary file (leader will insert it into the
+ * index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+	BrinTuple  *tup;
+	Size		size;
+	Assert(state->bs_file);
+	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+						  state->bs_dtuple, &size);
+	BufFileWrite(state->bs_file, &size, sizeof(Size));
+	BufFileWrite(state->bs_file, &state->bs_currRangeStart, sizeof(BlockNumber));
+	BufFileWrite(state->bs_file, tup, size);
+	state->bs_numtuples++;
+	pfree(tup);
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
@@ -1916,3 +2137,488 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 	return true;
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+					 bool isconcurrent, int request)
+	ParallelContext *pcxt;
+	Snapshot	snapshot;
+	Size		estbrinshared;
+	BrinShared   *brinshared;
+	BrinLeader   *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	bool		leaderparticipates = true;
+	int			querylen;
+	leaderparticipates = false;
+	/*
+	 * Enter parallel mode, and create context for parallel build of brin
+	 * index
+	 */
+	EnterParallelMode();
+	Assert(request > 0);
+	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+								 request);
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+	 * concurrent build, we take a regular MVCC snapshot and index whatever's
+	 * live according to that.
+	 */
+	if (!isconcurrent)
+		snapshot = SnapshotAny;
+	else
+		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+	/*
+	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+	 */
+	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/*
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+	/* Everyone's had a chance to ask for space, so now create the DSM */
+	InitializeParallelDSM(pcxt);
+	/* If no DSM segment was available, back out (do serial build) */
+	if (pcxt->seg == NULL)
+	{
+		if (IsMVCCSnapshot(snapshot))
+			UnregisterSnapshot(snapshot);
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+		return;
+	}
+	/* Store shared build state, for which we reserved space */
+	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+	/* Initialize immutable state */
+	brinshared->heaprelid = RelationGetRelid(heap);
+	brinshared->indexrelid = RelationGetRelid(index);
+	brinshared->isconcurrent = isconcurrent;
+	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	ConditionVariableInit(&brinshared->workersdonecv);
+	SpinLockInit(&brinshared->mutex);
+	/* */
+	SharedFileSetInit(&brinshared->fileset, pcxt->seg);
+	/* Initialize mutable state */
+	brinshared->nparticipantsdone = 0;
+	brinshared->reltuples = 0.0;
+	brinshared->indtuples = 0.0;
+	/* Track work assigned to workers etc. */
+	brinshared->last_worker_id = 0;
+	brinshared->next_range = 0;
+	brinshared->last_range = RelationGetNumberOfBlocks(heap);
+	/*
+	 * Store shared tuplesort-private state, for which we reserved space.
+	 * Then, initialize opaque state using tuplesort routine.
+	 */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+	}
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+	/* Launch workers, saving status for leader/caller */
+	LaunchParallelWorkers(pcxt);
+	brinleader->pcxt = pcxt;
+	brinleader->nparticipantworkers = pcxt->nworkers_launched;
+	if (leaderparticipates)
+		brinleader->nparticipantworkers++;
+	brinleader->brinshared = brinshared;
+	brinleader->snapshot = snapshot;
+	brinleader->walusage = walusage;
+	brinleader->bufferusage = bufferusage;
+	/* If no workers were successfully launched, back out (do serial build) */
+	if (pcxt->nworkers_launched == 0)
+	{
+		_brin_end_parallel(brinleader, NULL);
+		return;
+	}
+	/* Save leader state now that it's clear build will be parallel */
+	buildstate->bs_leader = brinleader;
+	/* Join heap scan ourselves */
+	if (leaderparticipates)
+		_brin_leader_participate_as_worker(buildstate, heap, index);
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make
+	 * sure that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+	int			i;
+	BrinShared *brinshared = brinleader->brinshared;
+	/* Shutdown worker processes */
+	WaitForParallelWorkersToFinish(brinleader->pcxt);
+	if (!state)
+		return;
+	/* copy the data into leader state (we have to wait for the workers ) */
+	state->bs_reltuples = brinshared->reltuples;
+	state->bs_numtuples = brinshared->indtuples;
+	/*
+	 * XXX maybe we should sort the ranges by rangeStart? That'd give us index
+	 * that is cheaper to walk sequentially, because we'd not have any page
+	 * misses (mostly getting data from the same page as before). Although the
+	 * index should be pretty small in general, and thus cached. OTOH each
+	 * worker should produce tuples in the right order, so we could just merge
+	 * sort them.
+	 *
+	 * XXX Alternatively, we could arrange the build so that the workers read
+	 * a continuous chunk of the table. For example, with K workers we might
+	 * leave the first 1/K to the first worker, then 1/K to the second etc.
+	 * Then we would not need to reorder anything, we would just read the
+	 * results for all workers.
+	 *
+	 * XXX That'd also mean we don't do many small TID scans, as now.
+	 *
+	 * The problem is we don't know how many workers will be started while
+	 * determining this, but maybe we could postpone that decision somehow?
+	 * We'd have to wait for all the launched workers to attach, I guess.
+	 */
+	for (i = 1; i <= brinshared->last_worker_id; i++)
+	{
+		BufFile	   *f;
+		char		name[MAXPGPATH];
+		int64		fsize;
+		int64		fpos = 0;
+		snprintf(name, MAXPGPATH, "tuples.%d", i);
+		f = BufFileOpenFileSet(&brinshared->fileset.fs, name, O_RDONLY, false);
+		fsize = BufFileSize(f);
+		while (fpos < fsize)
+		{
+			Size		size;
+			BlockNumber	rangeStart;
+			BrinTuple  *tup;
+			if (BufFileRead(f, &size, sizeof(Size)) != sizeof(Size))
+				elog(ERROR, "failed read");
+			if (BufFileRead(f, &rangeStart, sizeof(BlockNumber)) != sizeof(BlockNumber))
+				elog(ERROR, "failed read");
+			tup = (BrinTuple *) palloc(size);
+			if (BufFileRead(f, tup, size) != size)
+				elog(ERROR, "failed read");
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf, rangeStart, tup, size);
+			fpos += sizeof(Size) + sizeof(BlockNumber) + size;
+		}
+		BufFileClose(f);
+	}
+	/*
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
+	 */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(BrinShared)),
+					table_parallelscan_estimate(heap, snapshot));
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipantworkers;
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
+								  heap, index, sortmem, true);
+	/* insert the last range */
+	form_and_spill_tuple(buildstate);
+	BufFileClose(buildstate->bs_file);
+	buildstate->bs_file = NULL;
+ * Perform a worker's portion of a parallel BRIN index build.
+ *
+ * Each participant scans its share of the heap and writes the BRIN tuples
+ * it builds into its own temporary file in the shared fileset; the leader
+ * reads those files back in _brin_end_parallel.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs (XXX currently unused by the BRIN build).
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
+							  Relation heap, Relation index, int sortmem,
+							  bool progress)
+	double		reltuples;
+	IndexInfo  *indexInfo;
+	char		name[MAXPGPATH];
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = brinshared->isconcurrent;
+	SpinLockAcquire(&brinshared->mutex);
+	state->bs_worker_id = (++brinshared->last_worker_id);
+	SpinLockRelease(&brinshared->mutex);
+	snprintf(name, MAXPGPATH, "tuples.%d", state->bs_worker_id);
+	state->bs_file = BufFileCreateFileSet(&brinshared->fileset.fs, name);
+	/* Get chunks of the table, do TID Scans and build the ranges */
+	while (true)
+	{
+		TableScanDesc	scan;
+		BlockNumber		startBlock,
+						lastBlock,
+						chunkBlocks;
+		ItemPointerData	mintid,
+						maxtid;
+		/*
+		 * Acquire larger chunks of data - this matters especially for low
+		 * pages_per_range settings (e.g. set to 1). Otherwise there would
+		 * be a lot of trashing and overhead with multiple workers.
+		 *
+		 * Not sure where's the sweet spot. Maybe tie this to the prefetching
+		 * too (maintenance_effective_io_concucrrency)?
+		 *
+		 * FIXME The chunkBlocks needs to be a multiple of bs_pagesPerRange.
+		 */
+		chunkBlocks = Max(128, state->bs_pagesPerRange);
+		SpinLockAcquire(&brinshared->mutex);
+		startBlock = brinshared->next_range;
+		lastBlock = brinshared->last_range;
+		brinshared->next_range += chunkBlocks;
+		SpinLockRelease(&brinshared->mutex);
+		state->bs_currRangeStart = startBlock;
+		/* did we reach the end of the heap relation? */
+		if (startBlock > lastBlock)
+			break;
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+		ItemPointerSet(&mintid, startBlock, 0);
+		ItemPointerSet(&maxtid, startBlock + (chunkBlocks - 1),
+					   MaxHeapTuplesPerPage);
+		/* start tidscan to read the relevant part of the table */
+		scan = table_beginscan_tidrange(heap, SnapshotAny,	// FIXME which snapshot to use?
+										&mintid, &maxtid);
+		/* do prefetching (this prefetches the whole range. not sure that's good) */
+		for (BlockNumber blkno = startBlock; blkno < startBlock + chunkBlocks; blkno++)
+			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, blkno);
+		reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+										   brinbuildCallbackParallel, state, scan);
+		/* spill the last tuple */
+		form_and_spill_tuple(state);
+		state->bs_reltuples += reltuples;
+		/* set state to invalid range */
+		state->bs_currRangeStart = InvalidBlockNumber;
+	}
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&brinshared->mutex);
+	brinshared->nparticipantsdone++;
+	brinshared->reltuples += state->bs_reltuples;
+	brinshared->indtuples += state->bs_numtuples;
+	SpinLockRelease(&brinshared->mutex);
+	/* Notify leader */
+	ConditionVariableSignal(&brinshared->workersdonecv);
+ * Perform work within a launched parallel process.
+ */
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+	char	   *sharedquery;
+	BrinShared *brinshared;
+	BrinBuildState *buildstate;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	int			sortmem;
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+	/* Look up brin shared state */
+	brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!brinshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+	/* Open relations within worker */
+	heapRel = table_open(brinshared->heaprelid, heapLockmode);
+	indexRel = index_open(brinshared->indexrelid, indexLockmode);
+	buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+	/* Attach to the shared fileset. */
+	SharedFileSetAttach(&brinshared->fileset, seg);
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+	/* FIXME tie this to number of participants, somehow */
+	sortmem = maintenance_work_mem / 2;
+	_brin_parallel_scan_and_build(buildstate, brinshared,
+								  heapRel, indexRel, sortmem, false);
+	/* insert the last range */
+	form_and_spill_tuple(buildstate);
+	BufFileClose(buildstate->bs_file);
+	buildstate->bs_file = NULL;
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2b8bc2f58dd..72086212f47 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 #include "postgres.h"
+#include "access/brin.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -144,6 +145,9 @@ static const struct
 		"_bt_parallel_build_main", _bt_parallel_build_main
+	{
+		"_brin_parallel_build_main", _brin_parallel_build_main
+	},
 		"parallel_vacuum_main", parallel_vacuum_main
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 352e43d0e61..5bdd025bb25 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2980,7 +2980,8 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	if (parallel && IsNormalProcessingMode() &&
-		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
+		 indexRelation->rd_rel->relam == BRIN_AM_OID))
 		indexInfo->ii_ParallelWorkers =
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H
 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
 extern void brinGetStats(Relation index, BrinStatsData *stats);
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
 #endif							/* BRIN_H */

From 5e19887efa57f77b17df45a65a101b84721f665c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Sat, 8 Apr 2023 16:39:51 +0200
Subject: [PATCH 2/2] switch CREATE INDEX for BRIN to parallel scan

 src/backend/access/brin/brin.c      | 110 +++++++++++-----------------
 src/backend/access/nbtree/nbtsort.c |   2 +-
 src/backend/access/table/tableam.c  |  26 +++++--
 src/backend/executor/nodeSeqscan.c  |   3 +-
 src/include/access/relscan.h        |   1 +
 src/include/access/tableam.h        |   9 ++-
 6 files changed, 73 insertions(+), 78 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 13d94931efc..796c0ecf06e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -85,12 +85,8 @@ typedef struct BrinShared
 	slock_t		mutex;
-	/*
-	 * 
-	 */
+	/* XXX Probably not needed. Identifies the worker. */
 	int			last_worker_id;
-	BlockNumber	next_range;
-	BlockNumber	last_range;
 	 * Mutable state that is maintained by workers, and reported back to
@@ -105,8 +101,23 @@ typedef struct BrinShared
 	int			nparticipantsdone;
 	double		reltuples;
 	double		indtuples;
+	/*
+	 * ParallelTableScanDescData data follows. Can't directly embed here, as
+	 * implementations of the parallel table scan desc interface might need
+	 * stronger alignment.
+	 */
 } BrinShared;
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
  * Status for leader in parallel index build.
@@ -975,8 +986,20 @@ brinbuildCallbackParallel(Relation index,
 		/* create the index tuple and write it into the temporary file */
-		/* set state to correspond to the next range */
-		state->bs_currRangeStart += state->bs_pagesPerRange;
+		/*
+		 * set state to correspond to the next range
+		 *
+		 * XXX This skips not only ranges summarized by other workers, but
+		 * also empty ranges that should have been summarized. We'd need to
+		 * either make the workers aware of which chunk they are actually
+		 * processing (which is currently known only in the
+		 * ParallelBlockTableScan bit), or ignore the problem here and
+		 * resolve it while "merging" results from workers (if there's no
+		 * entry for a range, it had to be empty, so we just add an empty
+		 * one).
+		 */
+		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+			state->bs_currRangeStart += state->bs_pagesPerRange;
 		/* re-initialize state for it */
 		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
@@ -2243,8 +2266,10 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	/* Track work assigned to workers etc. */
 	brinshared->last_worker_id = 0;
-	brinshared->next_range = 0;
-	brinshared->last_range = RelationGetNumberOfBlocks(heap);
+	table_parallelscan_initialize(heap,
+								  ParallelTableScanFromBrinShared(brinshared),
+								  snapshot, brinshared->pagesPerRange);
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2452,6 +2477,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
 							  Relation heap, Relation index, int sortmem,
 							  bool progress)
+	TableScanDesc	scan;
 	double		reltuples;
 	IndexInfo  *indexInfo;
 	char		name[MAXPGPATH];
@@ -2468,68 +2494,16 @@ _brin_parallel_scan_and_build(BrinBuildState *state, BrinShared *brinshared,
 	state->bs_file = BufFileCreateFileSet(&brinshared->fileset.fs, name);
-	/* Get chunks of the table, do TID Scans and build the ranges */
-	while (true)
-	{
-		TableScanDesc	scan;
-		BlockNumber		startBlock,
-						lastBlock,
-						chunkBlocks;
-		ItemPointerData	mintid,
-						maxtid;
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromBrinShared(brinshared));
-		/*
-		 * Acquire larger chunks of data - this matters especially for low
-		 * pages_per_range settings (e.g. set to 1). Otherwise there would
-		 * be a lot of trashing and overhead with multiple workers.
-		 *
-		 * Not sure where's the sweet spot. Maybe tie this to the prefetching
-		 * too (maintenance_effective_io_concucrrency)?
-		 *
-		 * FIXME The chunkBlocks needs to be a multiple of bs_pagesPerRange.
-		 */
-		chunkBlocks = Max(128, state->bs_pagesPerRange);
-		SpinLockAcquire(&brinshared->mutex);
-		startBlock = brinshared->next_range;
-		lastBlock = brinshared->last_range;
-		brinshared->next_range += chunkBlocks;
-		SpinLockRelease(&brinshared->mutex);
-		state->bs_currRangeStart = startBlock;
-		/* did we reach the end of the heap relation? */
-		if (startBlock > lastBlock)
-			break;
-		/* re-initialize state for it */
-		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+									   brinbuildCallbackParallel, state, scan);
-		ItemPointerSet(&mintid, startBlock, 0);
-		ItemPointerSet(&maxtid, startBlock + (chunkBlocks - 1),
-					   MaxHeapTuplesPerPage);
+	/* spill the last tuple */
+	form_and_spill_tuple(state);
-		/* start tidscan to read the relevant part of the table */
-		scan = table_beginscan_tidrange(heap, SnapshotAny,	// FIXME which snapshot to use?
-										&mintid, &maxtid);
-		/* do prefetching (this prefetches the whole range. not sure that's good) */
-		for (BlockNumber blkno = startBlock; blkno < startBlock + chunkBlocks; blkno++)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, blkno);
-		reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
-										   brinbuildCallbackParallel, state, scan);
-		/* spill the last tuple */
-		form_and_spill_tuple(state);
-		state->bs_reltuples += reltuples;
-		/* set state to invalid range */
-		state->bs_currRangeStart = InvalidBlockNumber;
-	}
+	state->bs_reltuples += reltuples;
 	 * Done.  Record ambuild statistics.
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 6ad3f3c54d5..69ec9d5fe9c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
-								  snapshot);
+								  snapshot, InvalidBlockNumber);
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 771438c8cec..834cc9cd7c6 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, BlockNumber chunk_factor)
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
 	pscan->phs_snapshot_off = snapshot_off;
@@ -395,16 +395,21 @@ table_block_parallelscan_estimate(Relation rel)
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	/* compare phs_syncscan initialization to similar logic in initscan */
+	bpscan->phs_chunk_factor = chunk_factor;
+	/* compare phs_syncscan initialization to similar logic in initscan
+	 *
+	 * Disable sync scans if the chunk factor is set (valid block number).
+	 */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		bpscan->phs_nblocks > NBuffers / 4;
+		(bpscan->phs_nblocks > NBuffers / 4) &&
+		!BlockNumberIsValid(bpscan->phs_chunk_factor);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -459,6 +464,17 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+	/*
+	 * If the chunk size factor is set, we need to make sure the chunk size is
+	 * a multiple of that value.
+	 */
+	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+	{
+		int		nchunks = (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+		pbscanwork->phsw_chunk_size = Max(1, nchunks) * pbscan->phs_chunk_factor;
+	}
 	/* Grab the spinlock. */
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 4da0f28f7ba..e017f05f780 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -274,7 +274,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  InvalidBlockNumber);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d03360eac04..72a20d882f5 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,6 +79,7 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 230bc39cc0e..82297546e27 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,7 +390,8 @@ typedef struct TableAmRoutine
 	 * relation.
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan);
+											ParallelTableScanDesc pscan,
+											BlockNumber chunk_factor);
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1148,7 +1149,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  BlockNumber chunk_factor);
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2064,7 +2066,8 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan);
+												ParallelTableScanDesc pscan,
+												BlockNumber chunk_factor);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,

Reply via email to