From a4512c383e4758cfdc8a34a8abff94b22f4e55fa Mon Sep 17 00:00:00 2001
From: Amit Kapila <amit.kapila@enterprisedb.com>
Date: Wed, 27 Nov 2019 17:46:35 +0530
Subject: [PATCH] Fix issues, rearrange some code, and make cosmetic changes

---
 src/backend/access/heap/vacuumlazy.c | 124 ++++++++++++++++++++---------------
 1 file changed, 70 insertions(+), 54 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 33e78a6cca..b71e6a2fc6 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -31,12 +31,13 @@
  * When starting either index vacuuming or index cleanup, we launch parallel
  * worker processes.  Once all indexes are processed the parallel worker
  * processes exit.  And then the leader process re-initializes the parallel
- * context so that the leader can launch parallel workers again in the next
- * time.  Note that all parallel workers live during either index vacuuming
- * or index cleanup but the leader process neither exits from the parallel
- * mode nor destroys the parallel context.  For updating the index statistics,
- * since any updates are not allowed during parallel mode we update the index
- * statistics after exited from the parallel mode.
+ * context so that it can use the same DSM for multiple passes of index
+ * vacuuming and for performing index cleanup.  Note that all parallel
+ * workers live during either index vacuuming or index cleanup but the
+ * leader process neither exits from the parallel mode nor destroys the
+ * parallel context.  Since updates are not allowed during parallel mode,
+ * we update the index statistics only after exiting from the parallel
+ * mode.
  *
  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -181,7 +182,7 @@ typedef struct LVShared
 	int		elevel;
 
 	/*
-	 * An indication for vacuum workers of doing either index vacuuming or
+	 * An indication for vacuum workers to perform either index vacuuming or
 	 * index cleanup.  first_time is true only if for_cleanup is true and
 	 * bulk-deletion is not performed yet.
 	 */
@@ -225,11 +226,9 @@ typedef struct LVShared
 	pg_atomic_uint32 active_nworkers;
 
 	/*
-	 * Variables to control parallel index vacuuming.  Index statistics
-	 * returned from ambulkdelete and amvacuumcleanup is nullable variable
-	 * length.  'offset' is NULL bitmap. Note that a 0 indicates a null,
-	 * while 1 indicates non-null.  The index statistics follows at end of
-	 * struct.
+	 * Variables to control parallel index vacuuming.  We have a bitmap to
+	 * indicate which indexes have stats in shared memory.  A set bit in the
+	 * map indicates that the corresponding index supports parallel vacuum.
 	 */
 	pg_atomic_uint32	idx;		/* counter for vacuuming and clean up */
 	pg_atomic_uint32	nprocessed;	/* # of indexes done during parallel execution */
@@ -374,7 +373,7 @@ static void vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
 											 LVDeadTuples *dead_tuples);
 static void vacuum_or_cleanup_skipped_indexes(LVRelStats *vacrelstats, Relation *Irel,
 											  int nindexes, IndexBulkDeleteResult **stats,
-											  LVParallelState *lps, bool in_parallel);
+											  LVParallelState *lps);
 static void vacuum_or_cleanup_one_index_worker(Relation indrel, IndexBulkDeleteResult **stats,
 											   LVShared *lvshared, LVSharedIndStats *shared_indstats,
 											   LVDeadTuples *dead_tuples);
