On 11/20/23 20:48, Matthias van de Meent wrote:
> On Wed, 8 Nov 2023 at 12:03, Tomas Vondra <tomas.von...@enterprisedb.com> wrote:
>>
>> Hi,
>>
>> here's an updated patch, addressing the review comments, and reworking
>> how the work is divided between the workers & leader etc.
>>
>> 0001 is just v2, rebased to current master
>>
>> 0002 and 0003 address most of the issues, in particular it
>>
>>   - removes the unnecessary spool
>>   - fixes bs_reltuples type to double
>>   - a couple comments are reworded to be clearer
>>   - changes loop/condition in brinbuildCallbackParallel
>>   - removes asserts added for debugging
>>   - fixes cast in comparetup_index_brin
>>   - 0003 then simplifies comparetup_index_brin
>>   - I haven't inlined the tuplesort_puttuple_common parameter
>>     (didn't seem worth it)
> 
> OK, thanks
> 
>> 0004 Reworks how the work is divided between workers and combined by the
>> leader. It undoes the tableam.c changes that attempted to divide the
>> relation into chunks matching the BRIN ranges, and instead merges the
>> results in the leader (using the BRIN "union" function).
> 
> That's OK.
> 
>> I haven't done any indentation fixes yet.
>>
>> I did fairly extensive testing, using pageinspect to compare indexes
>> built with/without parallelism. More testing is needed, but it seems to
>> work fine (with other opclasses and so on).
> 
> After code-only review, here are some comments:
> 
>> +++ b/src/backend/access/brin/brin.c
>> [...]
>> +/* Magic numbers for parallel state sharing */
>> +#define PARALLEL_KEY_BRIN_SHARED        UINT64CONST(0xA000000000000001)
>> +#define PARALLEL_KEY_TUPLESORT            UINT64CONST(0xA000000000000002)
> 
> These shm keys use the same constants also in use in
> access/nbtree/nbtsort.c. While this shouldn't be an issue in normal
> operations, I'd prefer if we didn't actively introduce conflicting
> identifiers when we still have significant amounts of unused values
> remaining.
> 

Hmmm. Is there some rule of thumb for how to pick these key values? I see
nbtsort.c uses 0xA prefix, execParallel.c uses 0xE, while parallel.c
ended up using 0xFFFFFFFFFFFF as prefix. I've used 0xB, simply because
BRIN also starts with B.
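
To illustrate the prefix scheme, here is a minimal standalone sketch (not
the patch itself). It uses UINT64_C from <stdint.h> in place of
PostgreSQL's UINT64CONST. The BRIN_KEY_* and NBTREE_KEY_* names and the
key_prefix() helper are made up for the example, and the only nbtsort.c
value it relies on is the 0xA000000000000004 query-text key mentioned in
the review.

```c
#include <assert.h>
#include <stdint.h>

/* nbtsort.c's query-text key, as quoted in the review (name is hypothetical) */
#define NBTREE_KEY_QUERY_TEXT	UINT64_C(0xA000000000000004)

/* BRIN keys moved to a 0xB prefix (names are hypothetical) */
#define BRIN_KEY_BRIN_SHARED	UINT64_C(0xB000000000000001)
#define BRIN_KEY_TUPLESORT		UINT64_C(0xB000000000000002)
#define BRIN_KEY_QUERY_TEXT		UINT64_C(0xB000000000000003)
#define BRIN_KEY_WAL_USAGE		UINT64_C(0xB000000000000004)
#define BRIN_KEY_BUFFER_USAGE	UINT64_C(0xB000000000000005)

/* Hypothetical helper: extract the top four bits (the per-module prefix). */
static inline unsigned
key_prefix(uint64_t key)
{
	return (unsigned) (key >> 60);
}

/* Hypothetical sanity check: BRIN keys must not reuse the nbtree prefix. */
static inline void
check_brin_prefix(void)
{
	assert(key_prefix(BRIN_KEY_BRIN_SHARED) != key_prefix(NBTREE_KEY_QUERY_TEXT));
}
```

With distinct prefixes, the low-order counter can overlap between modules
(0x...4 appears in both lists) without the full 64-bit keys colliding.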

>> +#define PARALLEL_KEY_QUERY_TEXT            UINT64CONST(0xA000000000000003)
> 
> This is the fourth definition of a PARALLEL%_KEY_QUERY_TEXT, the
> others being in access/nbtree/nbtsort.c (value 0xA000000000000004, one
> more than brin's), backend/executor/execParallel.c
> (0xE000000000000008), and PARALLEL_VACUUM_KEY_QUERY_TEXT (0x3) (though
> I've not checked that their uses are exactly the same, I'd expect at
> least btree to match mostly, if not fully, 1:1).
> I think we could probably benefit from a less ad-hoc sharing of query
> texts. I don't think that needs to happen specifically in this patch,
> but I think it's something to keep in mind in future efforts.
> 

I'm afraid I don't quite get what you mean by "ad hoc sharing of query
texts". Are you saying we shouldn't propagate the query text to the
parallel workers? Why? Or what's the proper solution?

>> +_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
>> [...]
>> +    BrinSpool  *spool = state->bs_spool;
>> [...]
>> +    if (!state)
>> +        return;
> 
> I think the assignment to spool should be moved to below this
> condition, as _brin_begin_parallel calls this with state=NULL when it
> can't launch parallel workers, which will cause issues here.
> 

Good catch! I wonder if we have tests that might trigger this, say by
setting max_parallel_maintenance_workers > 0 while no workers are allowed.

>> +    state->bs_numtuples = brinshared->indtuples;
> 
> My IDE complains about bs_numtuples being an integer. This is a
> pre-existing issue, but still valid: we can hit an overflow on tables
> with pages_per_range=1 and relsize >= 2^31 pages. Extremely unlikely,
> but problematic nonetheless.
> 

True. I think I've been hesitant to make this a double because it seems
a bit weird to do +1 with a double, and at some point (d == d+1). But
this seems safe, we're guaranteed to be far away from that threshold.
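
For reference, a small standalone sketch of where (d == d+1) starts to
hold, assuming IEEE 754 doubles with a 53-bit significand. The helper
names are hypothetical and not part of the patch. The threshold is 2^53,
while the number of BRIN ranges is bounded by the number of blocks, which
is below 2^32.

```c
#include <assert.h>
#include <stdbool.h>

/* True if d + 1.0 is distinguishable from d in double precision. */
static bool
increment_is_exact(double d)
{
	volatile double next = d + 1.0;	/* volatile: force rounding to double */

	return next != d;
}

/* Hypothetical counter bump that refuses to silently lose an increment. */
static double
count_tuple(double count)
{
	assert(increment_is_exact(count));
	return count + 1.0;
}
```

Counting past 2^31 works as expected with a double, and the increment only
stops having an effect at 2^53 (9007199254740992).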

>> +     * FIXME This probably needs some memory management fixes - we're reading
>> +     * tuples from the tuplesort, we're allocating an empty tuple, and so on.
>> +     * Probably better to release this memory.
> 
> This should probably be resolved.
> 

AFAICS that comment is actually inaccurate/stale, sorry about that. The
code actually allocates (and resets) a single memtuple, and also
emptyTuple. So I think that's OK, I've removed the comment.

> I also noticed that this is likely to execute `union_tuples` many
> times when pages_per_range is coprime with the parallel table scan's
> block stride (or when we for other reasons have many tuples returned
> for each range); and this `union_tuples` internally allocates and
> deletes its own memory context for its deserialization of the 'b'
> tuple. I think we should just pass a scratch context instead, so that
> we don't have the overhead of continuously creating then deleting the
> same memory context

Perhaps. Looking at the code, isn't it a bit strange how union_tuples
uses the context? It creates the context, calls brin_deform_tuple in
that context, but then the rest of the function (including datumCopy and
similar stuff) happens in the caller's context ...

However, I don't think the number of union_tuples calls is likely to be
very high, especially for large tables, because we split the table into
2048 chunks, and then cap the chunk size at 8192 blocks. For large tables
(where this matters) we're likely close to 8192.
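
A rough sketch of that arithmetic, as a standalone function. The 2048 and
8192 constants are the ones mentioned above. The rounding up to a power of
two is an assumption based on my reading of the parallel scan code in
tableam.c, and the function names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_NCHUNKS			2048	/* target number of chunks */
#define SKETCH_MAX_CHUNK_SIZE	8192	/* cap on chunk size, in blocks */

/* Smallest power of two >= n (for n >= 1). */
static uint32_t
next_pow2(uint32_t n)
{
	uint32_t	p = 1;

	assert(n >= 1 && n <= (UINT32_C(1) << 31));
	while (p < n)
		p <<= 1;
	return p;
}

/* Chunk size (in blocks) handed to each participant of a parallel scan. */
static uint32_t
scan_chunk_size(uint32_t nblocks)
{
	uint32_t	size = nblocks / SKETCH_NCHUNKS;

	if (size < 1)
		size = 1;
	size = next_pow2(size);		/* assumed rounding, see above */
	if (size > SKETCH_MAX_CHUNK_SIZE)
		size = SKETCH_MAX_CHUNK_SIZE;
	return size;
}
```

Under these assumptions a table of 16777216 blocks (128 GB with 8 kB
blocks) already gets 8192-block chunks, so with typical pages_per_range
values only the few ranges straddling a chunk boundary can end up with
more than one tuple.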

> 
>> +++ b/src/backend/catalog/index.c
>> [...]
>> -        indexRelation->rd_rel->relam == BTREE_AM_OID)
>> +        (indexRelation->rd_rel->relam == BTREE_AM_OID ||
>> +         indexRelation->rd_rel->relam == BRIN_AM_OID))
> 
> I think this needs some more effort. I imagine a new
> IndexAmRoutine->amcanbuildparallel is more appropriate than this
> hard-coded list - external indexes may want to utilize the parallel
> index creation planner facility, too.
> 

Good idea. I added the IndexAmRoutine flag and used it here.
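
Roughly, the check becomes a property of the AM instead of a list of OIDs.
A simplified standalone sketch, where AmRoutineSketch is a cut-down,
hypothetical stand-in for IndexAmRoutine and workers_for_build() is not a
real function in the tree:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, cut-down stand-in for IndexAmRoutine. */
typedef struct AmRoutineSketch
{
	const char *name;
	bool		amcanbuildparallel;	/* does the AM support parallel builds? */
} AmRoutineSketch;

/* Returns the number of workers to plan for: 0 unless the AM opts in. */
static int
workers_for_build(const AmRoutineSketch *am, int requested)
{
	assert(am != NULL);
	if (!am->amcanbuildparallel || requested < 0)
		return 0;
	return requested;
}
```

An external AM then only needs to set the flag in its handler function to
be considered for parallel builds.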

> 
> Some notes:
> As a project PostgreSQL seems to be trying to move away from
> hardcoding heap into everything in favour of the more AM-agnostic
> 'table'. I suggest replacing all mentions of "heap" in the arguments
> with "table", to reduce the work future maintainers need to do to fix
> this. Even when this AM is mostly targeted towards the heap AM, other
> AMs (such as those I've heard of that were developed internally at
> EDB) use the same block-addressing that heap does, and should thus be
> compatible with BRIN. Thus, "heap" is not a useful name here.
> 

I'm not against doing that, but I'd prefer to do that in a separate
patch. There's a bunch of preexisting heap references, so I don't
want to introduce inconsistency (patch using table, old code heap) nor
do I want to tweak unrelated code.

> There are 2 new mentions of "tuplestore" in the patch, while the
> structure used is tuplesort: one on form_and_spill_tuple, and one on
> brinbuildCallbackParallel. Please update those comments.
> 
> That's it for code review. I'll do some performance comparisons and
> testing soon, too.
> 

Thanks! Attached is a patch squashing the previous version into a single
v3 commit, with fixes for your review in a separate commit.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From dc59165200834ce9a9756055e7dd7f492977e223 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 17:04:28 +0100
Subject: [PATCH v4 1/2] parallel CREATE INDEX for BRIN v3

---
 src/backend/access/brin/brin.c             | 868 ++++++++++++++++++++-
 src/backend/access/transam/parallel.c      |   4 +
 src/backend/catalog/index.c                |   3 +-
 src/backend/utils/sort/tuplesortvariants.c | 207 +++++
 src/include/access/brin.h                  |   3 +
 src/include/utils/tuplesort.h              |  11 +
 6 files changed, 1091 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index f0eac078e0b..0d3d728c9bf 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -33,6 +33,7 @@
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -40,7 +41,118 @@
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/tuplesort.h"
 
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000003)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000005)
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+typedef struct BrinSpool
+{
+	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
+	Relation	heap;
+	Relation	index;
+} BrinSpool;
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+	/*
+	 * These fields are not modified during the build.  They primarily exist for
+	 * the benefit of worker processes that need to create state corresponding
+	 * to that used by the leader.
+	 */
+	Oid			heaprelid;
+	Oid			indexrelid;
+	bool		isconcurrent;
+	BlockNumber	pagesPerRange;
+	int			scantuplesortstates;
+
+	/*
+	 * workersdonecv is used to monitor the progress of workers.  All parallel
+	 * participants must indicate that they are done before leader can use
+	 * results built by the workers (and before leader can write the data into
+	 * the index).
+	 */
+	ConditionVariable workersdonecv;
+
+	/*
+	 * mutex protects all fields before heapdesc.
+	 *
+	 * These fields contain status information of interest to BRIN index
+	 * builds that must work just the same when an index is built in parallel.
+	 */
+	slock_t		mutex;
+
+	/*
+	 * Mutable state that is maintained by workers, and reported back to
+	 * leader at end of the scans.
+	 *
+	 * nparticipantsdone is number of worker processes finished.
+	 *
+	 * reltuples is the total number of input heap tuples.
+	 *
+	 * indtuples is the total number of tuples that made it into the index.
+	 */
+	int			nparticipantsdone;
+	double		reltuples;
+	double		indtuples;
+
+	/*
+	 * ParallelTableScanDescData data follows. Can't directly embed here, as
+	 * implementations of the parallel table scan desc interface might need
+	 * stronger alignment.
+	 */
+} BrinShared;
+
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	(ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared)))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+	/* parallel context itself */
+	ParallelContext *pcxt;
+
+	/*
+	 * nparticipanttuplesorts is the exact number of worker processes
+	 * successfully launched, plus one leader process if it participates as a
+	 * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+	 * participating as a worker).
+	 */
+	int			nparticipanttuplesorts;
+
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for entire build.  sharedsort is the
+	 * shared, tuplesort-managed state passed to each process tuplesort.
+	 * snapshot is the snapshot used by the scan iff an MVCC snapshot is required.
+	 */
+	BrinShared	   *brinshared;
+	Sharedsort *sharedsort;
+	Snapshot	snapshot;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+} BrinLeader;
 
 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +162,22 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
+	double		bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+
+	/*
+	 * bs_leader is only present when a parallel index build is performed, and
+	 * only in the leader process. (Actually, only the leader process has a
+	 * BrinBuildState.)
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BrinSpool  *bs_spool;
 } BrinBuildState;
 
 /*
@@ -76,6 +198,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
 						  bool include_partial, double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
 						 BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +206,20 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
 								BrinMemTuple *dtup, const Datum *values, const bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinSpool *brinspool,
+										  BrinShared *brinshared,
+										  Sharedsort *sharedsort,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -820,6 +957,63 @@ brinbuildCallback(Relation index,
 							   values, isnull);
 }
 
+/*
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a shared tuplestore, and leave the insertion up to the leader (which may
+ * reorder them a bit etc.). The callback also does not generate empty ranges,
+ * those may be added by the leader when merging results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+{
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+
+	thisblock = ItemPointerGetBlockNumber(tid);
+
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, if they were devoid of live tuples; we do not create empty
+	 * BRIN ranges here - the leader is responsible for filling them in.
+	 */
+	if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallback: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+		/* create the index tuple and write it into the tuplesort */
+		form_and_spill_tuple(state);
+
+		/*
+		 * Set state to correspond to the next range (for this block).
+		 *
+		 * This skips ranges that are either empty (and so we don't get any
+		 * tuples to summarize), or processed by other workers. We can't
+		 * differentiate those cases here easily, so we leave it up to the
+		 * leader to fill empty ranges where needed.
+		 */
+		state->bs_currRangeStart
+			= state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
+}
+
 /*
  * brinbuild() -- build a new BRIN index.
  */
@@ -881,18 +1075,89 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
+	state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	state->bs_spool->heap = heap;
+	state->bs_spool->index = index;
+
+	/*
+	 * Attempt to launch parallel worker scan when required
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+	 * for btree, but not for BRIN, which can get by with much less memory.
+	 * So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
+
 	/*
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
 	 */
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
 
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/*
+	 * If parallel build requested and at least one worker process was
+	 * successfully launched, set up coordination state
+	 */
+	if (state->bs_leader)
+	{
+		SortCoordinate coordinate;
+
+		coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+		coordinate->isWorker = false;
+		coordinate->nParticipants =
+			state->bs_leader->nparticipanttuplesorts;
+		coordinate->sharedsort = state->bs_leader->sharedsort;
+
+
+		/*
+		 * Begin serial/leader tuplesort.
+		 *
+		 * In cases where parallelism is involved, the leader receives the same
+		 * share of maintenance_work_mem as a serial sort (it is generally treated
+		 * in the same way as a serial sort once we return).  Parallel worker
+		 * Tuplesortstates will have received only a fraction of
+		 * maintenance_work_mem, though.
+		 *
+		 * We rely on the lifetime of the Leader Tuplesortstate almost not
+		 * overlapping with any worker Tuplesortstate's lifetime.  There may be
+		 * some small overlap, but that's okay because we rely on leader
+		 * Tuplesortstate only allocating a small, fixed amount of memory here.
+		 * When its tuplesort_performsort() is called (by our caller), and
+		 * significant amounts of memory are likely to be used, all workers must
+		 * have already freed almost all memory held by their Tuplesortstates
+		 * (they are about to go away completely, too).  The overall effect is
+		 * that maintenance_work_mem always represents an absolute high watermark
+		 * on the amount of memory used by a CREATE INDEX operation, regardless of
+		 * the use of parallelism or any other factor.
+		 */
+		state->bs_spool->sortstate =
+			tuplesort_begin_index_brin(heap, index,
+									   maintenance_work_mem, coordinate,
+									   TUPLESORT_NONE);
+
+		/*
+		 * In parallel mode, wait for workers to complete, and then read all
+		 * tuples from the shared tuplesort and insert them into the index.
+		 */
+		_brin_end_parallel(state->bs_leader, state);
+	}
+	else	/* no parallel index build, just do the usual thing */
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+
+		/* process the final batch */
+		form_and_insert_tuple(state);
+
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -1312,12 +1577,16 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 
 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
+	state->bs_spool = NULL;
 
 	return state;
 }
@@ -1609,6 +1878,32 @@ form_and_insert_tuple(BrinBuildState *state)
 	pfree(tup);
 }
 
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to a (shared) tuplestore (the leader will insert it
+ * into the index later).
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+	BrinTuple  *tup;
+	Size		size;
+
+	/* don't insert empty tuples in parallel build */
+	if (state->bs_dtuple->bt_empty_range)
+		return;
+
+	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+						  state->bs_dtuple, &size);
+
+	/* write the BRIN tuple to the tuplesort */
+	tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
+
+	state->bs_numtuples++;
+
+	pfree(tup);
+}
+
 /*
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
@@ -1928,3 +2223,568 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 
 	return true;
 }
+
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+					 bool isconcurrent, int request)
+{
+	ParallelContext *pcxt;
+	int			scantuplesortstates;
+	Snapshot	snapshot;
+	Size		estbrinshared;
+	Size		estsort;
+	BrinShared   *brinshared;
+	Sharedsort   *sharedsort;
+	BrinLeader   *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	bool		leaderparticipates = true;
+	int			querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+	leaderparticipates = false;
+#endif
+
+	/*
+	 * Enter parallel mode, and create context for parallel build of brin
+	 * index
+	 */
+	EnterParallelMode();
+	Assert(request > 0);
+	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+								 request);
+
+	scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+	 * concurrent build, we take a regular MVCC snapshot and index whatever's
+	 * live according to that.
+	 */
+	if (!isconcurrent)
+		snapshot = SnapshotAny;
+	else
+		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+	/*
+	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+	 */
+	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+	estsort = tuplesort_estimate_shared(scantuplesortstates);
+	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+	shm_toc_estimate_keys(&pcxt->estimator, 2);
+
+	/*
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 * and PARALLEL_KEY_BUFFER_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	/* Everyone's had a chance to ask for space, so now create the DSM */
+	InitializeParallelDSM(pcxt);
+
+	/* If no DSM segment was available, back out (do serial build) */
+	if (pcxt->seg == NULL)
+	{
+		if (IsMVCCSnapshot(snapshot))
+			UnregisterSnapshot(snapshot);
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+		return;
+	}
+
+	/* Store shared build state, for which we reserved space */
+	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+	/* Initialize immutable state */
+	brinshared->heaprelid = RelationGetRelid(heap);
+	brinshared->indexrelid = RelationGetRelid(index);
+	brinshared->isconcurrent = isconcurrent;
+	brinshared->scantuplesortstates = scantuplesortstates;
+	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	ConditionVariableInit(&brinshared->workersdonecv);
+	SpinLockInit(&brinshared->mutex);
+
+	/* Initialize mutable state */
+	brinshared->nparticipantsdone = 0;
+	brinshared->reltuples = 0.0;
+	brinshared->indtuples = 0.0;
+
+	table_parallelscan_initialize(heap,
+								  ParallelTableScanFromBrinShared(brinshared),
+								  snapshot);
+
+	/*
+	 * Store shared tuplesort-private state, for which we reserved space.
+	 * Then, initialize opaque state using tuplesort routine.
+	 */
+	sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+	tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+								pcxt->seg);
+
+	/*
+	 * Publish the shared BRIN build state and the shared tuplesort state in
+	 * the TOC, so that workers can look them up by key.
+	 */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+	/* Launch workers, saving status for leader/caller */
+	LaunchParallelWorkers(pcxt);
+	brinleader->pcxt = pcxt;
+	brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+	if (leaderparticipates)
+		brinleader->nparticipanttuplesorts++;
+	brinleader->brinshared = brinshared;
+	brinleader->sharedsort = sharedsort;
+	brinleader->snapshot = snapshot;
+	brinleader->walusage = walusage;
+	brinleader->bufferusage = bufferusage;
+
+	/* If no workers were successfully launched, back out (do serial build) */
+	if (pcxt->nworkers_launched == 0)
+	{
+		_brin_end_parallel(brinleader, NULL);
+		return;
+	}
+
+	/* Save leader state now that it's clear build will be parallel */
+	buildstate->bs_leader = brinleader;
+
+	/* Join heap scan ourselves */
+	if (leaderparticipates)
+		_brin_leader_participate_as_worker(buildstate, heap, index);
+
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make
+	 * sure that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Shut down workers, destroy parallel context, and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+	int			i;
+	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
+	Size		tuplen;
+	BrinShared *brinshared = brinleader->brinshared;
+	BlockNumber	prevblkno = InvalidBlockNumber;
+	BrinTuple  *emptyTuple = NULL;
+	Size		emptySize;
+	BrinSpool  *spool = state->bs_spool;
+
+	/* Shutdown worker processes */
+	WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+	if (!state)
+		return;
+
+	/* copy the data into leader state (we have to wait for the workers) */
+	state->bs_reltuples = brinshared->reltuples;
+	state->bs_numtuples = brinshared->indtuples;
+
+	tuplesort_performsort(spool->sortstate);
+
+	/*
+	 * Initialize BrinMemTuple we'll use to union summaries from workers
+	 * (in case they happened to produce parts of the same page range).
+	 */
+	memtuple = brin_new_memtuple(state->bs_bdesc);
+
+	/*
+	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
+	 * That probably gives us an index that is cheaper to scan, thanks to mostly
+	 * getting data from the same index page as before.
+	 *
+	 * FIXME This probably needs some memory management fixes - we're reading
+	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
+	 * Probably better to release this memory.
+	 *
+	 * XXX We can't quite free the BrinTuple, though, because that's a field
+	 * in BrinSortTuple.
+	 */
+	while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
+	{
+		/* Ranges should be multiples of pages_per_range for the index. */
+		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
+		/*
+		 * Do we need to union summaries for the same page range?
+		 *
+		 * If this is the first brin tuple we read, then just deform it into
+		 * the memtuple, and continue with the next one from tuplesort. We
+		 * however may need to insert empty summaries into the index.
+		 *
+		 * If it's the same block as the last we saw, we simply union the
+		 * brin tuple into it, and we're done - we don't even need to insert
+		 * empty ranges, because that was done earlier when we saw the first
+		 * brin tuple (for this range).
+		 *
+		 * Finally, if it's not the first brin tuple, and it's not the same
+		 * page range, we need to do the insert and then deform the tuple
+		 * into the memtuple. Then we'll insert empty ranges before the
+		 * new brin tuple, if needed.
+		 */
+		if (prevblkno == InvalidBlockNumber)
+		{
+			/* First brin tuple, just deform into memtuple. */
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+			/* continue to insert empty pages before thisblock */
+		}
+		else if (memtuple->bt_blkno == btup->bt_blkno)
+		{
+			/*
+			 * Not the first brin tuple, but same page range as the previous
+			 * one, so we can merge it into the memtuple.
+			 */
+			union_tuples(state->bs_bdesc, memtuple, btup);
+			continue;
+		}
+		else
+		{
+			BrinTuple  *tmp;
+			Size		len;
+
+			/*
+			 * We got brin tuple for a different page range, so form a brin
+			 * tuple from the memtuple, insert it, and re-init the memtuple
+			 * from the new brin tuple.
+			 */
+			tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+								  memtuple, &len);
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+			/* free the formed on-disk tuple */
+			pfree(tmp);
+
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+			/* continue to insert empty pages before thisblock */
+		}
+
+		/* Fill empty ranges for all ranges missing in the tuplesort. */
+		prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+		while (prevblkno + state->bs_pagesPerRange < btup->bt_blkno)
+		{
+			/* the missing range */
+			prevblkno += state->bs_pagesPerRange;
+
+			/* Did we already build the empty range? If not, do it now. */
+			if (emptyTuple == NULL)
+			{
+				BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+				emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+			}
+			else
+			{
+				/* we already have an "empty range" tuple, just set the block */
+				emptyTuple->bt_blkno = prevblkno;
+			}
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf,
+						  emptyTuple->bt_blkno, emptyTuple, emptySize);
+		}
+
+		prevblkno = btup->bt_blkno;
+	}
+
+	tuplesort_end(spool->sortstate);
+
+	/* Fill empty ranges for all ranges missing in the tuplesort. */
+	prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+	while (prevblkno + state->bs_pagesPerRange < memtuple->bt_blkno)
+	{
+		/* the missing range */
+		prevblkno += state->bs_pagesPerRange;
+
+		/* Did we already build the empty range? If not, do it now. */
+		if (emptyTuple == NULL)
+		{
+			BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+			emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+		}
+		else
+		{
+			/* we already have an "empty range" tuple, just set the block */
+			emptyTuple->bt_blkno = prevblkno;
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf,
+					  emptyTuple->bt_blkno, emptyTuple, emptySize);
+	}
+
+	/* Form and insert the tuple for the last page range, if we saw any. */
+	if (prevblkno != InvalidBlockNumber)
+	{
+		BrinTuple  *tmp;
+		Size		len;
+
+		tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+							  memtuple, &len);
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+		pfree(tmp);
+	}
+
+	/*
+	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
+	 * to finish, or we might get incomplete data.)
+	 */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Returns size of shared memory required to store state for a parallel
+ * brin index build based on the snapshot its parallel scan will use.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
+	return add_size(BUFFERALIGN(sizeof(BrinShared)),
+					table_parallelscan_estimate(heap, snapshot));
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/* Allocate memory and initialize private spool */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heap;
+	buildstate->bs_spool->index = index;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
+								  brinleader->sharedsort, heap, index, sortmem, true);
+}
+
+/*
+ * Perform a worker's portion of a parallel sort.
+ *
+ * This generates a tuplesort for the passed brinspool.  (There is no second
+ * spool as in the btree code, because BRIN has no unique index builds.)  All
+ * other spool fields should already be set when this is called.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
+							  BrinShared *brinshared, Sharedsort *sharedsort,
+							  Relation heap, Relation index, int sortmem,
+							  bool progress)
+{
+	SortCoordinate coordinate;
+	TableScanDesc	scan;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* Initialize local tuplesort coordination state */
+	coordinate = palloc0(sizeof(SortCoordinateData));
+	coordinate->isWorker = true;
+	coordinate->nParticipants = -1;
+	coordinate->sharedsort = sharedsort;
+
+	/* Begin "partial" tuplesort */
+	brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
+													  brinspool->index,
+													  sortmem, coordinate,
+													  TUPLESORT_NONE);
+
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromBrinShared(brinshared));
+
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, true,
+									   brinbuildCallbackParallel, state, scan);
+
+	/* insert the last item */
+	form_and_spill_tuple(state);
+
+	/* sort the BRIN ranges built by this worker */
+	tuplesort_performsort(brinspool->sortstate);
+
+	state->bs_reltuples += reltuples;
+
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&brinshared->mutex);
+	brinshared->nparticipantsdone++;
+	brinshared->reltuples += state->bs_reltuples;
+	brinshared->indtuples += state->bs_numtuples;
+	SpinLockRelease(&brinshared->mutex);
+
+	/* Notify leader */
+	ConditionVariableSignal(&brinshared->workersdonecv);
+
+	tuplesort_end(brinspool->sortstate);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	char	   *sharedquery;
+	BrinShared *brinshared;
+	Sharedsort *sharedsort;
+	BrinBuildState *buildstate;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	int			sortmem;
+
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Look up brin shared state */
+	brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!brinshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = table_open(brinshared->heaprelid, heapLockmode);
+	indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+	buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+
+	/* Initialize worker's own spool */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heapRel;
+	buildstate->bs_spool->index = indexRel;
+
+	/* Look up shared state private to tuplesort.c */
+	sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+	tuplesort_attach_shared(sharedsort, seg);
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
+
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+								  brinshared, sharedsort,
+								  heapRel, indexRel, sortmem, false);
+
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 194a1207be6..d78314062e0 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/brin.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -145,6 +146,9 @@ static const struct
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
 	},
+	{
+		"_brin_parallel_build_main", _brin_parallel_build_main
+	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
 	}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 143fae01ebd..37e4305d50a 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2982,7 +2982,8 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	 */
 	if (parallel && IsNormalProcessingMode() &&
-		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
+		 indexRelation->rd_rel->relam == BRIN_AM_OID))
 		indexInfo->ii_ParallelWorkers =
 			plan_create_index_workers(RelationGetRelid(heapRelation),
 									  RelationGetRelid(indexRelation));
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 2cd508e5130..9b3a70e6ccf 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/brin_tuple.h"
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
@@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups,
 								 int count);
 static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups,
 							   int count);
+static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
+									int count);
 static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
 							   int count);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -69,10 +72,16 @@ static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
 								  Tuplesortstate *state);
 static int	comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
 										   Tuplesortstate *state);
+static int	comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+								  Tuplesortstate *state);
 static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
 						  LogicalTape *tape, unsigned int len);
+static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
+								SortTuple *stup);
+static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+							   LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
 							 Tuplesortstate *state);
 static int	comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
@@ -128,6 +137,16 @@ typedef struct
 	uint32		max_buckets;
 } TuplesortIndexHashArg;
 
+/*
+ * Data structure pointed to by "TuplesortPublic.arg" for the index_brin subcase.
+ */
+typedef struct
+{
+	TuplesortIndexArg index;
+
+	/* XXX do we need something here? */
+} TuplesortIndexBrinArg;
+
 /*
  * Data struture pointed by "TuplesortPublic.arg" for the Datum case.
  * Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -140,6 +159,21 @@ typedef struct
 	int			datumTypeLen;
 } TuplesortDatumArg;
 
+/*
+ * Computing BrinTuple size with only the tuple is difficult, so we want to track
+ * the length referenced by the SortTuple. That's what BrinSortTuple is meant
+ * to do - it's essentially a BrinTuple prefixed by its length.
+ */
+typedef struct BrinSortTuple
+{
+	Size		tuplen;
+	BrinTuple	tuple;
+} BrinSortTuple;
+
+/* Size of the BrinSortTuple, given length of the BrinTuple. */
+#define BRINSORTTUPLE_SIZE(len)		(offsetof(BrinSortTuple, tuple) + (len))
+
+
 Tuplesortstate *
 tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
@@ -527,6 +561,47 @@ tuplesort_begin_index_gist(Relation heapRel,
 	return state;
 }
 
+Tuplesortstate *
+tuplesort_begin_index_brin(Relation heapRel,
+						   Relation indexRel,
+						   int workMem,
+						   SortCoordinate coordinate,
+						   int sortopt)
+{
+	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+												   sortopt);
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext;
+	TuplesortIndexBrinArg *arg;
+
+	oldcontext = MemoryContextSwitchTo(base->maincontext);
+	arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG,
+			 "begin index sort: workMem = %d, randomAccess = %c",
+			 workMem,
+			 sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+	base->nKeys = 1;			/* Only one sort column, the block number */
+
+	base->removeabbrev = removeabbrev_index_brin;
+	base->comparetup = comparetup_index_brin;
+	base->writetup = writetup_index_brin;
+	base->readtup = readtup_index_brin;
+	base->haveDatum1 = true;
+	base->arg = arg;
+
+	arg->index.heapRel = heapRel;
+	arg->index.indexRel = indexRel;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 					  bool nullsFirstFlag, int workMem,
@@ -707,6 +782,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 							  !stup.isnull1);
 }
 
+/*
+ * Collect one BRIN tuple while collecting input data for sort.
+ */
+void
+tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
+{
+	SortTuple	stup;
+	BrinSortTuple *bstup;
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext);
+
+	/* allocate space for the whole BRIN sort tuple */
+	bstup = palloc(BRINSORTTUPLE_SIZE(size));
+
+	bstup->tuplen = size;
+	memcpy(&bstup->tuple, tuple, size);
+
+	stup.tuple = bstup;
+	stup.datum1 = tuple->bt_blkno;
+	stup.isnull1 = false;
+
+	tuplesort_puttuple_common(state, &stup,
+							  base->sortKeys &&
+							  base->sortKeys->abbrev_converter &&
+							  !stup.isnull1);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Accept one Datum while collecting input data for sort.
  *
@@ -850,6 +954,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 	return (IndexTuple) stup.tuple;
 }
 
+/*
+ * Fetch the next BRIN tuple in either forward or back direction.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
+ */
+BrinTuple *
+tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext);
+	SortTuple		stup;
+	BrinSortTuple  *btup;
+
+	if (!tuplesort_gettuple_common(state, forward, &stup))
+		stup.tuple = NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (!stup.tuple)
+		return NULL;
+
+	btup = (BrinSortTuple *) stup.tuple;
+
+	*len = btup->tuplen;
+
+	return &btup->tuple;
+}
+
 /*
  * Fetch the next Datum in either forward or back direction.
  * Returns false if no more datums.
@@ -1564,6 +1697,80 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 								 &stup->isnull1);
 }
 
+/*
+ * Routines specialized for BrinTuple case
+ */
+
+static void
+removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
+{
+	int			i;
+
+	for (i = 0; i < count; i++)
+	{
+		BrinSortTuple   *tuple;
+
+		tuple = stups[i].tuple;
+		stups[i].datum1 = tuple->tuple.bt_blkno;
+	}
+}
+
+static int
+comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+					  Tuplesortstate *state)
+{
+	Assert(TuplesortstateGetPublic(state)->haveDatum1);
+
+	if (DatumGetUInt32(a->datum1) > DatumGetUInt32(b->datum1))
+		return 1;
+
+	if (DatumGetUInt32(a->datum1) < DatumGetUInt32(b->datum1))
+		return -1;
+
+	/* block numbers are equal */
+	return 0;
+}
+
+static void
+writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	BrinSortTuple  *tuple = (BrinSortTuple *) stup->tuple;
+	unsigned int	tuplen = tuple->tuplen;
+
+	tuplen = tuplen + sizeof(tuplen);
+	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+}
+
+static void
+readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+				   LogicalTape *tape, unsigned int len)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	unsigned int tuplen = len - sizeof(unsigned int);
+
+	/*
+	 * Allocate space for the BRIN sort tuple, which is BrinTuple with an
+	 * extra length field.
+	 */
+	BrinSortTuple *tuple
+		= (BrinSortTuple *) tuplesort_readtup_alloc(state,
+													BRINSORTTUPLE_SIZE(tuplen));
+
+	tuple->tuplen = tuplen;
+
+	LogicalTapeReadExact(tape, &tuple->tuple, tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+	stup->tuple = (void *) tuple;
+
+	/* set up first-column key value, which is block number */
+	stup->datum1 = tuple->tuple.bt_blkno;
+}
+
 /*
  * Routines specialized for DatumTuple case
  */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H
 
 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"
 
 
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
 
 extern void brinGetStats(Relation index, BrinStatsData *stats);
 
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* BRIN_H */
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 9ed2de76cd6..357eb35311d 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -21,6 +21,7 @@
 #ifndef TUPLESORT_H
 #define TUPLESORT_H
 
+#include "access/brin_tuple.h"
 #include "access/itup.h"
 #include "executor/tuptable.h"
 #include "storage/dsm.h"
@@ -282,6 +283,9 @@ typedef struct
  * The "index_hash" API is similar to index_btree, but the tuples are
  * actually sorted by their hash codes not the raw data.
  *
+ * The "index_brin" API is similar to index_btree, but the tuples are
+ * BrinTuple and are sorted by their block number not the raw data.
+ *
  * Parallel sort callers are required to coordinate multiple tuplesort states
  * in a leader process and one or more worker processes.  The leader process
  * must launch workers, and have each perform an independent "partial"
@@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
 												  Relation indexRel,
 												  int workMem, SortCoordinate coordinate,
 												  int sortopt);
+extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel,
+												  Relation indexRel,
+												  int workMem, SortCoordinate coordinate,
+												  int sortopt);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 											 Oid sortOperator, Oid sortCollation,
 											 bool nullsFirstFlag,
@@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup);
 extern void tuplesort_putindextuplevalues(Tuplesortstate *state,
 										  Relation rel, ItemPointer self,
 										  const Datum *values, const bool *isnull);
+extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len);
 extern void tuplesort_putdatum(Tuplesortstate *state, Datum val,
 							   bool isNull);
 
@@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 								   bool copy, TupleTableSlot *slot, Datum *abbrev);
 extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward);
 extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward);
+extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len,
+										 bool forward);
 extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy,
 							   Datum *val, bool *isNull, Datum *abbrev);
 
-- 
2.41.0
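
A note for readers following the leader-side merge in _brin_end_parallel:
the loop reads tuples sorted by block number, unions tuples that belong to
the same page range, and fills gaps with empty ranges. Below is a minimal
standalone sketch of that idea, not the patch's code. RangeSummary,
summary_union and merge_sorted_summaries are hypothetical names, a plain
min/max pair stands in for the opclass summary, and the sketch assumes that
ranges start at block 0 and that every missing range up to the last one seen
gets an empty entry.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a BRIN minmax summary of one page range. */
typedef struct RangeSummary
{
    uint32_t    blkno;      /* first block of the page range */
    int         empty;      /* 1 if nothing was summarized for the range */
    int         minval;
    int         maxval;
} RangeSummary;

/* Merge b into a (same page range); plays the role of the "union" function. */
static void
summary_union(RangeSummary *a, const RangeSummary *b)
{
    if (b->empty)
        return;
    if (a->empty)
    {
        *a = *b;
        return;
    }
    if (b->minval < a->minval)
        a->minval = b->minval;
    if (b->maxval > a->maxval)
        a->maxval = b->maxval;
}

/*
 * Merge summaries sorted by blkno into out[], combining duplicates for the
 * same range and emitting empty entries for missing ranges.  The caller must
 * provide enough room in out[].  Returns the number of entries written.
 */
static size_t
merge_sorted_summaries(const RangeSummary *in, size_t nin,
                       uint32_t pages_per_range,
                       RangeSummary *out, size_t outcap)
{
    size_t      nout = 0;
    size_t      i = 0;
    uint32_t    next = 0;   /* start of the next range we expect to emit */

    while (i < nin)
    {
        RangeSummary cur = in[i++];

        /* fold in all further entries for the same page range */
        while (i < nin && in[i].blkno == cur.blkno)
            summary_union(&cur, &in[i++]);

        /* emit empty ranges for the gap before this range */
        while (next < cur.blkno)
        {
            assert(nout < outcap);
            out[nout++] = (RangeSummary) {next, 1, 0, 0};
            next += pages_per_range;
        }

        assert(nout < outcap);
        out[nout++] = cur;
        next = cur.blkno + pages_per_range;
    }

    return nout;
}
```

In the patch the same roles are played by union_tuples() for the merge and
brin_doinsert() with an empty tuple for the gaps; the sketch only mirrors the
control flow.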

From 625880de662181fd433eb583a00af0f16d57c420 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Wed, 22 Nov 2023 19:58:10 +0100
Subject: [PATCH v4 2/2] review fixes

---
 contrib/bloom/blutils.c                       |  1 +
 src/backend/access/brin/brin.c                | 28 ++++++++-----------
 src/backend/access/gin/ginutil.c              |  1 +
 src/backend/access/gist/gist.c                |  1 +
 src/backend/access/hash/hash.c                |  1 +
 src/backend/access/nbtree/nbtree.c            |  1 +
 src/backend/access/spgist/spgutils.c          |  1 +
 src/backend/catalog/index.c                   |  3 +-
 src/include/access/amapi.h                    |  2 ++
 .../modules/dummy_index_am/dummy_index_am.c   |  1 +
 10 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index f23fbb1d9e0..7451fb1b3bb 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -122,6 +122,7 @@ blhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = false;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amparallelvacuumoptions =
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 0d3d728c9bf..e836cbaad0b 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -44,11 +44,11 @@
 #include "utils/tuplesort.h"
 
 /* Magic numbers for parallel state sharing */
-#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
-#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)
-#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000003)
-#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000004)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000005)
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xB000000000000001)
+#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xB000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xB000000000000003)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xB000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xB000000000000005)
 
 /*
  * Status record for spooling/sorting phase.
@@ -161,7 +161,7 @@ typedef struct BrinLeader
 typedef struct BrinBuildState
 {
 	Relation	bs_irel;
-	int			bs_numtuples;
+	double		bs_numtuples;
 	double		bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
@@ -244,6 +244,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = true;
 	amroutine->amcaninclude = false;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = true;
@@ -960,7 +961,7 @@ brinbuildCallback(Relation index,
 /*
  * A version of the callback, used by parallel index builds. The main difference
  * is that instead of writing the BRIN tuples into the index, we write them into
- * a shared tuplestore, and leave the insertion up to the leader (which may
+ * a shared tuplesort, and leave the insertion up to the leader (which may
  * reorder them a bit etc.). The callback also does not generate empty ranges,
  * those may be added by the leader when merging results from workers.
  */
@@ -1880,7 +1881,7 @@ form_and_insert_tuple(BrinBuildState *state)
 
 /*
  * Given a deformed tuple in the build state, convert it into the on-disk
- * format and write it to a (shared) tuplestore (the leader will insert it
+ * format and write it to a (shared) tuplesort (the leader will insert it
  * into the index later).
  */
 static void
@@ -2419,7 +2420,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	BlockNumber	prevblkno = InvalidBlockNumber;
 	BrinTuple  *emptyTuple = NULL;
 	Size		emptySize;
-	BrinSpool  *spool = state->bs_spool;
+	BrinSpool  *spool;
 
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(brinleader->pcxt);
@@ -2431,6 +2432,8 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	state->bs_reltuples = brinshared->reltuples;
 	state->bs_numtuples = brinshared->indtuples;
 
+	/* do the actual sort in the leader */
+	spool = state->bs_spool;
 	tuplesort_performsort(spool->sortstate);
 
 	/*
@@ -2443,13 +2446,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
 	 * That probably gives us an index that is cheaper to scan, thanks to mostly
 	 * getting data from the same index page as before.
-	 *
-	 * FIXME This probably needs some memory management fixes - we're reading
-	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
-	 * Probably better to release this memory.
-	 *
-	 * XXX We can't quite free the BrinTuple, though, because that's a field
-	 * in BrinSortTuple.
 	 */
 	while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
 	{
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 7a4cd93f301..d4c9d678223 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -54,6 +54,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = false;
 	amroutine->amusemaintenanceworkmem = true;
 	amroutine->amsummarizing = false;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 8ef5fa03290..acec490912d 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -76,6 +76,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = true;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = false;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 7a025f33cfe..74592c7d428 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -73,6 +73,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = false;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = false;
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index a88b36a589a..2aba6f5b91e 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -112,6 +112,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
 	amroutine->amcanparallel = true;
+	amroutine->amcanbuildparallel = true;
 	amroutine->amcaninclude = true;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = false;
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index c112e1e5dd4..77b6af694fb 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -60,6 +60,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = true;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = false;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 37e4305d50a..40abbaf476b 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2982,8 +2982,7 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	 */
 	if (parallel && IsNormalProcessingMode() &&
-		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
-		 indexRelation->rd_rel->relam == BRIN_AM_OID))
+		indexRelation->rd_indam->amcanbuildparallel)
 		indexInfo->ii_ParallelWorkers =
 			plan_create_index_workers(RelationGetRelid(heapRelation),
 									  RelationGetRelid(indexRelation));
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 995725502a6..408bb7595bb 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -240,6 +240,8 @@ typedef struct IndexAmRoutine
 	bool		ampredlocks;
 	/* does AM support parallel scan? */
 	bool		amcanparallel;
+	/* does AM support parallel build? */
+	bool		amcanbuildparallel;
 	/* does AM support columns included with clause INCLUDE? */
 	bool		amcaninclude;
 	/* does AM use maintenance_work_mem? */
diff --git a/src/test/modules/dummy_index_am/dummy_index_am.c b/src/test/modules/dummy_index_am/dummy_index_am.c
index cbdae7ab7a5..eaa0c483b7e 100644
--- a/src/test/modules/dummy_index_am/dummy_index_am.c
+++ b/src/test/modules/dummy_index_am/dummy_index_am.c
@@ -294,6 +294,7 @@ dihandler(PG_FUNCTION_ARGS)
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
 	amroutine->amcanparallel = false;
+	amroutine->amcanbuildparallel = false;
 	amroutine->amcaninclude = false;
 	amroutine->amusemaintenanceworkmem = false;
 	amroutine->amsummarizing = false;
-- 
2.41.0
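
To illustrate the index.c change in 0002: instead of comparing the access
method OID against a hard-coded list, the decision reads a capability flag
from the AM's routine struct. The following is a minimal sketch of that
pattern only. AmRoutineSketch and want_parallel_build are hypothetical names
and are not part of PostgreSQL; the real field is amcanbuildparallel in
IndexAmRoutine.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, cut-down analogue of IndexAmRoutine. */
typedef struct AmRoutineSketch
{
    const char *name;
    bool        canbuildparallel;   /* does the AM support parallel build? */
} AmRoutineSketch;

/* Flag values mirror what the handlers in the patch set. */
static const AmRoutineSketch am_btree = {"btree", true};
static const AmRoutineSketch am_brin = {"brin", true};
static const AmRoutineSketch am_hash = {"hash", false};

/*
 * Decide whether to plan parallel workers for an index build.  A new AM
 * opts in by setting its own flag; this function needs no changes.
 */
static bool
want_parallel_build(bool parallel_requested, const AmRoutineSketch *am)
{
    return parallel_requested && am != NULL && am->canbuildparallel;
}
```

With the flag, an out-of-core AM could opt in without touching index.c, which
is not possible with the OID comparison.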
