Hi,

here's an updated patch, addressing the review comments, and reworking
how the work is divided between the workers & leader etc.

0001 is just v2, rebased to current master

0002 and 0003 address most of the issues, in particular it

  - removes the unnecessary spool
  - fixes bs_reltuples type to double
  - a couple comments are reworded to be clearer
  - changes loop/condition in brinbuildCallbackParallel
  - removes asserts added for debugging
  - fixes cast in comparetup_index_brin
  - 0003 then simplifies comparetup_index_brin
  - I haven't inlined the tuplesort_puttuple_common parameter
    (didn't seem worth it)

0004 Reworks how the work is divided between workers and combined by the
leader. It undoes the tableam.c changes that attempted to divide the
relation into chunks matching the BRIN ranges, and instead merges the
results in the leader (using the BRIN "union" function).

I haven't done any indentation fixes yet.

I did fairly extensive testing, using pageinspect to compare indexes
built with/without parallelism. More testing is needed, but it seems to
work fine (with other opclasses and so on).

In general I'm quite happy with the current state, and I believe it's
fairly close to being committable.


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 3e5fabc6e5a30bb16ec11fdc7fcf65880e172d0c Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 17:04:28 +0100
Subject: [PATCH v3 1/4] parallel CREATE INDEX for BRIN v2

---
 src/backend/access/brin/brin.c             | 781 ++++++++++++++++++++-
 src/backend/access/nbtree/nbtsort.c        |   2 +-
 src/backend/access/table/tableam.c         |  49 +-
 src/backend/access/transam/parallel.c      |   4 +
 src/backend/catalog/index.c                |   3 +-
 src/backend/executor/nodeSeqscan.c         |   3 +-
 src/backend/utils/sort/tuplesort.c         |   3 +
 src/backend/utils/sort/tuplesortvariants.c | 211 ++++++
 src/include/access/brin.h                  |   3 +
 src/include/access/relscan.h               |   1 +
 src/include/access/tableam.h               |   9 +-
 src/include/utils/tuplesort.h              |  11 +
 12 files changed, 1065 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 25338a90e29..b7cd29c5968 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -33,6 +33,7 @@
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
+#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -40,7 +41,118 @@
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/tuplesort.h"
 
+/* Magic numbers for parallel state sharing */
+#define PARALLEL_KEY_BRIN_SHARED		UINT64CONST(0xA000000000000001)
+#define PARALLEL_KEY_TUPLESORT			UINT64CONST(0xA000000000000002)
+#define PARALLEL_KEY_QUERY_TEXT			UINT64CONST(0xA000000000000003)
+#define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xA000000000000005)
+
+/*
+ * Status record for spooling/sorting phase.
+ */
+typedef struct BrinSpool
+{
+	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
+	Relation	heap;			/* heap relation handed to the tuplesort */
+	Relation	index;			/* index relation handed to the tuplesort */
+} BrinSpool;
+
+/*
+ * Status for index builds performed in parallel.  This is allocated in a
+ * dynamic shared memory segment.
+ */
+typedef struct BrinShared
+{
+	/*
+	 * These fields are not modified during the build.  They primarily exist for
+	 * the benefit of worker processes that need to create state corresponding
+	 * to that used by the leader.
+	 */
+	Oid			heaprelid;
+	Oid			indexrelid;
+	bool		isconcurrent;
+	BlockNumber	pagesPerRange;
+	int			scantuplesortstates;
+
+	/*
+	 * workersdonecv is used to monitor the progress of workers.  All parallel
+	 * participants must indicate that they are done before leader can use
+	 * results built by the workers (and before leader can write the data into
+	 * the index).
+	 */
+	ConditionVariable workersdonecv;
+
+	/*
+	 * mutex protects the mutable fields declared below (nparticipantsdone,
+	 * reltuples and indtuples).
+	 *
+	 * Each participant updates them under the lock once its scan is done.
+	 */
+	slock_t		mutex;
+
+	/*
+	 * Mutable state that is maintained by workers, and reported back to
+	 * leader at end of the scans.
+	 *
+	 * nparticipantsdone is the number of participants that have finished.
+	 *
+	 * reltuples is the total number of input heap tuples.
+	 *
+	 * indtuples is the total number of tuples that made it into the index.
+	 */
+	int			nparticipantsdone;
+	double		reltuples;
+	double		indtuples;
+
+	/*
+	 * ParallelTableScanDescData data follows. Can't directly embed here, as
+	 * implementations of the parallel table scan desc interface might need
+	 * stronger alignment.
+	 */
+} BrinShared;
+
+/*
+ * Return pointer to a BrinShared's parallel table scan.
+ *
+ * c.f. shm_toc_allocate as to why BUFFERALIGN is used, rather than just
+ * MAXALIGN.  The expansion is fully parenthesized so it can be used safely.
+ */
+#define ParallelTableScanFromBrinShared(shared) \
+	((ParallelTableScanDesc) ((char *) (shared) + BUFFERALIGN(sizeof(BrinShared))))
+
+/*
+ * Status for leader in parallel index build.
+ */
+typedef struct BrinLeader
+{
+	/* parallel context itself */
+	ParallelContext *pcxt;
+
+	/*
+	 * nparticipanttuplesorts is the exact number of worker processes
+	 * successfully launched, plus one leader process if it participates as a
+	 * worker (only DISABLE_LEADER_PARTICIPATION builds avoid leader
+	 * participating as a worker).
+	 */
+	int			nparticipanttuplesorts;
+
+	/*
+	 * Leader process convenience pointers to shared state (leader avoids TOC
+	 * lookups).
+	 *
+	 * brinshared is the shared state for entire build.  sharedsort is the
+	 * shared, tuplesort-managed state passed to each process tuplesort.
+	 * snapshot is the snapshot used by the scan iff an MVCC snapshot is required.
+	 */
+	BrinShared	   *brinshared;
+	Sharedsort *sharedsort;
+	Snapshot	snapshot;
+	WalUsage   *walusage;		/* per-worker WAL usage array in the DSM */
+	BufferUsage *bufferusage;	/* per-worker buffer usage array in the DSM */
+} BrinLeader;
 
 /*
  * We use a BrinBuildState during initial construction of a BRIN index.
@@ -50,12 +162,23 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
+	int			bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
 	BrinRevmap *bs_rmAccess;
 	BrinDesc   *bs_bdesc;
 	BrinMemTuple *bs_dtuple;
+
+	/*
+	 * bs_leader is only present when a parallel index build is performed, and
+	 * only in the leader process. (Actually, only the leader has a
+	 * BrinBuildState.)
+	 */
+	BrinLeader *bs_leader;
+	int			bs_worker_id;
+	BrinSpool  *bs_spool;
+	BrinSpool  *bs_spool_out;
 } BrinBuildState;
 
 /*
@@ -76,6 +199,7 @@ static void terminate_brin_buildstate(BrinBuildState *state);
 static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
 						  bool include_partial, double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
+static void form_and_spill_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
 						 BrinTuple *b);
 static void brin_vacuum_scan(Relation idxrel, BufferAccessStrategy strategy);
@@ -83,6 +207,20 @@ static bool add_values_to_range(Relation idxRel, BrinDesc *bdesc,
 								BrinMemTuple *dtup, const Datum *values, const bool *nulls);
 static bool check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys);
 
+/* parallel index builds */
+static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+								 bool isconcurrent, int request);
+static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
+static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
+											   Relation heap, Relation index);
+static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
+										  BrinSpool *brinspool,
+										  BrinShared *brinshared,
+										  Sharedsort *sharedsort,
+										  Relation heap, Relation index,
+										  int sortmem, bool progress);
+
+
 /*
  * BRIN handler function: return IndexAmRoutine with access method parameters
  * and callbacks.
@@ -820,6 +958,67 @@ brinbuildCallback(Relation index,
 							   values, isnull);
 }
 
+/*
+ * A version of the callback, used by parallel index builds. The main difference
+ * is that instead of writing the BRIN tuples into the index, we write them into
+ * a shared tuplesort, and leave the insertion up to the leader (which may
+ * reorder them a bit etc.). The callback also does not generate empty ranges,
+ * those may be added by the leader when merging results from workers.
+ */
+static void
+brinbuildCallbackParallel(Relation index,
+						  ItemPointer tid,
+						  Datum *values,
+						  bool *isnull,
+						  bool tupleIsAlive,
+						  void *brstate)
+{
+	BrinBuildState *state = (BrinBuildState *) brstate;
+	BlockNumber thisblock;
+
+	thisblock = ItemPointerGetBlockNumber(tid);
+
+	/*
+	 * If we're in a block that belongs to a future range, summarize what
+	 * we've got and start afresh.  Note the scan might have skipped many
+	 * pages, either because they were devoid of live tuples or because some
+	 * other participant scans them; nothing is emitted here for such ranges.
+	 */
+	if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	{
+		/* One test suffices: the loop below jumps straight to thisblock's range. */
+		BRIN_elog((DEBUG2,
+				   "brinbuildCallbackParallel: completed a range: %u--%u",
+				   state->bs_currRangeStart,
+				   state->bs_currRangeStart + state->bs_pagesPerRange));
+
+		/* create the index tuple and write it into the tuplesort */
+		form_and_spill_tuple(state);
+
+		/*
+		 * set state to correspond to the next range
+		 *
+		 * XXX This has the issue that it skips ranges summarized by other
+		 * workers, but it also skips empty ranges that should have been
+		 * summarized. We'd need to either make the workers aware which
+		 * chunk they are actually processing (which is currently known
+		 * only in the ParallelBlockTableScan bit). Or we could ignore it
+		 * here, and then decide it while "merging" results from workers
+		 * (if there's no entry for the range, it had to be empty so we
+		 * just add an empty one).
+		 */
+		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+			state->bs_currRangeStart += state->bs_pagesPerRange;
+
+		/* re-initialize state for it */
+		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
+	}
+
+	/* Accumulate the current tuple into the running state */
+	(void) add_values_to_range(index, state->bs_bdesc, state->bs_dtuple,
+							   values, isnull);
+}
+
 /*
  * brinbuild() -- build a new BRIN index.
  */
