On 4/15/24 10:18, Tomas Vondra wrote:
> ...
>
> I'll try a bit more to make this work without the temp table.
> 

Considering the earlier discussion in e2933a6e1, I think making the
table TEMP is the best fix, so I'll do that. Thanks for remembering that
change, Alexander!

Attached is the cleanup I thought about doing earlier in this patch [1]
to make the code more like btree. The diff might make it seem like a big
change, but it really just moves the merge code into a separate function
and makes it wait on the condition variable. I still believe the old
code is correct, but this seems like an improvement, so I plan to push it
soon and resolve the open item.
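
For anyone skimming the diff, the leader-side change boils down to the
usual "counter plus condition variable" handshake: each participant bumps
a shared "done" counter under the lock and signals, and the leader sleeps
on the condition variable until the counter matches the number of
participants, only then touching the per-worker results. Just to
illustrate the shape of it, here is a minimal standalone sketch of that
pattern in plain pthreads (illustrative only, not backend code; the real
thing uses SpinLockAcquire() and ConditionVariableSleep() as in the patch
below, and all names here are made up):

#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t workersdonecv = PTHREAD_COND_INITIALIZER;
static int	nparticipantsdone = 0;
static double reltuples = 0;

static void *
worker_main(void *arg)
{
	(void) arg;

	/* ... scan a chunk of the heap, produce sorted partial results ... */

	/* report completion and wake the leader */
	pthread_mutex_lock(&mutex);
	nparticipantsdone++;
	reltuples += 1000;			/* pretend this worker scanned 1000 tuples */
	pthread_cond_signal(&workersdonecv);
	pthread_mutex_unlock(&mutex);

	return NULL;
}

int
main(void)
{
	pthread_t	workers[NWORKERS];

	for (int i = 0; i < NWORKERS; i++)
		pthread_create(&workers[i], NULL, worker_main, NULL);

	/* leader: wait for all participants (analogous to _brin_parallel_heapscan) */
	pthread_mutex_lock(&mutex);
	while (nparticipantsdone < NWORKERS)
		pthread_cond_wait(&workersdonecv, &mutex);
	pthread_mutex_unlock(&mutex);

	/* only now is it safe to merge the per-worker results */
	printf("all workers done, reltuples = %.0f\n", reltuples);

	for (int i = 0; i < NWORKERS; i++)
		pthread_join(workers[i], NULL);

	return 0;
}

The detail that matters is that the leader re-checks the counter in a loop
after every wakeup, so spurious wakeups are harmless; that's also why the
patch keeps the for(;;) / recheck structure from the btree code.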


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From c37d96c4d3a151b3a30f28b971e6058d20978b76 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <to...@2ndquadrant.com>
Date: Mon, 15 Apr 2024 17:59:31 +0200
Subject: [PATCH] Cleanup parallel BRIN index build code

Commit b43757171470 introduced parallel builds for BRIN indexes, using
code fairly similar to BTREE. But there happened to be a couple of minor
unnecessary differences, particularly in when the leader waits for the
workers and merges the results.

Unlike the BTREE code, the leader never waited on the workersdonecv
condition variable, but simply called WaitForParallelWorkersToFinish()
in _brin_end_parallel() before merging the per-worker results. While
this works correctly, it's probably better to do the merging earlier,
after waiting on the condition variable. This way _brin_end_parallel()
is responsible only for exiting the parallel mode and accumulating WAL
usage data, same as in BTREE.

The merging of per-worker results now happens in _brin_parallel_merge(),
while _brin_parallel_heapscan() is responsible for waiting for the
workers to finish scanning the heap.

Discussion: https://postgr.es/m/3733d042-71e1-6ae6-5fac-00c12db62...@enterprisedb.com
---
 src/backend/access/brin/brin.c | 130 ++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 041415a40e7..32722f0961b 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -229,6 +229,8 @@ static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Rela
 								 bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
 static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static double _brin_parallel_heapscan(BrinBuildState *buildstate);
+static double _brin_parallel_merge(BrinBuildState *buildstate);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
 											   Relation heap, Relation index);
 static void _brin_parallel_scan_and_build(BrinBuildState *buildstate,
@@ -1201,6 +1203,9 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 			tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
 									   TUPLESORT_NONE);
 
+		/* scan the relation and merge per-worker results */
+		reltuples = _brin_parallel_merge(state);
+
 		_brin_end_parallel(state->bs_leader, state);
 	}
 	else						/* no parallel index build */
@@ -1233,14 +1238,10 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-
-		/* track the number of relation tuples */
-		state->bs_reltuples = reltuples;
 	}
 
 	/* release resources */
 	idxtuples = state->bs_numtuples;
-	reltuples = state->bs_reltuples;
 	brinRevmapTerminate(state->bs_rmAccess);
 	terminate_brin_buildstate(state);
 
@@ -2329,6 +2330,22 @@ check_null_keys(BrinValues *bval, ScanKey *nullkeys, int nnullkeys)
 	return true;
 }
 
+/*
+ * Create parallel context, and launch workers for leader.
+ *
+ * buildstate argument should be initialized (with the exception of the
+ * tuplesort states, which may later be created based on shared
+ * state initially set up here).
+ *
+ * isconcurrent indicates if operation is CREATE INDEX CONCURRENTLY.
+ *
+ * request is the target number of parallel worker processes to launch.
+ *
+ * Sets buildstate's BrinLeader, which caller must use to shut down parallel
+ * mode by passing it to _brin_end_parallel() at the very end of its index
+ * build.  If not even a single worker process can be launched, this is
+ * never set, and caller should proceed with a serial index build.
+ */
 static void
 _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 					 bool isconcurrent, int request)
@@ -2517,27 +2534,87 @@ static void
 _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 {
 	int			i;
-	BrinTuple  *btup;
-	BrinMemTuple *memtuple = NULL;
-	Size		tuplen;
-	BrinShared *brinshared = brinleader->brinshared;
-	BlockNumber prevblkno = InvalidBlockNumber;
-	MemoryContext rangeCxt,
-				oldCxt;
 
 	/* Shutdown worker processes */
 	WaitForParallelWorkersToFinish(brinleader->pcxt);
 
 	/*
-	 * If we didn't actually launch workers, we still have to make sure to
-	 * exit parallel mode.
+	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
+	 * or we might get incomplete data.)
 	 */
-	if (!state)
-		goto cleanup;
+	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
+		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
+
+	/* Free last reference to MVCC snapshot, if one was used */
+	if (IsMVCCSnapshot(brinleader->snapshot))
+		UnregisterSnapshot(brinleader->snapshot);
+	DestroyParallelContext(brinleader->pcxt);
+	ExitParallelMode();
+}
+
+/*
+ * Within leader, wait for end of heap scan.
+ *
+ * When called, parallel heap scan started by _brin_begin_parallel() will
+ * already be underway within worker processes (when leader participates
+ * as a worker, we should end up here just as workers are finishing).
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_heapscan(BrinBuildState *state)
+{
+	BrinShared *brinshared = state->bs_leader->brinshared;
+	int			nparticipanttuplesorts;
+
+	nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts;
+	for (;;)
+	{
+		SpinLockAcquire(&brinshared->mutex);
+		if (brinshared->nparticipantsdone == nparticipanttuplesorts)
+		{
+			/* copy the data into leader state */
+			state->bs_reltuples = brinshared->reltuples;
+			state->bs_numtuples = brinshared->indtuples;
 
-	/* copy the data into leader state (we have to wait for the workers ) */
-	state->bs_reltuples = brinshared->reltuples;
-	state->bs_numtuples = brinshared->indtuples;
+			SpinLockRelease(&brinshared->mutex);
+			break;
+		}
+		SpinLockRelease(&brinshared->mutex);
+
+		ConditionVariableSleep(&brinshared->workersdonecv,
+							   WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN);
+	}
+
+	ConditionVariableCancelSleep();
+
+	return state->bs_reltuples;
+}
+
+/*
+ * Within leader, wait for end of heap scan and merge per-worker results.
+ *
+ * After waiting for all workers to finish, merge the per-worker results into
+ * the complete index. The results from each worker are sorted by block number
+ * (start of the page range). While combining the per-worker results we merge
+ * summaries for the same page range, and also fill in empty summaries for
+ * ranges without any tuples.
+ *
+ * Returns the total number of heap tuples scanned.
+ */
+static double
+_brin_parallel_merge(BrinBuildState *state)
+{
+	BrinTuple  *btup;
+	BrinMemTuple *memtuple = NULL;
+	Size		tuplen;
+	BlockNumber prevblkno = InvalidBlockNumber;
+	MemoryContext rangeCxt,
+				oldCxt;
+	double		reltuples;
+
+	/* wait for workers to scan table and produce partial results */
+	reltuples = _brin_parallel_heapscan(state);
 
 	/* do the actual sort in the leader */
 	tuplesort_performsort(state->bs_sortstate);
@@ -2569,7 +2646,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	while ((btup = tuplesort_getbrintuple(state->bs_sortstate, &tuplen, true)) != NULL)
 	{
 		/* Ranges should be multiples of pages_per_range for the index. */
-		Assert(btup->bt_blkno % brinshared->pagesPerRange == 0);
+		Assert(btup->bt_blkno % state->bs_leader->brinshared->pagesPerRange == 0);
 
 		/*
 		 * Do we need to union summaries for the same page range?
@@ -2665,20 +2742,7 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	MemoryContextSwitchTo(oldCxt);
 	MemoryContextDelete(rangeCxt);
 
-	/*
-	 * Next, accumulate WAL usage.  (This must wait for the workers to finish,
-	 * or we might get incomplete data.)
-	 */
-	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
-		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
-
-cleanup:
-
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
-	DestroyParallelContext(brinleader->pcxt);
-	ExitParallelMode();
+	return reltuples;
 }
 
 /*
-- 
2.44.0