@@ -2034,17 +2033,6 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	/* Setup the shared cost-based vacuum delay and launch workers*/
 	if (nworkers > 0)
 	{
-		/* Enable shared cost balance */
-		VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
-		VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
-
-		/*
-		 * Set up shared cost balance and the number of active workers for
-		 * vacuum delay.
-		 */
-		pg_atomic_write_u32(VacuumSharedCostBalance, VacuumCostBalance);
-		pg_atomic_write_u32(VacuumActiveNWorkers, 0);
-
 		/*
 		 * Reset the local value so that we compute cost balance during
 		 * parallel index vacuuming.
@@ -2054,6 +2042,21 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 
 		LaunchParallelWorkers(lps->pcxt, nworkers);
 
+		/* Enable shared costing iff we process indexes in parallel. */
+		if (lps->pcxt->nworkers_launched > 0)
+		{
+			/* Enable shared cost balance */
+			VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
+			VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
+
+			/*
+			 * Set up shared cost balance and the number of active workers for
+			 * vacuum delay.
+			 */
+			pg_atomic_write_u32(VacuumSharedCostBalance, VacuumCostBalance);
+			pg_atomic_write_u32(VacuumActiveNWorkers, 0);
+		}
+
 		if (lps->lvshared->for_cleanup)
 			ereport(elevel,
 					(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
@@ -2081,14 +2084,15 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	 * are remaining. If there are such indexes the leader process does vacuum
 	 * or cleanup them one by one.
 	 */
-	vacuum_or_cleanup_skipped_indexes(vacrelstats, Irel, nindexes,
-									  stats, lps, nworkers > 0);
+	vacuum_or_cleanup_skipped_indexes(vacrelstats, Irel, nindexes, stats,
+									  lps);
 
 	/* Wait for all vacuum workers to finish */
 	WaitForParallelWorkersToFinish(lps->pcxt);
 
-	/* Take over the shared balance value to heap scan */
-	VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+	/* Carry the shared balance value over to the heap scan */
+	if (VacuumSharedCostBalance)
+		VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
 
 	/* Disable shared cost balance for vacuum delay */
 	VacuumSharedCostBalance = NULL;
@@ -2115,7 +2119,8 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel,
 
 /*
  * Index vacuuming and index cleanup routine used by parallel vacuum
- * worker processes including the leader process.
+ * worker processes and the leader process to vacuum or clean up the
+ * indexes in parallel.
  */
 static void
 vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
@@ -2123,8 +2128,11 @@ vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
 								 LVShared *lvshared,
 								 LVDeadTuples *dead_tuples)
 {
-	/* Increment the active worker count */
-	pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+	/*
+	 * Increment the active worker count if we were able to launch any workers.
+	 */
+	if (VacuumActiveNWorkers)
+		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
 
 	/* Loop until all indexes are vacuumed */
 	for (;;)
@@ -2139,18 +2147,20 @@ vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
 		if (idx >= nindexes)
 			break;
 
-		/*
-		 * Skip if this index doesn't support parallel execution
-		 * at this time.
-		 */
+		/* Skip processing indexes that don't support parallel operations */
 		if (skip_parallel_index_vacuum(Irel[idx], lvshared))
 			continue;
 
 		/* Increment the processing count */
 		pg_atomic_add_fetch_u32(&(lvshared->nprocessed), 1);
 
-		/* Get index statistics struct of this index */
+		/* Get the index statistics of this index from DSM */
 		shared_indstats = get_indstats(lvshared, idx);
+
+		/*
+		 * This must exist in DSM, as we reach here only for indexes that
+		 * support parallel operations.
+		 */
 		Assert(shared_indstats);
 
 		/* Do vacuum or cleanup one index */
@@ -2163,7 +2173,8 @@ vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
 	 * We have completed the index vacuum so decrement the active worker
 	 * count.
 	 */
-	pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+	if (VacuumActiveNWorkers)
+		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
 }
 
 /*
@@ -2176,7 +2187,7 @@ vacuum_or_cleanup_indexes_worker(Relation *Irel, int nindexes,
 static void
 vacuum_or_cleanup_skipped_indexes(LVRelStats *vacrelstats, Relation *Irel,
 								  int nindexes, IndexBulkDeleteResult **stats,
-								  LVParallelState *lps, bool in_parallel)
+								  LVParallelState *lps)
 {
 	int nindexes_remains;
 	int i;
@@ -2191,8 +2202,10 @@ vacuum_or_cleanup_skipped_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	if (nindexes_remains == 0)
 		return;
 
-	/* Increment the active worker count if some worker might be running */
-	if (in_parallel)
+	/*
+	 * Increment the active worker count if we were able to launch any workers.
+	 */
+	if (VacuumActiveNWorkers)
 		pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
 
 	for (i = 0; i < nindexes; i++)
@@ -2216,7 +2229,7 @@ vacuum_or_cleanup_skipped_indexes(LVRelStats *vacrelstats, Relation *Irel,
 	 * We have completed the index vacuum so decrement the active worker
 	 * count.
 	 */
-	if (in_parallel)
+	if (VacuumActiveNWorkers)
 		pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
 
 #ifdef USE_ASSERT_CHECKING
@@ -3095,8 +3108,8 @@ compute_parallel_workers(Relation *Irel, int nindexes, int nrequested)
 }
 
 /*
- * Initialize variables for shared index statistics, set NULL bitmap and
- * the struct size of each indexes.  Also this function sets the number of
+ * Initialize variables for shared index statistics, set the NULL bitmap and
+ * the size of stats for each index.  Also, this function sets the number of
  * indexes that do not support parallel index vacuuming and that use
  * maintenance_work_mem.  Since currently we don't support parallel vacuum
  * for autovacuum we don't need to care about autovacuum_work_mem.
@@ -3105,30 +3118,31 @@ static void
 prepare_index_statistics(LVShared *lvshared, Relation *Irel, int nindexes,
 	int nworkers)
 {
-	char *p = (char *)GetSharedIndStats(lvshared);
+	char *p = (char *) GetSharedIndStats(lvshared);
 	int nindexes_mwm = 0;
 	int i;
 
 	Assert(!IsAutoVacuumWorkerProcess());
 
+	/* Set NULL for all indexes */
+	memset(lvshared->bitmap, 0x00, BITMAPLEN(nindexes));
+
 	for (i = 0; i < nindexes; i++)
 	{
 		LVSharedIndStats *indstats;
 
 		if (Irel[i]->rd_indam->amparallelvacuumoptions ==
 			VACUUM_OPTION_NO_PARALLEL)
-		{
-			/* Set NULL as this index does not support parallel vacuum */
-			lvshared->bitmap[i >> 3] |= 0 << (i & 0x07);
 			continue;
-		}
 
 		if (Irel[i]->rd_indam->amusemaintenanceworkmem)
 			nindexes_mwm++;
 
-		/* Set the size for index statistics */
-		indstats = (LVSharedIndStats *)p;
+		/* Set NOT NULL as this index does support parallelism */
 		lvshared->bitmap[i >> 3] |= 1 << (i & 0x07);
+
+		/* Set the size for index statistics */
+		indstats = (LVSharedIndStats *) p;
 		indstats->size = index_parallelvacuum_estimate(Irel[i]);
 
 		p += SizeOfSharedIndStats(indstats);
@@ -3330,7 +3344,7 @@ get_indstats(LVShared *lvshared, int n)
 }
 
 /*
- * Check if the given index participates parallel index vacuuming
+ * Check if the given index participates in parallel index vacuuming
  * or parallel index cleanup.
  */
 static bool
@@ -3343,14 +3357,16 @@ skip_parallel_index_vacuum(Relation indrel, LVShared *lvshared)
 
 	if (lvshared->for_cleanup)
 	{
-		/* Skip if the index does not support parallel cleanup */
+		/* Skip, if the index does not support parallel cleanup */
 		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
 			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
 			return true;
 
 		/*
-		 * Skip if the index support to parallel cleanup only first
-		 * time cleanup but it is not the first time.
+		 * Skip, if the index supports parallel cleanup conditionally, but we
+		 * have already processed the index (for bulkdelete).  See the
+		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
+		 * when indexes support parallel cleanup conditionally.
 		 */
 		if (!lvshared->first_time &&
 			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-- 
2.16.2.windows.1