@@ -881,18 +1080,93 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	revmap = brinRevmapInitialize(index, &pagesPerRange);
 	state = initialize_brin_buildstate(index, revmap, pagesPerRange);
 
+	state->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	state->bs_spool->heap = heap;
+	state->bs_spool->index = index;
+
+	/*
+	 * Attempt to launch parallel worker scan when required
+	 *
+	 * XXX plan_create_index_workers makes the number of workers dependent on
+	 * maintenance_work_mem, requiring 32MB for each worker. That makes sense
+	 * for btree, but not for BRIN, which can do away with much less memory.
+	 * So maybe make that somehow less strict, optionally?
+	 */
+	if (indexInfo->ii_ParallelWorkers > 0)
+		_brin_begin_parallel(state, heap, index, indexInfo->ii_Concurrent,
+							 indexInfo->ii_ParallelWorkers);
+
 	/*
 	 * Now scan the relation.  No syncscan allowed here because we want the
 	 * heap blocks in physical order.
 	 */
-	reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
-									   brinbuildCallback, (void *) state, NULL);
 
-	/* process the final batch */
-	form_and_insert_tuple(state);
+	/*
+	 * If parallel build requested and at least one worker process was
+	 * successfully launched, set up coordination state
+	 */
+	if (state->bs_leader)
+	{
+		SortCoordinate coordinate;
+
+		coordinate = (SortCoordinate) palloc0(sizeof(SortCoordinateData));
+		coordinate->isWorker = false;
+		coordinate->nParticipants =
+			state->bs_leader->nparticipanttuplesorts;
+		coordinate->sharedsort = state->bs_leader->sharedsort;
+
+
+		/*
+		 * Begin serial/leader tuplesort.
+		 *
+		 * In cases where parallelism is involved, the leader receives the same
+		 * share of maintenance_work_mem as a serial sort (it is generally treated
+		 * in the same way as a serial sort once we return).  Parallel worker
+		 * Tuplesortstates will have received only a fraction of
+		 * maintenance_work_mem, though.
+		 *
+		 * We rely on the lifetime of the Leader Tuplesortstate almost not
+		 * overlapping with any worker Tuplesortstate's lifetime.  There may be
+		 * some small overlap, but that's okay because we rely on leader
+		 * Tuplesortstate only allocating a small, fixed amount of memory here.
+		 * When its tuplesort_performsort() is called (by our caller), and
+		 * significant amounts of memory are likely to be used, all workers must
+		 * have already freed almost all memory held by their Tuplesortstates
+		 * (they are about to go away completely, too).  The overall effect is
+		 * that maintenance_work_mem always represents an absolute high watermark
+		 * on the amount of memory used by a CREATE INDEX operation, regardless of
+		 * the use of parallelism or any other factor.
+		 */
+		state->bs_spool_out = (BrinSpool *) palloc0(sizeof(BrinSpool));
+		state->bs_spool_out->heap = heap;
+		state->bs_spool_out->index = index;
+
+		state->bs_spool_out->sortstate =
+			tuplesort_begin_index_brin(heap, index,
+									   maintenance_work_mem, coordinate,
+									   TUPLESORT_NONE);
+
+		/*
+		 * In parallel mode, wait for workers to complete, and then read all
+		 * tuples from the shared tuplesort and insert them into the index.
+		 */
+		_brin_end_parallel(state->bs_leader, state);
+	}
+	else	/* no parallel index build, just do the usual thing */
+	{
+		reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
+										   brinbuildCallback, (void *) state, NULL);
+
+		/* process the final batch */
+		form_and_insert_tuple(state);
+
+		/* track the number of relation tuples */
+		state->bs_reltuples = reltuples;
+	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
+	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -1312,12 +1586,16 @@ initialize_brin_buildstate(Relation idxRel, BrinRevmap *revmap,
 
 	state->bs_irel = idxRel;
 	state->bs_numtuples = 0;
+	state->bs_reltuples = 0;
 	state->bs_currentInsertBuf = InvalidBuffer;
 	state->bs_pagesPerRange = pagesPerRange;
 	state->bs_currRangeStart = 0;
 	state->bs_rmAccess = revmap;
 	state->bs_bdesc = brin_build_desc(idxRel);
 	state->bs_dtuple = brin_new_memtuple(state->bs_bdesc);
+	state->bs_leader = NULL;
+	state->bs_worker_id = 0;
+	state->bs_spool = NULL;
 
 	return state;
 }
@@ -1609,6 +1887,32 @@ form_and_insert_tuple(BrinBuildState *state)
 	pfree(tup);
 }
 
+/*
+ * Given a deformed tuple in the build state, convert it into the on-disk
+ * format and write it to this participant's tuplesort (the leader will insert
+ * it into the index later).  Empty ranges are skipped, not written.
+ */
+static void
+form_and_spill_tuple(BrinBuildState *state)
+{
+	BrinTuple  *tup;
+	Size		size;
+
+	/* don't insert empty tuples in parallel build */
+	if (state->bs_dtuple->bt_empty_range)
+		return;
+
+	tup = brin_form_tuple(state->bs_bdesc, state->bs_currRangeStart,
+						  state->bs_dtuple, &size);
+
+	/* write the BRIN tuple to the tuplesort */
+	tuplesort_putbrintuple(state->bs_spool->sortstate, tup, size);
+
+	state->bs_numtuples++;
+
+	pfree(tup);
+}
+
 /*
  * Given two deformed tuples, adjust the first one so that it's consistent
  * with the summary values in both.
@@ -1928,3 +2232,472 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 
 	return true;
 }
+
+static void
+_brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
+					 bool isconcurrent, int request)
+{
+	ParallelContext *pcxt;
+	int			scantuplesortstates;
+	Snapshot	snapshot;
+	Size		estbrinshared;
+	Size		estsort;
+	BrinShared   *brinshared;
+	Sharedsort   *sharedsort;
+	BrinLeader   *brinleader = (BrinLeader *) palloc0(sizeof(BrinLeader));
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	bool		leaderparticipates = true;
+	int			querylen;
+
+#ifdef DISABLE_LEADER_PARTICIPATION
+	leaderparticipates = false;
+#endif
+
+	/*
+	 * Enter parallel mode, and create context for parallel build of brin
+	 * index
+	 */
+	EnterParallelMode();
+	Assert(request > 0);
+	pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main",
+								 request);
+
+	scantuplesortstates = leaderparticipates ? request + 1 : request;
+
+	/*
+	 * Prepare for scan of the base relation.  In a normal index build, we use
+	 * SnapshotAny because we must retrieve all tuples and do our own time
+	 * qual checks (because we have to index RECENTLY_DEAD tuples).  In a
+	 * concurrent build, we take a regular MVCC snapshot and index whatever's
+	 * live according to that.
+	 */
+	if (!isconcurrent)
+		snapshot = SnapshotAny;
+	else
+		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+
+	/*
+	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
+	 */
+	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
+	estsort = tuplesort_estimate_shared(scantuplesortstates);
+	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
+
+	shm_toc_estimate_keys(&pcxt->estimator, 2);
+
+	/*
+	 * Estimate space for WalUsage and BufferUsage -- PARALLEL_KEY_WAL_USAGE
+	 * and PARALLEL_KEY_BUFFER_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgWalUsage or
+	 * pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
+	if (debug_query_string)
+	{
+		querylen = strlen(debug_query_string);
+		shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+	else
+		querylen = 0;			/* keep compiler quiet */
+
+	/* Everyone's had a chance to ask for space, so now create the DSM */
+	InitializeParallelDSM(pcxt);
+
+	/* If no DSM segment was available, back out (do serial build) */
+	if (pcxt->seg == NULL)
+	{
+		if (IsMVCCSnapshot(snapshot))
+			UnregisterSnapshot(snapshot);
+		DestroyParallelContext(pcxt);
+		ExitParallelMode();
+		return;
+	}
+
+	/* Store shared build state, for which we reserved space */
+	brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared);
+	/* Initialize immutable state */
+	brinshared->heaprelid = RelationGetRelid(heap);
+	brinshared->indexrelid = RelationGetRelid(index);
+	brinshared->isconcurrent = isconcurrent;
+	brinshared->scantuplesortstates = scantuplesortstates;
+	brinshared->pagesPerRange = buildstate->bs_pagesPerRange;
+	ConditionVariableInit(&brinshared->workersdonecv);
+	SpinLockInit(&brinshared->mutex);
+
+	/* Initialize mutable state */
+	brinshared->nparticipantsdone = 0;
+	brinshared->reltuples = 0.0;
+	brinshared->indtuples = 0.0;
+
+	table_parallelscan_initialize(heap,
+								  ParallelTableScanFromBrinShared(brinshared),
+								  snapshot, brinshared->pagesPerRange);
+
+	/*
+	 * Store shared tuplesort-private state, for which we reserved space.
+	 * Then, initialize opaque state using tuplesort routine.
+	 */
+	sharedsort = (Sharedsort *) shm_toc_allocate(pcxt->toc, estsort);
+	tuplesort_initialize_shared(sharedsort, scantuplesortstates,
+								pcxt->seg);
+
+	/*
+	 * Register both shared structures in the table of contents, so that
+	 * workers can look them up by key.
+	 */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BRIN_SHARED, brinshared);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort);
+
+	/* Store query string for workers */
+	if (debug_query_string)
+	{
+		char	   *sharedquery;
+
+		sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+		memcpy(sharedquery, debug_query_string, querylen + 1);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+	}
+
+	/*
+	 * Allocate space for each worker's WalUsage and BufferUsage; no need to
+	 * initialize.
+	 */
+	walusage = shm_toc_allocate(pcxt->toc,
+								mul_size(sizeof(WalUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+	bufferusage = shm_toc_allocate(pcxt->toc,
+								   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage);
+
+	/* Launch workers, saving status for leader/caller */
+	LaunchParallelWorkers(pcxt);
+	brinleader->pcxt = pcxt;
+	brinleader->nparticipanttuplesorts = pcxt->nworkers_launched;
+	if (leaderparticipates)
+		brinleader->nparticipanttuplesorts++;
+	brinleader->brinshared = brinshared;
+	brinleader->sharedsort = sharedsort;
+	brinleader->snapshot = snapshot;
+	brinleader->walusage = walusage;
+	brinleader->bufferusage = bufferusage;
+
+	/* If no workers were successfully launched, back out (do serial build) */
+	if (pcxt->nworkers_launched == 0)
+	{
+		_brin_end_parallel(brinleader, NULL);
+		return;
+	}
+
+	/* Save leader state now that it's clear build will be parallel */
+	buildstate->bs_leader = brinleader;
+
+	/* Join heap scan ourselves */
+	if (leaderparticipates)
+		_brin_leader_participate_as_worker(buildstate, heap, index);
+
+	/*
+	 * Caller needs to wait for all launched workers when we return.  Make
+	 * sure that the failure-to-start case will not hang forever.
+	 */
+	WaitForParallelWorkersToAttach(pcxt);
+}
+
+/*
+ * Insert the sorted summaries from all participants, filling empty ranges.
+ */
+static void
+_brin_merge_parallel_results(BrinLeader *brinleader, BrinBuildState *state)
+{
+	BrinTuple  *btup;
+	Size		tuplen;
+	BrinShared *brinshared = brinleader->brinshared;
+	BlockNumber	prevblkno = InvalidBlockNumber;
+	BrinTuple  *emptyTuple = NULL;
+	Size		emptySize;
+
+	/* copy the data into leader state (the workers have finished by now) */
+	state->bs_reltuples = brinshared->reltuples;
+	state->bs_numtuples = brinshared->indtuples;
+
+	tuplesort_performsort(state->bs_spool_out->sortstate);
+
+	/*
+	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
+	 * That probably gives us an index that is cheaper to scan, thanks to mostly
+	 * getting data from the same index page as before.
+	 * FIXME This probably needs some memory management fixes - we're reading
+	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
+	 * XXX We can't quite free the BrinTuple, though, because that's a field
+	 * in BrinSortTuple.
+	 */
+	while ((btup = tuplesort_getbrintuple(state->bs_spool_out->sortstate, &tuplen, true)) != NULL)
+	{
+		BlockNumber	blkno;
+
+		/*
+		 * We should not get two summaries for the same range. The workers
+		 * are producing ranges for non-overlapping sections of the table.
+		 */
+		Assert(btup->bt_blkno != prevblkno);
+
+		/* Ranges should be multiples of pages_per_range for the index. */
+		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
+		/* Fill in every range missing from the tuplesort, range 0 included. */
+		blkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno + state->bs_pagesPerRange;
+		for (; blkno < btup->bt_blkno; blkno += state->bs_pagesPerRange)
+		{
+			/* Build the "empty range" tuple once, afterwards just retarget it. */
+			if (emptyTuple == NULL)
+			{
+				BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+				emptyTuple = brin_form_tuple(state->bs_bdesc, blkno, dtuple, &emptySize);
+			}
+			else
+				emptyTuple->bt_blkno = blkno;
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf,
+						  emptyTuple->bt_blkno, emptyTuple, emptySize);
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, btup->bt_blkno, btup, tuplen);
+
+		prevblkno = btup->bt_blkno;
+	}
+
+	tuplesort_end(state->bs_spool_out->sortstate);
+}
+
+/*
+ * Shut down workers, merge their results into the index (only when "state" is
+ * given), then destroy the parallel context and end parallel mode.
+ */
+static void
+_brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
+{
+	int			i;
+
+	/* Shutdown worker processes */
+	WaitForParallelWorkersToFinish(brinleader->pcxt);
+
+	/* state is NULL when backing out to a serial build: skip only the merge */
+	if (state)
+		_brin_merge_parallel_results(brinleader, state);
+
+	/* Accumulate WAL and buffer usage; workers are done, so it is complete. */
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Shared memory needed by a parallel brin index build: the BrinShared struct,
+ * BUFFERALIGN'd (c.f. shm_toc_allocate), plus the snapshot's parallel scan.
+ */
+static Size
+_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+{
+	Size		scansz = table_parallelscan_estimate(heap, snapshot);
+
+	return add_size(BUFFERALIGN(sizeof(BrinShared)), scansz);
+}
+
+/*
+ * Within leader, participate as a parallel worker.
+ */
+static void
+_brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Relation index)
+{
+	BrinLeader *brinleader = buildstate->bs_leader;
+	int			sortmem;
+
+	/* Allocate a private spool; it is zeroed, so take relations from caller */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heap;
+	buildstate->bs_spool->index = index;
+
+	/*
+	 * Might as well use reliable figure when doling out maintenance_work_mem
+	 * (when requested number of workers were not launched, this will be
+	 * somewhat higher than it is for other workers).
+	 */
+	sortmem = maintenance_work_mem / brinleader->nparticipanttuplesorts;
+
+	/* Perform work common to all participants */
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool, brinleader->brinshared,
+								  brinleader->sharedsort, heap, index, sortmem, true);
+}
+
+/*
+ * Perform a participant's portion of a parallel BRIN build.
+ *
+ * This creates a worker tuplesort for the passed brinspool, scans this
+ * participant's share of the heap, and spills one BRIN tuple per non-empty
+ * range into the tuplesort.  The spool's heap/index fields must already be set.
+ *
+ * sortmem is the amount of working memory to use within each worker,
+ * expressed in KBs.  progress says whether the scan reports build progress.
+ *
+ * When this returns, workers are done, and need only release resources.
+ */
+static void
+_brin_parallel_scan_and_build(BrinBuildState *state, BrinSpool *brinspool,
+							  BrinShared *brinshared, Sharedsort *sharedsort,
+							  Relation heap, Relation index, int sortmem,
+							  bool progress)
+{
+	SortCoordinate coordinate;
+	TableScanDesc	scan;
+	double		reltuples;
+	IndexInfo  *indexInfo;
+
+	/* Initialize local tuplesort coordination state */
+	coordinate = palloc0(sizeof(SortCoordinateData));
+	coordinate->isWorker = true;
+	coordinate->nParticipants = -1;
+	coordinate->sharedsort = sharedsort;
+
+	/* Begin "partial" tuplesort */
+	brinspool->sortstate = tuplesort_begin_index_brin(brinspool->heap,
+													  brinspool->index,
+													  sortmem, coordinate,
+													  TUPLESORT_NONE);
+
+	/* Join parallel scan */
+	indexInfo = BuildIndexInfo(index);
+	indexInfo->ii_Concurrent = brinshared->isconcurrent;
+
+	scan = table_beginscan_parallel(heap,
+									ParallelTableScanFromBrinShared(brinshared));
+
+	reltuples = table_index_build_scan(heap, index, indexInfo, true, progress,
+									   brinbuildCallbackParallel, state, scan);
+
+	/* insert the last item */
+	form_and_spill_tuple(state);
+
+	/* sort the BRIN ranges built by this worker */
+	tuplesort_performsort(brinspool->sortstate);
+
+	state->bs_reltuples += reltuples;
+
+	/*
+	 * Done.  Record ambuild statistics.
+	 */
+	SpinLockAcquire(&brinshared->mutex);
+	brinshared->nparticipantsdone++;
+	brinshared->reltuples += state->bs_reltuples;
+	brinshared->indtuples += state->bs_numtuples;
+	SpinLockRelease(&brinshared->mutex);
+
+	/* Notify leader */
+	ConditionVariableSignal(&brinshared->workersdonecv);
+
+	tuplesort_end(brinspool->sortstate);
+}
+
+/*
+ * Perform work within a launched parallel process.
+ */
+void
+_brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
+{
+	char	   *sharedquery;
+	BrinShared *brinshared;
+	Sharedsort *sharedsort;
+	BrinBuildState *buildstate;
+	Relation	heapRel;
+	Relation	indexRel;
+	LOCKMODE	heapLockmode;
+	LOCKMODE	indexLockmode;
+	WalUsage   *walusage;
+	BufferUsage *bufferusage;
+	int			sortmem;
+
+	/*
+	 * The only possible status flag that can be set to the parallel worker is
+	 * PROC_IN_SAFE_IC.
+	 */
+	Assert((MyProc->statusFlags == 0) ||
+		   (MyProc->statusFlags == PROC_IN_SAFE_IC));
+
+	/* Set debug_query_string for individual workers first */
+	sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true);
+	debug_query_string = sharedquery;
+
+	/* Report the query string from leader */
+	pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+	/* Look up brin shared state */
+	brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false);
+
+	/* Open relations using lock modes known to be obtained by index.c */
+	if (!brinshared->isconcurrent)
+	{
+		heapLockmode = ShareLock;
+		indexLockmode = AccessExclusiveLock;
+	}
+	else
+	{
+		heapLockmode = ShareUpdateExclusiveLock;
+		indexLockmode = RowExclusiveLock;
+	}
+
+	/* Open relations within worker */
+	heapRel = table_open(brinshared->heaprelid, heapLockmode);
+	indexRel = index_open(brinshared->indexrelid, indexLockmode);
+
+	buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange);
+
+	/* Initialize worker's own spool */
+	buildstate->bs_spool = (BrinSpool *) palloc0(sizeof(BrinSpool));
+	buildstate->bs_spool->heap = heapRel;
+	buildstate->bs_spool->index = indexRel;
+
+	/* Look up shared state private to tuplesort.c */
+	sharedsort = shm_toc_lookup(toc, PARALLEL_KEY_TUPLESORT, false);
+	tuplesort_attach_shared(sharedsort, seg);
+
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
+	/*
+	 * Split maintenance_work_mem evenly across the number of participants
+	 * the leader requested (scantuplesortstates is fixed before any worker
+	 * is launched, so it may exceed the number that actually started).
+	 */
+	sortmem = maintenance_work_mem / brinshared->scantuplesortstates;
+
+	_brin_parallel_scan_and_build(buildstate, buildstate->bs_spool,
+								  brinshared, sharedsort,
+								  heapRel, indexRel, sortmem, false);
+
+	/* Report WAL/buffer usage during parallel execution */
+	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
+	walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+	InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
+						  &walusage[ParallelWorkerNumber]);
+
+	index_close(indexRel, indexLockmode);
+	table_close(heapRel, heapLockmode);
+}
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index c2665fce411..6241baeea86 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot);
+								  snapshot, InvalidBlockNumber);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index c6bdb7e1c68..4af0d433e9d 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot)
+							  Snapshot snapshot, BlockNumber chunk_factor)
 {
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
@@ -395,16 +395,21 @@ table_block_parallelscan_estimate(Relation rel)
 }
 
 Size
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
 {
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	/* compare phs_syncscan initialization to similar logic in initscan */
+	bpscan->phs_chunk_factor = chunk_factor;
+	/* compare phs_syncscan initialization to similar logic in initscan
+	 *
+	 * Disable sync scans if the chunk factor is set (valid block number).
+	 */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		bpscan->phs_nblocks > NBuffers / 4;
+		(bpscan->phs_nblocks > NBuffers / 4) &&
+		!BlockNumberIsValid(bpscan->phs_chunk_factor);
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -459,6 +464,25 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
 									  PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 
+	/*
+	 * If the chunk size factor is set, we need to make sure the chunk size is
+	 * a multiple of that value. We round the chunk size to the nearest chunk
+	 * factor multiple, at least one chunk_factor.
+	 *
+	 * XXX Note this may override PARALLEL_SEQSCAN_MAX_CHUNK_SIZE, in case the
+	 * chunk factor (e.g. BRIN pages_per_range) is larger.
+	 */
+	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+	{
+		/* nearest (smaller) multiple of chunk_factor */
+		pbscanwork->phsw_chunk_size
+			= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+
+		/* but at least one chunk_factor */
+		pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
+										  pbscan->phs_chunk_factor);
+	}
+
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -575,6 +599,21 @@ table_block_parallelscan_nextpage(Relation rel,
 			(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
 			pbscanwork->phsw_chunk_size >>= 1;
 
+		/*
+		 * We need to make sure the new chunk_size is still a suitable multiple
+		 * of chunk_factor.
+		 */
+		if (pbscan->phs_chunk_factor != InvalidBlockNumber)
+		{
+			/* nearest (smaller) multiple of chunk_factor */
+			pbscanwork->phsw_chunk_size
+				= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
+
+			/* but at least one chunk_factor */
+			pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
+											  pbscan->phs_chunk_factor);
+		}
+
 		nallocated = pbscanwork->phsw_nallocated =
 			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
 									pbscanwork->phsw_chunk_size);
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 194a1207be6..d78314062e0 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,6 +14,7 @@
 
 #include "postgres.h"
 
+#include "access/brin.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -145,6 +146,9 @@ static const struct
 	{
 		"_bt_parallel_build_main", _bt_parallel_build_main
 	},
+	{
+		"_brin_parallel_build_main", _brin_parallel_build_main
+	},
 	{
 		"parallel_vacuum_main", parallel_vacuum_main
 	}
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 143fae01ebd..37e4305d50a 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -2982,7 +2982,8 @@ index_build(Relation heapRelation,
 	 * Note that planner considers parallel safety for us.
 	 */
 	if (parallel && IsNormalProcessingMode() &&
-		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		(indexRelation->rd_rel->relam == BTREE_AM_OID ||
+		 indexRelation->rd_rel->relam == BRIN_AM_OID))
 		indexInfo->ii_ParallelWorkers =
 			plan_create_index_workers(RelationGetRelid(heapRelation),
 									  RelationGetRelid(indexRelation));
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 49a5933aff6..529a7ed3284 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -262,7 +262,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot);
+								  estate->es_snapshot,
+								  InvalidBlockNumber);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index ab6353bdcd1..4c6b396a8a8 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -1328,6 +1328,7 @@ tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbre
 			break;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1462,6 +1463,7 @@ tuplesort_performsort(Tuplesortstate *state)
 			break;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1718,6 +1720,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			return false;
 
 		default:
+			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			return false;		/* keep compiler quiet */
 	}
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 2cd508e5130..343ed4bbc54 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/brin_tuple.h"
 #include "access/hash.h"
 #include "access/htup_details.h"
 #include "access/nbtree.h"
@@ -43,6 +44,8 @@ static void removeabbrev_cluster(Tuplesortstate *state, SortTuple *stups,
 								 int count);
 static void removeabbrev_index(Tuplesortstate *state, SortTuple *stups,
 							   int count);
+static void removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups,
+									int count);
 static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups,
 							   int count);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
@@ -69,10 +72,16 @@ static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
 								  Tuplesortstate *state);
 static int	comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b,
 										   Tuplesortstate *state);
+static int	comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+								  Tuplesortstate *state);
 static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
 						  LogicalTape *tape, unsigned int len);
+static void writetup_index_brin(Tuplesortstate *state, LogicalTape *tape,
+								SortTuple *stup);
+static void readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+							   LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
 							 Tuplesortstate *state);
 static int	comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b,
@@ -128,6 +137,16 @@ typedef struct
 	uint32		max_buckets;
 } TuplesortIndexHashArg;
 
+/*
+ * Data structure pointed by "TuplesortPublic.arg" for the index_brin subcase.
+ */
+typedef struct
+{
+	TuplesortIndexArg index;
+
+	/* XXX do we need something here? */
+} TuplesortIndexBrinArg;
+
 /*
  * Data struture pointed by "TuplesortPublic.arg" for the Datum case.
  * Set by tuplesort_begin_datum and used only by the DatumTuple routines.
@@ -140,6 +159,22 @@ typedef struct
 	int			datumTypeLen;
 } TuplesortDatumArg;
 
+/*
+ * Computing BrinTuple size with only the tuple is difficult, so we want to track
+ * the length for r referenced by SortTuple. That's what BrinSortTuple is meant
+ * to do - it's essentially a BrinTuple prefixed by length. We only write the
+ * BrinTuple to the logtapes, though.
+ */
+typedef struct BrinSortTuple
+{
+	Size		tuplen;
+	BrinTuple	tuple;
+} BrinSortTuple;
+
+/* Size of the BrinSortTuple, given length of the BrinTuple. */
+#define BRINSORTTUPLE_SIZE(len)		(offsetof(BrinSortTuple, tuple) + (len))
+
+
 Tuplesortstate *
 tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
@@ -527,6 +562,47 @@ tuplesort_begin_index_gist(Relation heapRel,
 	return state;
 }
 
+Tuplesortstate *
+tuplesort_begin_index_brin(Relation heapRel,
+						   Relation indexRel,
+						   int workMem,
+						   SortCoordinate coordinate,
+						   int sortopt)
+{
+	Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
+												   sortopt);
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext;
+	TuplesortIndexBrinArg *arg;
+
+	oldcontext = MemoryContextSwitchTo(base->maincontext);
+	arg = (TuplesortIndexBrinArg *) palloc(sizeof(TuplesortIndexBrinArg));
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+		elog(LOG,
+			 "begin index sort: workMem = %d, randomAccess = %c",
+			 workMem,
+			 sortopt & TUPLESORT_RANDOMACCESS ? 't' : 'f');
+#endif
+
+	base->nKeys = 1;			/* Only one sort column, the block number */
+
+	base->removeabbrev = removeabbrev_index_brin;
+	base->comparetup = comparetup_index_brin;
+	base->writetup = writetup_index_brin;
+	base->readtup = readtup_index_brin;
+	base->haveDatum1 = true;
+	base->arg = arg;
+
+	arg->index.heapRel = heapRel;
+	arg->index.indexRel = indexRel;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
 Tuplesortstate *
 tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 					  bool nullsFirstFlag, int workMem,
@@ -707,6 +783,35 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 							  !stup.isnull1);
 }
 
+/*
+ * Collect one BRIN tuple while collecting input data for sort.
+ */
+void
+tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
+{
+	SortTuple	stup;
+	BrinSortTuple *bstup;
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext);
+
+	/* allocate space for the whole BRIN sort tuple */
+	bstup = palloc(BRINSORTTUPLE_SIZE(size));
+	stup.tuple = bstup;
+
+	bstup->tuplen = size;
+	memcpy(&bstup->tuple, tuple, size);
+
+	stup.datum1 = tuple->bt_blkno;
+	stup.isnull1 = false;
+
+	tuplesort_puttuple_common(state, &stup,
+							  base->sortKeys &&
+							  base->sortKeys->abbrev_converter &&
+							  !stup.isnull1);
+
+	MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Accept one Datum while collecting input data for sort.
  *
@@ -850,6 +955,35 @@ tuplesort_getindextuple(Tuplesortstate *state, bool forward)
 	return (IndexTuple) stup.tuple;
 }
 
+/*
+ * Fetch the next BRIN tuple in either forward or back direction.
+ * Returns NULL if no more tuples.  Returned tuple belongs to tuplesort memory
+ * context, and must not be freed by caller.  Caller may not rely on tuple
+ * remaining valid after any further manipulation of tuplesort.
+ */
+BrinTuple *
+tuplesort_getbrintuple(Tuplesortstate *state, Size *len, bool forward)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext = MemoryContextSwitchTo(base->sortcontext);
+	SortTuple		stup;
+	BrinSortTuple  *btup;
+
+	if (!tuplesort_gettuple_common(state, forward, &stup))
+		stup.tuple = NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (!stup.tuple)
+		return NULL;
+
+	btup = (BrinSortTuple *) stup.tuple;
+
+	*len = btup->tuplen;
+
+	return &btup->tuple;
+}
+
 /*
  * Fetch the next Datum in either forward or back direction.
  * Returns false if no more datums.
@@ -1564,6 +1698,83 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 								 &stup->isnull1);
 }
 
+/*
+ * Routines specialized for BrinTuple case
+ */
+
+static void
+removeabbrev_index_brin(Tuplesortstate *state, SortTuple *stups, int count)
+{
+	int			i;
+
+	for (i = 0; i < count; i++)
+	{
+		BrinSortTuple   *tuple;
+
+		tuple = stups[i].tuple;
+		stups[i].datum1 = tuple->tuple.bt_blkno;
+	}
+}
+
+static int
+comparetup_index_brin(const SortTuple *a, const SortTuple *b,
+					  Tuplesortstate *state)
+{
+	BrinTuple  *tuple1;
+	BrinTuple  *tuple2;
+
+	tuple1 = &((BrinSortTuple *) a)->tuple;
+	tuple2 = &((BrinSortTuple *) b)->tuple;
+
+	if (tuple1->bt_blkno > tuple2->bt_blkno)
+		return 1;
+	else if (tuple1->bt_blkno < tuple2->bt_blkno)
+		return -1;
+
+	/* silence compilers */
+	return 0;
+}
+
+static void
+writetup_index_brin(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	BrinSortTuple  *tuple = (BrinSortTuple *) stup->tuple;
+	unsigned int	tuplen = tuple->tuplen;
+
+	tuplen = tuplen + sizeof(tuplen);
+	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, &tuple->tuple, tuple->tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+}
+
+static void
+readtup_index_brin(Tuplesortstate *state, SortTuple *stup,
+				   LogicalTape *tape, unsigned int len)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	unsigned int tuplen = len - sizeof(unsigned int);
+
+	/*
+	 * Allocate space for the BRIN sort tuple, which is BrinTuple with an
+	 * extra length field.
+	 */
+	BrinSortTuple *tuple
+		= (BrinSortTuple *) tuplesort_readtup_alloc(state,
+													BRINSORTTUPLE_SIZE(tuplen));
+
+	tuple->tuplen = tuplen;
+
+	LogicalTapeReadExact(tape, &tuple->tuple, tuplen);
+	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
+	stup->tuple = (void *) tuple;
+
+	/* set up first-column key value, which is block number */
+	stup->datum1 = tuple->tuple.bt_blkno;
+}
+
 /*
  * Routines specialized for DatumTuple case
  */
diff --git a/src/include/access/brin.h b/src/include/access/brin.h
index ed66f1b3d51..3451ecb211f 100644
--- a/src/include/access/brin.h
+++ b/src/include/access/brin.h
@@ -11,6 +11,7 @@
 #define BRIN_H
 
 #include "nodes/execnodes.h"
+#include "storage/shm_toc.h"
 #include "utils/relcache.h"
 
 
@@ -52,4 +53,6 @@ typedef struct BrinStatsData
 
 extern void brinGetStats(Relation index, BrinStatsData *stats);
 
+extern void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc);
+
 #endif							/* BRIN_H */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d03360eac04..72a20d882f5 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,6 +79,7 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
+	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index dbb709b56ce..d94e4d32aa1 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,7 +390,8 @@ typedef struct TableAmRoutine
 	 * relation.
 	 */
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan);
+											ParallelTableScanDesc pscan,
+											BlockNumber chunk_factor);
 
 	/*
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1148,7 +1149,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot);
+										  Snapshot snapshot,
+										  BlockNumber chunk_factor);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2064,7 +2066,8 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan);
+												ParallelTableScanDesc pscan,
+												BlockNumber chunk_factor);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 9ed2de76cd6..357eb35311d 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -21,6 +21,7 @@
 #ifndef TUPLESORT_H
 #define TUPLESORT_H
 
+#include "access/brin_tuple.h"
 #include "access/itup.h"
 #include "executor/tuptable.h"
 #include "storage/dsm.h"
@@ -282,6 +283,9 @@ typedef struct
  * The "index_hash" API is similar to index_btree, but the tuples are
  * actually sorted by their hash codes not the raw data.
  *
+ * The "index_brin" API is similar to index_btree, but the tuples are
+ * BrinTuple and are sorted by their block number not the raw data.
+ *
  * Parallel sort callers are required to coordinate multiple tuplesort states
  * in a leader process and one or more worker processes.  The leader process
  * must launch workers, and have each perform an independent "partial"
@@ -426,6 +430,10 @@ extern Tuplesortstate *tuplesort_begin_index_gist(Relation heapRel,
 												  Relation indexRel,
 												  int workMem, SortCoordinate coordinate,
 												  int sortopt);
+extern Tuplesortstate *tuplesort_begin_index_brin(Relation heapRel,
+												  Relation indexRel,
+												  int workMem, SortCoordinate coordinate,
+												  int sortopt);
 extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
 											 Oid sortOperator, Oid sortCollation,
 											 bool nullsFirstFlag,
@@ -438,6 +446,7 @@ extern void tuplesort_putheaptuple(Tuplesortstate *state, HeapTuple tup);
 extern void tuplesort_putindextuplevalues(Tuplesortstate *state,
 										  Relation rel, ItemPointer self,
 										  const Datum *values, const bool *isnull);
+extern void tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tup, Size len);
 extern void tuplesort_putdatum(Tuplesortstate *state, Datum val,
 							   bool isNull);
 
@@ -445,6 +454,8 @@ extern bool tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 								   bool copy, TupleTableSlot *slot, Datum *abbrev);
 extern HeapTuple tuplesort_getheaptuple(Tuplesortstate *state, bool forward);
 extern IndexTuple tuplesort_getindextuple(Tuplesortstate *state, bool forward);
+extern BrinTuple *tuplesort_getbrintuple(Tuplesortstate *state, Size *len,
+										 bool forward);
 extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward, bool copy,
 							   Datum *val, bool *isNull, Datum *abbrev);
 
-- 
2.41.0

From dcb0058540f46fa5f9cbb21e1cff640874418401 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 18:15:20 +0100
Subject: [PATCH v3 2/4] fix review comments

---
 src/backend/access/brin/brin.c             | 44 +++++++++-------------
 src/backend/utils/sort/tuplesort.c         |  3 --
 src/backend/utils/sort/tuplesortvariants.c | 11 +++---
 3 files changed, 23 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index b7cd29c5968..d54d39a535e 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -162,7 +162,7 @@ typedef struct BrinBuildState
 {
 	Relation	bs_irel;
 	int			bs_numtuples;
-	int			bs_reltuples;
+	double		bs_reltuples;
 	Buffer		bs_currentInsertBuf;
 	BlockNumber bs_pagesPerRange;
 	BlockNumber bs_currRangeStart;
@@ -172,13 +172,12 @@ typedef struct BrinBuildState
 
 	/*
 	 * bs_leader is only present when a parallel index build is performed, and
-	 * only in the leader process. (Actually, only the leader has a
+	 * only in the leader process. (Actually, only the leader process has a
 	 * BrinBuildState.)
 	 */
 	BrinLeader *bs_leader;
 	int			bs_worker_id;
 	BrinSpool  *bs_spool;
-	BrinSpool  *bs_spool_out;
 } BrinBuildState;
 
 /*
@@ -961,7 +960,7 @@ brinbuildCallback(Relation index,
 /*
  * A version of the callback, used by parallel index builds. The main difference
  * is that instead of writing the BRIN tuples into the index, we write them into
- * a shared tuplestore file, and leave the insertion up to the leader (which may
+ * a shared tuplestore, and leave the insertion up to the leader (which may
  * reorder them a bit etc.). The callback also does not generate empty ranges,
  * those may be added by the leader when merging results from workers.
  */
@@ -981,10 +980,10 @@ brinbuildCallbackParallel(Relation index,
 	/*
 	 * If we're in a block that belongs to a future range, summarize what
 	 * we've got and start afresh.  Note the scan might have skipped many
-	 * pages, if they were devoid of live tuples; make sure to insert index
-	 * tuples for those too.
+	 * pages, if they were devoid of live tuples; we do not create empty
+	 * BRIN ranges here - the leader is responsible for filling them in.
 	 */
-	while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
+	if (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
 	{
 
 		BRIN_elog((DEBUG2,
@@ -996,19 +995,15 @@ brinbuildCallbackParallel(Relation index,
 		form_and_spill_tuple(state);
 
 		/*
-		 * set state to correspond to the next range
+		 * Set state to correspond to the next range (for this block).
 		 *
-		 * XXX This has the issue that it skips ranges summarized by other
-		 * workers, but it also skips empty ranges that should have been
-		 * summarized. We'd need to either make the workers aware which
-		 * chunk they are actually processing (which is currently known
-		 * only in the ParallelBlockTableScan bit). Or we could ignore it
-		 * here, and then decide it while "merging" results from workers
-		 * (if there's no entry for the range, it had to be empty so we
-		 * just add an empty one).
+		 * This skips ranges that are either empty (and so we don't get any
+		 * tuples to summarize), or processed by other workers. We can't
+		 * differentiate those cases here easily, so we leave it up to the
+		 * leader to fill empty ranges where needed.
 		 */
-		while (thisblock > state->bs_currRangeStart + state->bs_pagesPerRange - 1)
-			state->bs_currRangeStart += state->bs_pagesPerRange;
+		state->bs_currRangeStart
+			= state->bs_pagesPerRange * (thisblock / state->bs_pagesPerRange);
 
 		/* re-initialize state for it */
 		brin_memtuple_initialize(state->bs_dtuple, state->bs_bdesc);
@@ -1137,11 +1132,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		 * on the amount of memory used by a CREATE INDEX operation, regardless of
 		 * the use of parallelism or any other factor.
 		 */
-		state->bs_spool_out = (BrinSpool *) palloc0(sizeof(BrinSpool));
-		state->bs_spool_out->heap = heap;
-		state->bs_spool_out->index = index;
-
-		state->bs_spool_out->sortstate =
+		state->bs_spool->sortstate =
 			tuplesort_begin_index_brin(heap, index,
 									   maintenance_work_mem, coordinate,
 									   TUPLESORT_NONE);
@@ -2427,6 +2418,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	BlockNumber	prevblkno = InvalidBlockNumber;
 	BrinTuple  *emptyTuple = NULL;
 	Size		emptySize;
+	BrinSpool  *spool = state->bs_spool;
 
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(brinleader->pcxt);
@@ -2438,7 +2430,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	state->bs_reltuples = brinshared->reltuples;
 	state->bs_numtuples = brinshared->indtuples;
 
-	tuplesort_performsort(state->bs_spool_out->sortstate);
+	tuplesort_performsort(spool->sortstate);
 
 	/*
 	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
@@ -2452,7 +2444,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	 * XXX We can't quite free the BrinTuple, though, because that's a field
 	 * in BrinSortTuple.
 	 */
-	while ((btup = tuplesort_getbrintuple(state->bs_spool_out->sortstate, &tuplen, true)) != NULL)
+	while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
 	{
 		/*
 		 * We should not get two summaries for the same range. The workers
@@ -2494,7 +2486,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 		prevblkno = btup->bt_blkno;
 	}
 
-	tuplesort_end(state->bs_spool_out->sortstate);
+	tuplesort_end(spool->sortstate);
 
 	/*
 	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 4c6b396a8a8..ab6353bdcd1 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -1328,7 +1328,6 @@ tuplesort_puttuple_common(Tuplesortstate *state, SortTuple *tuple, bool useAbbre
 			break;
 
 		default:
-			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1463,7 +1462,6 @@ tuplesort_performsort(Tuplesortstate *state)
 			break;
 
 		default:
-			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			break;
 	}
@@ -1720,7 +1718,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			return false;
 
 		default:
-			Assert(false);
 			elog(ERROR, "invalid tuplesort state");
 			return false;		/* keep compiler quiet */
 	}
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 343ed4bbc54..525cc01b474 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -161,9 +161,8 @@ typedef struct
 
 /*
  * Computing BrinTuple size with only the tuple is difficult, so we want to track
- * the length for r referenced by SortTuple. That's what BrinSortTuple is meant
- * to do - it's essentially a BrinTuple prefixed by length. We only write the
- * BrinTuple to the logtapes, though.
+ * the length of the tuple referenced by the SortTuple. That's what
+ * BrinSortTuple does - it's essentially a BrinTuple prefixed by its length.
  */
 typedef struct BrinSortTuple
 {
@@ -796,11 +795,11 @@ tuplesort_putbrintuple(Tuplesortstate *state, BrinTuple *tuple, Size size)
 
 	/* allocate space for the whole BRIN sort tuple */
 	bstup = palloc(BRINSORTTUPLE_SIZE(size));
-	stup.tuple = bstup;
 
 	bstup->tuplen = size;
 	memcpy(&bstup->tuple, tuple, size);
 
+	stup.tuple = bstup;
 	stup.datum1 = tuple->bt_blkno;
 	stup.isnull1 = false;
 
@@ -1723,8 +1722,8 @@ comparetup_index_brin(const SortTuple *a, const SortTuple *b,
 	BrinTuple  *tuple1;
 	BrinTuple  *tuple2;
 
-	tuple1 = &((BrinSortTuple *) a)->tuple;
-	tuple2 = &((BrinSortTuple *) b)->tuple;
+	tuple1 = &((BrinSortTuple *) (a->tuple))->tuple;
+	tuple2 = &((BrinSortTuple *) (b->tuple))->tuple;
 
 	if (tuple1->bt_blkno > tuple2->bt_blkno)
 		return 1;
-- 
2.41.0

From f5a5bf18e3049e21f2b2cc62f7cab7fb892bf3a6 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 19:07:48 +0100
Subject: [PATCH v3 3/4] simplify comparetup_index_brin

---
 src/backend/utils/sort/tuplesortvariants.c | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 525cc01b474..9b3a70e6ccf 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -1719,15 +1719,12 @@ static int
 comparetup_index_brin(const SortTuple *a, const SortTuple *b,
 					  Tuplesortstate *state)
 {
-	BrinTuple  *tuple1;
-	BrinTuple  *tuple2;
+	Assert(TuplesortstateGetPublic(state)->haveDatum1);
 
-	tuple1 = &((BrinSortTuple *) (a->tuple))->tuple;
-	tuple2 = &((BrinSortTuple *) (b->tuple))->tuple;
-
-	if (tuple1->bt_blkno > tuple2->bt_blkno)
+	if (DatumGetUInt32(a->datum1) > DatumGetUInt32(b->datum1))
 		return 1;
-	else if (tuple1->bt_blkno < tuple2->bt_blkno)
+
+	if (DatumGetUInt32(a->datum1) < DatumGetUInt32(b->datum1))
 		return -1;
 
 	/* silence compilers */
-- 
2.41.0

From 7c9be5a2a71a0eead626c8f9ac248008bc6db3e9 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.von...@postgresql.org>
Date: Tue, 7 Nov 2023 20:02:03 +0100
Subject: [PATCH v3 4/4] remove tableam changes

---
 src/backend/access/brin/brin.c      | 115 +++++++++++++++++++++++++---
 src/backend/access/nbtree/nbtsort.c |   2 +-
 src/backend/access/table/tableam.c  |  49 ++----------
 src/backend/executor/nodeSeqscan.c  |   3 +-
 src/include/access/relscan.h        |   1 -
 src/include/access/tableam.h        |   9 +--
 6 files changed, 115 insertions(+), 64 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index d54d39a535e..daadd051e30 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -2334,7 +2334,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot, brinshared->pagesPerRange);
+								  snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2413,6 +2413,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 {
 	int			i;
 	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
 	Size		tuplen;
 	BrinShared *brinshared = brinleader->brinshared;
 	BlockNumber	prevblkno = InvalidBlockNumber;
@@ -2432,13 +2433,19 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 
 	tuplesort_performsort(spool->sortstate);
 
+	/*
+	 * Initialize BrinMemTuple we'll use to union summaries from workers
+	 * (in case they happened to produce parts of the same page range).
+	 */
+	memtuple = brin_new_memtuple(state->bs_bdesc);
+
 	/*
 	 * Read the BRIN tuples from the shared tuplesort, sorted by block number.
 	 * That probably gives us an index that is cheaper to scan, thanks to mostly
 	 * getting data from the same index page as before.
 	 *
 	 * FIXME This probably needs some memory management fixes - we're reading
-	 * tuples from the tuplesort, we're allocating an emty tuple, and so on.
+	 * tuples from the tuplesort, we're allocating an empty tuple, and so on.
 	 * Probably better to release this memory.
 	 *
 	 * XXX We can't quite free the BrinTuple, though, because that's a field
@@ -2446,14 +2453,65 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	 */
 	while ((btup = tuplesort_getbrintuple(spool->sortstate, &tuplen, true)) != NULL)
 	{
+		/* Ranges should be multiples of pages_per_range for the index. */
+		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+
 		/*
-		 * We should not get two summaries for the same range. The workers
-		 * are producing ranges for non-overlapping sections of the table.
+		 * Do we need to union summaries for the same page range?
+		 *
+		 * If this is the first brin tuple we read, then just deform it into
+		 * the memtuple, and continue with the next one from tuplesort. We
+		 * however may need to insert empty summaries into the index.
+		 *
+		 * If it's the same block as the last we saw, we simply union the
+		 * brin tuple into it, and we're done - we don't even need to insert
+		 * empty ranges, because that was done earlier when we saw the first
+		 * brin tuple (for this range).
+		 *
+		 * Finally, if it's not the first brin tuple, and it's not the same
+		 * page range, we need to do the insert and then deform the tuple
+		 * into the memtuple. Then we'll insert empty ranges before the
+		 * new brin tuple, if needed.
 		 */
-		Assert(btup->bt_blkno != prevblkno);
+		if (prevblkno == InvalidBlockNumber)
+		{
+			/* First brin tuple, just deform it into the memtuple. */
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
 
-		/* Ranges should be multiples of pages_per_range for the index. */
-		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+			/* continue to insert empty ranges before this block */
+		}
+		else if (memtuple->bt_blkno == btup->bt_blkno)
+		{
+			/*
+			 * Not the first brin tuple, but same page range as the previous
+			 * one, so we can merge it into the memtuple.
+			 */
+			union_tuples(state->bs_bdesc, memtuple, btup);
+			continue;
+		}
+		else
+		{
+			BrinTuple  *tmp;
+			Size		len;
+
+			/*
+			 * We got brin tuple for a different page range, so form a brin
+			 * tuple from the memtuple, insert it, and re-init the memtuple
+			 * from the new brin tuple.
+			 */
+			tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+								  memtuple, &len);
+
+			brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+						  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+			/* free the formed on-disk tuple */
+			pfree(tmp);
+
+			memtuple = brin_deform_tuple(state->bs_bdesc, btup, memtuple);
+
+			/* continue to insert empty ranges before this block */
+		}
 
 		/* Fill empty ranges for all ranges missing in the tuplesort. */
 		prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
@@ -2480,14 +2538,51 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 						  emptyTuple->bt_blkno, emptyTuple, emptySize);
 		}
 
-		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
-					  &state->bs_currentInsertBuf, btup->bt_blkno, btup, tuplen);
-
 		prevblkno = btup->bt_blkno;
 	}
 
 	tuplesort_end(spool->sortstate);
 
+	/* Fill empty ranges for all ranges missing in the tuplesort. */
+	prevblkno = (prevblkno == InvalidBlockNumber) ? 0 : prevblkno;
+	while (prevblkno + state->bs_pagesPerRange < memtuple->bt_blkno)
+	{
+		/* the missing range */
+		prevblkno += state->bs_pagesPerRange;
+
+		/* Did we already build the empty range? If not, do it now. */
+		if (emptyTuple == NULL)
+		{
+			BrinMemTuple *dtuple = brin_new_memtuple(state->bs_bdesc);
+
+			emptyTuple = brin_form_tuple(state->bs_bdesc, prevblkno, dtuple, &emptySize);
+		}
+		else
+		{
+			/* we already have an "empty range" tuple, just set the block */
+			emptyTuple->bt_blkno = prevblkno;
+		}
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf,
+					  emptyTuple->bt_blkno, emptyTuple, emptySize);
+	}
+
+	/* Form an on-disk tuple from the memtuple (last page range) and insert it. */
+	if (prevblkno != InvalidBlockNumber)
+	{
+		BrinTuple  *tmp;
+		Size		len;
+
+		tmp = brin_form_tuple(state->bs_bdesc, memtuple->bt_blkno,
+							  memtuple, &len);
+
+		brin_doinsert(state->bs_irel, state->bs_pagesPerRange, state->bs_rmAccess,
+					  &state->bs_currentInsertBuf, tmp->bt_blkno, tmp, len);
+
+		pfree(tmp);
+	}
+
 	/*
 	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
 	 * or we might get incomplete data.)
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 6241baeea86..c2665fce411 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -1575,7 +1575,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
 	btshared->brokenhotchain = false;
 	table_parallelscan_initialize(btspool->heap,
 								  ParallelTableScanFromBTShared(btshared),
-								  snapshot, InvalidBlockNumber);
+								  snapshot);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 4af0d433e9d..c6bdb7e1c68 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -153,9 +153,9 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot)
 
 void
 table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan,
-							  Snapshot snapshot, BlockNumber chunk_factor)
+							  Snapshot snapshot)
 {
-	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan, chunk_factor);
+	Size		snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan);
 
 	pscan->phs_snapshot_off = snapshot_off;
 
@@ -395,21 +395,16 @@ table_block_parallelscan_estimate(Relation rel)
 }
 
 Size
-table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, BlockNumber chunk_factor)
+table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
 {
 	ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
 
 	bpscan->base.phs_relid = RelationGetRelid(rel);
 	bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
-	bpscan->phs_chunk_factor = chunk_factor;
-	/* compare phs_syncscan initialization to similar logic in initscan
-	 *
-	 * Disable sync scans if the chunk factor is set (valid block number).
-	 */
+	/* compare phs_syncscan initialization to similar logic in initscan */
 	bpscan->base.phs_syncscan = synchronize_seqscans &&
 		!RelationUsesLocalBuffers(rel) &&
-		(bpscan->phs_nblocks > NBuffers / 4) &&
-		!BlockNumberIsValid(bpscan->phs_chunk_factor);
+		bpscan->phs_nblocks > NBuffers / 4;
 	SpinLockInit(&bpscan->phs_mutex);
 	bpscan->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
@@ -464,25 +459,6 @@ table_block_parallelscan_startblock_init(Relation rel,
 	pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
 									  PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 
-	/*
-	 * If the chunk size factor is set, we need to make sure the chunk size is
-	 * a multiple of that value. We round the chunk size to the nearest chunk
-	 * factor multiple, at least one chunk_factor.
-	 *
-	 * XXX Note this may override PARALLEL_SEQSCAN_MAX_CHUNK_SIZE, in case the
-	 * chunk factor (e.g. BRIN pages_per_range) is larger.
-	 */
-	if (pbscan->phs_chunk_factor != InvalidBlockNumber)
-	{
-		/* nearest (smaller) multiple of chunk_factor */
-		pbscanwork->phsw_chunk_size
-			= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
-
-		/* but at least one chunk_factor */
-		pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
-										  pbscan->phs_chunk_factor);
-	}
-
 retry:
 	/* Grab the spinlock. */
 	SpinLockAcquire(&pbscan->phs_mutex);
@@ -599,21 +575,6 @@ table_block_parallelscan_nextpage(Relation rel,
 			(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
 			pbscanwork->phsw_chunk_size >>= 1;
 
-		/*
-		 * We need to make sure the new chunk_size is still a suitable multiple
-		 * of chunk_factor.
-		 */
-		if (pbscan->phs_chunk_factor != InvalidBlockNumber)
-		{
-			/* nearest (smaller) multiple of chunk_factor */
-			pbscanwork->phsw_chunk_size
-				= pbscan->phs_chunk_factor * (pbscanwork->phsw_chunk_size / pbscan->phs_chunk_factor);
-
-			/* but at least one chunk_factor */
-			pbscanwork->phsw_chunk_size = Max(pbscanwork->phsw_chunk_size,
-											  pbscan->phs_chunk_factor);
-		}
-
 		nallocated = pbscanwork->phsw_nallocated =
 			pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
 									pbscanwork->phsw_chunk_size);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 529a7ed3284..49a5933aff6 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -262,8 +262,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
 	table_parallelscan_initialize(node->ss.ss_currentRelation,
 								  pscan,
-								  estate->es_snapshot,
-								  InvalidBlockNumber);
+								  estate->es_snapshot);
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 72a20d882f5..d03360eac04 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -79,7 +79,6 @@ typedef struct ParallelBlockTableScanDescData
 	BlockNumber phs_nblocks;	/* # blocks in relation at start of scan */
 	slock_t		phs_mutex;		/* mutual exclusion for setting startblock */
 	BlockNumber phs_startblock; /* starting block number */
-	BlockNumber phs_chunk_factor; /* chunks need to be a multiple of this */
 	pg_atomic_uint64 phs_nallocated;	/* number of blocks allocated to
 										 * workers so far. */
 }			ParallelBlockTableScanDescData;
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index d94e4d32aa1..dbb709b56ce 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -390,8 +390,7 @@ typedef struct TableAmRoutine
 	 * relation.
 	 */
 	Size		(*parallelscan_initialize) (Relation rel,
-											ParallelTableScanDesc pscan,
-											BlockNumber chunk_factor);
+											ParallelTableScanDesc pscan);
 
 	/*
 	 * Reinitialize `pscan` for a new scan. `rel` will be the same relation as
@@ -1149,8 +1148,7 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot);
  */
 extern void table_parallelscan_initialize(Relation rel,
 										  ParallelTableScanDesc pscan,
-										  Snapshot snapshot,
-										  BlockNumber chunk_factor);
+										  Snapshot snapshot);
 
 /*
  * Begin a parallel scan. `pscan` needs to have been initialized with
@@ -2066,8 +2064,7 @@ extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 
 extern Size table_block_parallelscan_estimate(Relation rel);
 extern Size table_block_parallelscan_initialize(Relation rel,
-												ParallelTableScanDesc pscan,
-												BlockNumber chunk_factor);
+												ParallelTableScanDesc pscan);
 extern void table_block_parallelscan_reinitialize(Relation rel,
 												  ParallelTableScanDesc pscan);
 extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
-- 
2.41.0

Reply via email to